use libc;
use option::{Option, Some, None};
use result::{Ok, Err};
-use rt::rtio::{IoFactory, RtioTTY, with_local_io};
+use rt::rtio::{IoFactory, RtioTTY, with_local_io, RtioPipe};
use super::{Reader, Writer, io_error};
#[fixed_stack_segment] #[inline(never)]
/// Creates a new non-blocking handle to the stdin of the current process.
///
/// See `stdout()` for notes about this function.
+#[fixed_stack_segment] #[inline(never)]
pub fn stdin() -> StdReader {
- do tty(libc::STDIN_FILENO) |tty| { StdReader { inner: tty } }
+ do with_local_io |io| {
+ match io.pipe_open(unsafe { libc::dup(libc::STDIN_FILENO) }) {
+ Ok(stream) => Some(StdReader { inner: stream }),
+ Err(e) => {
+ io_error::cond.raise(e);
+ None
+ }
+ }
+ }.unwrap()
}
/// Creates a new non-blocking handle to the stdout of the current process.
/// Representation of a reader of a standard input stream
pub struct StdReader {
- priv inner: ~RtioTTY
-}
-
-impl StdReader {
- /// Controls whether this output stream is a "raw stream" or simply a normal
- /// stream.
- ///
- /// # Failure
- ///
- /// This function will raise on the `io_error` condition if an error
- /// happens.
- pub fn set_raw(&mut self, raw: bool) {
- match self.inner.set_raw(raw) {
- Ok(()) => {},
- Err(e) => io_error::cond.raise(e),
- }
- }
-
- /// Returns whether this tream is attached to a TTY instance or not.
- ///
- /// This is similar to libc's isatty() function
- pub fn isatty(&self) -> bool { self.inner.isatty() }
+ priv inner: ~RtioPipe
}
impl Reader for StdReader {
fn unix_bind(&mut self, path: &CString) ->
Result<~RtioUnixListener, IoError> {
- let mut pipe = Pipe::new(self.uv_loop(), false);
- match pipe.bind(path) {
- Ok(()) => {
- let handle = get_handle_to_current_scheduler!();
- let pipe = UvUnboundPipe::new(pipe, handle);
- Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener)
- }
- Err(e) => {
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do pipe.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(
- task_cell.take());
- }
- }
- Err(uv_error_to_io_error(e))
- }
+ let mut pipe = UvUnboundPipe::new(self.uv_loop());
+ match pipe.pipe.bind(path) {
+ Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
- let scheduler: ~Scheduler = Local::take();
- let mut pipe = Pipe::new(self.uv_loop(), false);
+ let pipe = UvUnboundPipe::new(self.uv_loop());
+ let mut rawpipe = pipe.pipe;
+
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
+ let pipe_cell = Cell::new(pipe);
+ let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell;
+ let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
- do pipe.connect(path) |stream, err| {
+ do rawpipe.connect(path) |_stream, err| {
let res = match err {
None => {
- let handle = stream.native_handle();
- let pipe = NativeHandle::from_native_handle(
- handle as *uvll::uv_pipe_t);
- let home = get_handle_to_current_scheduler!();
- let pipe = UvUnboundPipe::new(pipe, home);
+ let pipe = unsafe { (*pipe_cell_ptr).take() };
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
- Some(e) => { Err(uv_error_to_io_error(e)) }
+ Some(e) => Err(uv_error_to_io_error(e)),
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
}
assert!(!result_cell.is_empty());
- let ret = result_cell.take();
- if ret.is_err() {
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do pipe.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
- return ret;
+ return result_cell.take();
}
fn tty_open(&mut self, fd: c_int, readable: bool)
Err(e) => Err(uv_error_to_io_error(e))
}
}
+
+ fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
+ let mut pipe = UvUnboundPipe::new(self.uv_loop());
+ match pipe.pipe.open(fd) {
+ Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe),
+ Err(e) => Err(uv_error_to_io_error(e))
+ }
+ }
}
pub struct UvTcpListener {
}
impl UvUnboundPipe {
- /// Takes ownership of an unbound pipe along with the scheduler that it is
- /// homed on.
- fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
- UvUnboundPipe { pipe: pipe, home: home }
- }
-
- /// Creates a fresh new unbound pipe on the specified I/O loop
- pub fn new_fresh(loop_: &Loop) -> UvUnboundPipe {
+ /// Creates a new unbound pipe homed to the current scheduler, placed on the
+ /// specified event loop
+ pub fn new(loop_: &Loop) -> UvUnboundPipe {
UvUnboundPipe {
pipe: Pipe::new(loop_, false),
home: get_handle_to_current_scheduler!(),
let inc = match status {
Some(e) => Err(uv_error_to_io_error(e)),
None => {
- let inc = Pipe::new(&server.event_loop(), false);
- server.accept(inc.as_stream());
- let home = get_handle_to_current_scheduler!();
- let pipe = UvUnboundPipe::new(inc, home);
+ let pipe = UvUnboundPipe::new(&server.event_loop());
+ server.accept(pipe.pipe.as_stream());
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
};