]> git.lizzy.rs Git - rust.git/commitdiff
Move stdin to using libuv's pipes instead of a tty
authorAlex Crichton <alex@alexcrichton.com>
Fri, 18 Oct 2013 21:01:22 +0000 (14:01 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Thu, 24 Oct 2013 21:21:58 +0000 (14:21 -0700)
I was seeing a lot of weird behavior with stdin behaving as a tty, and it
doesn't really quite make sense, so instead this moves to using libuv's pipes
instead (which make more sense for stdin specifically).

This prevents piping input to rustc hanging forever.

src/libstd/rt/io/file.rs
src/libstd/rt/io/net/unix.rs
src/libstd/rt/io/pipe.rs
src/libstd/rt/io/process.rs
src/libstd/rt/io/stdio.rs
src/libstd/rt/rtio.rs
src/libstd/rt/uv/net.rs
src/libstd/rt/uv/process.rs
src/libstd/rt/uv/uvio.rs

index 381fa9f2d073a7c3433cc97100ee423ee181811a..d035e2f457cc2b940f10f377d91b58eb64e6e209 100644 (file)
@@ -19,7 +19,7 @@
 objects such as strings and `Path` instances.
 
 All operations in this module, including those as part of `FileStream` et al
-block the task during execution. Most will raise `std::rt::io::{io_error,io_error}`
+block the task during execution. Most will raise `std::rt::io::io_error`
 conditions in the event of failure.
 
 Also included in this module are the `FileInfo` and `DirectoryInfo` traits. When
index fc7839d545fa9a53a37b0c51d345fd2c9ee370fc..e424956e2ff6323520cbf3533d5b0a132b30e73a 100644 (file)
@@ -37,7 +37,7 @@ pub struct UnixStream {
 
 impl UnixStream {
     fn new(obj: ~RtioPipe) -> UnixStream {
-        UnixStream { obj: PipeStream::new_bound(obj) }
+        UnixStream { obj: PipeStream::new(obj) }
     }
 
     /// Connect to a pipe named by `path`. This will attempt to open a
index eba58b97c4df453d002d9f57b56eac2e038fcbe7..979a1dfc65e33c6c4b33dfb774930edbf76da216 100644 (file)
@@ -23,7 +23,7 @@ pub struct PipeStream {
 }
 
 impl PipeStream {
-    pub fn new_bound(inner: ~RtioPipe) -> PipeStream {
+    pub fn new(inner: ~RtioPipe) -> PipeStream {
         PipeStream { obj: inner }
     }
 }
@@ -42,7 +42,7 @@ fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
         }
     }
 
-    fn eof(&mut self) -> bool { fail!() }
+    fn eof(&mut self) -> bool { false }
 }
 
 impl Writer for PipeStream {
@@ -55,5 +55,5 @@ fn write(&mut self, buf: &[u8]) {
         }
     }
 
-    fn flush(&mut self) { fail!() }
+    fn flush(&mut self) {}
 }
index c45429ca2e6cd21e1d51b8d4e299127c922a55da..a5750211b492c0046f2567ef7116239595d29c95 100644 (file)
@@ -89,7 +89,7 @@ pub fn new(config: ProcessConfig) -> Option<Process> {
                 Ok((p, io)) => Some(Process{
                     handle: p,
                     io: io.move_iter().map(|p|
-                        p.map(|p| io::PipeStream::new_bound(p))
+                        p.map(|p| io::PipeStream::new(p))
                     ).collect()
                 }),
                 Err(ioerr) => {
index 294df9a6442a180627ab1e06e06259acfed95b3d..e601ece88bb5f437bb9080fed730698ac44dabd6 100644 (file)
@@ -30,7 +30,7 @@
 use libc;
 use option::{Option, Some, None};
 use result::{Ok, Err};
-use rt::rtio::{IoFactory, RtioTTY, with_local_io};
+use rt::rtio::{IoFactory, RtioTTY, with_local_io, RtioPipe};
 use super::{Reader, Writer, io_error};
 
 #[fixed_stack_segment] #[inline(never)]
@@ -52,8 +52,17 @@ fn tty<T>(fd: libc::c_int, f: &fn(~RtioTTY) -> T) -> T {
 /// Creates a new non-blocking handle to the stdin of the current process.
 ///
 /// See `stdout()` for notes about this function.
+#[fixed_stack_segment] #[inline(never)]
 pub fn stdin() -> StdReader {
-    do tty(libc::STDIN_FILENO) |tty| { StdReader { inner: tty } }
+    do with_local_io |io| {
+        match io.pipe_open(unsafe { libc::dup(libc::STDIN_FILENO) }) {
+            Ok(stream) => Some(StdReader { inner: stream }),
+            Err(e) => {
+                io_error::cond.raise(e);
+                None
+            }
+        }
+    }.unwrap()
 }
 
 /// Creates a new non-blocking handle to the stdout of the current process.
@@ -108,28 +117,7 @@ pub fn println_args(fmt: &fmt::Arguments) {
 
 /// Representation of a reader of a standard input stream
 pub struct StdReader {
-    priv inner: ~RtioTTY
-}
-
-impl StdReader {
-    /// Controls whether this output stream is a "raw stream" or simply a normal
-    /// stream.
-    ///
-    /// # Failure
-    ///
-    /// This function will raise on the `io_error` condition if an error
-    /// happens.
-    pub fn set_raw(&mut self, raw: bool) {
-        match self.inner.set_raw(raw) {
-            Ok(()) => {},
-            Err(e) => io_error::cond.raise(e),
-        }
-    }
-
-    /// Returns whether this tream is attached to a TTY instance or not.
-    ///
-    /// This is similar to libc's isatty() function
-    pub fn isatty(&self) -> bool { self.inner.isatty() }
+    priv inner: ~RtioPipe
 }
 
 impl Reader for StdReader {
index 4a4ce4edcc2d630a093b2c6ad981fca8ae823626..45c720a89b34c89eb06c17b126336a1ab9df33b7 100644 (file)
@@ -94,6 +94,7 @@ fn fs_readdir(&mut self, path: &CString, flags: c_int) ->
     fn spawn(&mut self, config: ProcessConfig)
             -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>;
 
+    fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>;
     fn unix_bind(&mut self, path: &CString) ->
         Result<~RtioUnixListener, IoError>;
     fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>;
index 3ab0655071acb7f225f52e94294783f6b1a988fc..22d7c9c61b3a9ae0f3789b2e8ffc68e4bc8f1af7 100644 (file)
@@ -159,7 +159,7 @@ pub fn read_stop(&mut self) {
         // but read_stop may be called from inside one of them and we
         // would end up freeing the in-use environment
         let handle = self.native_handle();
-        unsafe { uvll::read_stop(handle); }
+        unsafe { assert_eq!(uvll::read_stop(handle), 0); }
     }
 
     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
index c3417109645d47e57a4887c6d06a2cc890da7327..a039bbc9fc0296f2ecf72ef6e4fa0d097accdbca 100644 (file)
@@ -146,7 +146,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
             if writable {
                 flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
             }
-            let pipe = UvUnboundPipe::new_fresh(loop_);
+            let pipe = UvUnboundPipe::new(loop_);
             let handle = pipe.pipe.as_stream().native_handle();
             uvll::set_stdio_container_flags(dst, flags);
             uvll::set_stdio_container_stream(dst, handle);
index 915c7d0da528b27f9af893a3642038231762b125..0d22aa51be5be8b2635b9739e51f82295daca620 100644 (file)
@@ -805,47 +805,32 @@ fn spawn(&mut self, config: ProcessConfig)
 
     fn unix_bind(&mut self, path: &CString) ->
         Result<~RtioUnixListener, IoError> {
-        let mut pipe = Pipe::new(self.uv_loop(), false);
-        match pipe.bind(path) {
-            Ok(()) => {
-                let handle = get_handle_to_current_scheduler!();
-                let pipe = UvUnboundPipe::new(pipe, handle);
-                Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener)
-            }
-            Err(e) => {
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    let task_cell = Cell::new(task);
-                    do pipe.close {
-                        let scheduler: ~Scheduler = Local::take();
-                        scheduler.resume_blocked_task_immediately(
-                            task_cell.take());
-                    }
-                }
-                Err(uv_error_to_io_error(e))
-            }
+        let mut pipe = UvUnboundPipe::new(self.uv_loop());
+        match pipe.pipe.bind(path) {
+            Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
     fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
-        let scheduler: ~Scheduler = Local::take();
-        let mut pipe = Pipe::new(self.uv_loop(), false);
+        let pipe = UvUnboundPipe::new(self.uv_loop());
+        let mut rawpipe = pipe.pipe;
+
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
+        let pipe_cell = Cell::new(pipe);
+        let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell;
 
+        let scheduler: ~Scheduler = Local::take();
         do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell::new(task);
-            do pipe.connect(path) |stream, err| {
+            do rawpipe.connect(path) |_stream, err| {
                 let res = match err {
                     None => {
-                        let handle = stream.native_handle();
-                        let pipe = NativeHandle::from_native_handle(
-                                        handle as *uvll::uv_pipe_t);
-                        let home = get_handle_to_current_scheduler!();
-                        let pipe = UvUnboundPipe::new(pipe, home);
+                        let pipe = unsafe { (*pipe_cell_ptr).take() };
                         Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
                     }
-                    Some(e) => { Err(uv_error_to_io_error(e)) }
+                    Some(e) => Err(uv_error_to_io_error(e)),
                 };
                 unsafe { (*result_cell_ptr).put_back(res); }
                 let scheduler: ~Scheduler = Local::take();
@@ -854,18 +839,7 @@ fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> {
         }
 
         assert!(!result_cell.is_empty());
-        let ret = result_cell.take();
-        if ret.is_err() {
-            let scheduler: ~Scheduler = Local::take();
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do pipe.close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
-            }
-        }
-        return ret;
+        return result_cell.take();
     }
 
     fn tty_open(&mut self, fd: c_int, readable: bool)
@@ -879,6 +853,14 @@ fn tty_open(&mut self, fd: c_int, readable: bool)
             Err(e) => Err(uv_error_to_io_error(e))
         }
     }
+
+    fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> {
+        let mut pipe = UvUnboundPipe::new(self.uv_loop());
+        match pipe.pipe.open(fd) {
+            Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe),
+            Err(e) => Err(uv_error_to_io_error(e))
+        }
+    }
 }
 
 pub struct UvTcpListener {
@@ -1075,14 +1057,9 @@ pub struct UvUnboundPipe {
 }
 
 impl UvUnboundPipe {
-    /// Takes ownership of an unbound pipe along with the scheduler that it is
-    /// homed on.
-    fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe {
-        UvUnboundPipe { pipe: pipe, home: home }
-    }
-
-    /// Creates a fresh new unbound pipe on the specified I/O loop
-    pub fn new_fresh(loop_: &Loop) -> UvUnboundPipe {
+    /// Creates a new unbound pipe homed to the current scheduler, placed on the
+    /// specified event loop
+    pub fn new(loop_: &Loop) -> UvUnboundPipe {
         UvUnboundPipe {
             pipe: Pipe::new(loop_, false),
             home: get_handle_to_current_scheduler!(),
@@ -1727,10 +1704,8 @@ fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> {
                     let inc = match status {
                         Some(e) => Err(uv_error_to_io_error(e)),
                         None => {
-                            let inc = Pipe::new(&server.event_loop(), false);
-                            server.accept(inc.as_stream());
-                            let home = get_handle_to_current_scheduler!();
-                            let pipe = UvUnboundPipe::new(inc, home);
+                            let pipe = UvUnboundPipe::new(&server.event_loop());
+                            server.accept(pipe.pipe.as_stream());
                             Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
                         }
                     };