]> git.lizzy.rs Git - rust.git/commitdiff
std: Improve pipe() functionality
authorAlex Crichton <alex@alexcrichton.com>
Mon, 9 Jun 2014 20:23:49 +0000 (13:23 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Mon, 16 Jun 2014 17:53:49 +0000 (10:53 -0700)
* os::pipe() now returns IoResult<os::Pipe>
* os::pipe() is now unsafe because it does not arrange for deallocation of file
  descriptors
* os::Pipe fields are renamed from input to reader and out to write.
* PipeStream::pair() has been added. This is a safe method to get a pair of
  pipes.
* Dealing with pipes in native process bindings have been improved to be more
  robust in the face of failure and intermittent errors. This converts a few
  fail!() situations to Err situations.

Closes #9458
cc #13538
Closes #14724
[breaking-change]

src/liblibc/lib.rs
src/libnative/io/file_unix.rs
src/libnative/io/helper_thread.rs
src/libnative/io/process.rs
src/libstd/io/pipe.rs
src/libstd/os.rs
src/test/run-pass/sigpipe-should-be-ignored.rs

index 9ed0d50a03e8aae811dc9384a4d8ed9d906353a8..8f245f1d5b4c47b4aec8fb3cf7b1469a074eeffa 100644 (file)
 #[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
 #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
 #[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
-#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT};
+#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT, EMFILE};
 #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
 #[cfg(unix)] pub use consts::os::posix01::{SIG_IGN};
 #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
 #[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
 #[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
 #[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
-#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL};
+#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL, WSAEMFILE};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
 #[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
 #[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};
index 6472356ea164171b2453bccddf5ad98edc14dab4..93938e3d5b860925fbb6b4e69e07ccd21e0d15c6 100644 (file)
@@ -525,9 +525,9 @@ mod tests {
     fn test_file_desc() {
         // Run this test with some pipes so we don't have to mess around with
         // opening or closing files.
-        let os::Pipe { input, out } = os::pipe();
-        let mut reader = FileDesc::new(input, true);
-        let mut writer = FileDesc::new(out, true);
+        let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
+        let mut reader = FileDesc::new(reader, true);
+        let mut writer = FileDesc::new(writer, true);
 
         writer.inner_write(bytes!("test")).ok().unwrap();
         let mut buf = [0u8, ..4];
index 443c82c6a547c53918a3ebd586783ce604268805..d18e92866bf7a1054def771fafd83df9cfd0933c 100644 (file)
@@ -158,8 +158,8 @@ mod imp {
     pub type signal = libc::c_int;
 
     pub fn new() -> (signal, signal) {
-        let pipe = os::pipe();
-        (pipe.input, pipe.out)
+        let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
+        (reader, writer)
     }
 
     pub fn signal(fd: libc::c_int) {
index f26d87ba1b52ff9f8853c7e54ef62b622768f5bc..c421dada205fc7e9f4886d195df2eda291cba6cc 100644 (file)
 
 use libc::{pid_t, c_void, c_int};
 use libc;
+use std::c_str::CString;
+use std::io;
 use std::mem;
 use std::os;
 use std::ptr;
-use std::rt::rtio;
 use std::rt::rtio::{ProcessConfig, IoResult, IoError};
-use std::c_str::CString;
+use std::rt::rtio;
 
 use super::file;
 use super::util;
@@ -73,47 +74,43 @@ pub fn spawn(cfg: ProcessConfig)
 
         fn get_io(io: rtio::StdioContainer,
                   ret: &mut Vec<Option<file::FileDesc>>)
-            -> (Option<os::Pipe>, c_int)
+            -> IoResult<Option<file::FileDesc>>
         {
             match io {
-                rtio::Ignored => { ret.push(None); (None, -1) }
-                rtio::InheritFd(fd) => { ret.push(None); (None, fd) }
+                rtio::Ignored => { ret.push(None); Ok(None) }
+                rtio::InheritFd(fd) => {
+                    ret.push(None);
+                    Ok(Some(file::FileDesc::new(fd, true)))
+                }
                 rtio::CreatePipe(readable, _writable) => {
-                    let pipe = os::pipe();
+                    let (reader, writer) = try!(pipe());
                     let (theirs, ours) = if readable {
-                        (pipe.input, pipe.out)
+                        (reader, writer)
                     } else {
-                        (pipe.out, pipe.input)
+                        (writer, reader)
                     };
-                    ret.push(Some(file::FileDesc::new(ours, true)));
-                    (Some(pipe), theirs)
+                    ret.push(Some(ours));
+                    Ok(Some(theirs))
                 }
             }
         }
 
         let mut ret_io = Vec::new();
-        let (in_pipe, in_fd) = get_io(cfg.stdin, &mut ret_io);
-        let (out_pipe, out_fd) = get_io(cfg.stdout, &mut ret_io);
-        let (err_pipe, err_fd) = get_io(cfg.stderr, &mut ret_io);
-
-        let res = spawn_process_os(cfg, in_fd, out_fd, err_fd);
-
-        unsafe {
-            for pipe in in_pipe.iter() { let _ = libc::close(pipe.input); }
-            for pipe in out_pipe.iter() { let _ = libc::close(pipe.out); }
-            for pipe in err_pipe.iter() { let _ = libc::close(pipe.out); }
-        }
+        let res = spawn_process_os(cfg,
+                                   try!(get_io(cfg.stdin, &mut ret_io)),
+                                   try!(get_io(cfg.stdout, &mut ret_io)),
+                                   try!(get_io(cfg.stderr, &mut ret_io)));
 
         match res {
             Ok(res) => {
-                Ok((Process {
-                        pid: res.pid,
-                        handle: res.handle,
-                        exit_code: None,
-                        exit_signal: None,
-                        deadline: 0,
-                    },
-                    ret_io))
+                let p = Process {
+                    pid: res.pid,
+                    handle: res.handle,
+                    exit_code: None,
+                    exit_signal: None,
+                    deadline: 0,
+                };
+                Ok((p, ret_io))
             }
             Err(e) => Err(e)
         }
@@ -194,6 +191,37 @@ fn drop(&mut self) {
     }
 }
 
+fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
+    #[cfg(unix)] use ERROR = libc::EMFILE;
+    #[cfg(windows)] use ERROR = libc::WSAEMFILE;
+    struct Closer { fd: libc::c_int }
+
+    let os::Pipe { reader, writer } = match unsafe { os::pipe() } {
+        Ok(p) => p,
+        Err(io::IoError { detail, .. }) => return Err(IoError {
+            code: ERROR as uint,
+            extra: 0,
+            detail: detail,
+        })
+    };
+    let mut reader = Closer { fd: reader };
+    let mut writer = Closer { fd: writer };
+
+    let native_reader = file::FileDesc::new(reader.fd, true);
+    reader.fd = -1;
+    let native_writer = file::FileDesc::new(writer.fd, true);
+    writer.fd = -1;
+    return Ok((native_reader, native_writer));
+
+    impl Drop for Closer {
+        fn drop(&mut self) {
+            if self.fd != -1 {
+                let _ = unsafe { libc::close(self.fd) };
+            }
+        }
+    }
+}
+
 #[cfg(windows)]
 unsafe fn killpid(pid: pid_t, signal: int) -> IoResult<()> {
     let handle = libc::OpenProcess(libc::PROCESS_TERMINATE |
@@ -246,7 +274,9 @@ struct SpawnProcessResult {
 
 #[cfg(windows)]
 fn spawn_process_os(cfg: ProcessConfig,
-                    in_fd: c_int, out_fd: c_int, err_fd: c_int)
+                    in_fd: Option<file::FileDesc>,
+                    out_fd: Option<file::FileDesc>,
+                    err_fd: Option<file::FileDesc>)
                  -> IoResult<SpawnProcessResult> {
     use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO};
     use libc::consts::os::extra::{
@@ -283,47 +313,51 @@ fn spawn_process_os(cfg: ProcessConfig,
         // Similarly to unix, we don't actually leave holes for the stdio file
         // descriptors, but rather open up /dev/null equivalents. These
         // equivalents are drawn from libuv's windows process spawning.
-        let set_fd = |fd: c_int, slot: &mut HANDLE, is_stdin: bool| {
-            if fd == -1 {
-                let access = if is_stdin {
-                    libc::FILE_GENERIC_READ
-                } else {
-                    libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
-                };
-                let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
-                let mut sa = libc::SECURITY_ATTRIBUTES {
-                    nLength: size as libc::DWORD,
-                    lpSecurityDescriptor: ptr::mut_null(),
-                    bInheritHandle: 1,
-                };
-                let filename = "NUL".to_utf16().append_one(0);
-                *slot = libc::CreateFileW(filename.as_ptr(),
-                                          access,
-                                          libc::FILE_SHARE_READ |
-                                              libc::FILE_SHARE_WRITE,
-                                          &mut sa,
-                                          libc::OPEN_EXISTING,
-                                          0,
-                                          ptr::mut_null());
-                if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
-                    return Err(super::last_error())
-                }
-            } else {
-                let orig = get_osfhandle(fd) as HANDLE;
-                if orig == INVALID_HANDLE_VALUE as HANDLE {
-                    return Err(super::last_error())
+        let set_fd = |fd: &Option<file::FileDesc>, slot: &mut HANDLE,
+                      is_stdin: bool| {
+            match *fd {
+                None => {
+                    let access = if is_stdin {
+                        libc::FILE_GENERIC_READ
+                    } else {
+                        libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
+                    };
+                    let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
+                    let mut sa = libc::SECURITY_ATTRIBUTES {
+                        nLength: size as libc::DWORD,
+                        lpSecurityDescriptor: ptr::mut_null(),
+                        bInheritHandle: 1,
+                    };
+                    let filename = "NUL".to_utf16().append_one(0);
+                    *slot = libc::CreateFileW(filename.as_ptr(),
+                                              access,
+                                              libc::FILE_SHARE_READ |
+                                                  libc::FILE_SHARE_WRITE,
+                                              &mut sa,
+                                              libc::OPEN_EXISTING,
+                                              0,
+                                              ptr::mut_null());
+                    if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
+                        return Err(super::last_error())
+                    }
                 }
-                if DuplicateHandle(cur_proc, orig, cur_proc, slot,
-                                   0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
-                    return Err(super::last_error())
+                Some(ref fd) => {
+                    let orig = get_osfhandle(fd.fd()) as HANDLE;
+                    if orig == INVALID_HANDLE_VALUE as HANDLE {
+                        return Err(super::last_error())
+                    }
+                    if DuplicateHandle(cur_proc, orig, cur_proc, slot,
+                                       0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
+                        return Err(super::last_error())
+                    }
                 }
             }
             Ok(())
         };
 
-        try!(set_fd(in_fd, &mut si.hStdInput, true));
-        try!(set_fd(out_fd, &mut si.hStdOutput, false));
-        try!(set_fd(err_fd, &mut si.hStdError, false));
+        try!(set_fd(&in_fd, &mut si.hStdInput, true));
+        try!(set_fd(&out_fd, &mut si.hStdOutput, false));
+        try!(set_fd(&err_fd, &mut si.hStdError, false));
 
         let cmd_str = make_command_line(cfg.program, cfg.args);
         let mut pi = zeroed_process_information();
@@ -464,7 +498,10 @@ fn backslash_run_ends_in_quote(s: &Vec<char>, mut i: uint) -> bool {
 }
 
 #[cfg(unix)]
-fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_int)
+fn spawn_process_os(cfg: ProcessConfig,
+                    in_fd: Option<file::FileDesc>,
+                    out_fd: Option<file::FileDesc>,
+                    err_fd: Option<file::FileDesc>)
                 -> IoResult<SpawnProcessResult>
 {
     use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
@@ -498,9 +535,7 @@ unsafe fn set_cloexec(fd: c_int) {
 
     with_envp(cfg.env, proc(envp) {
         with_argv(cfg.program, cfg.args, proc(argv) unsafe {
-            let pipe = os::pipe();
-            let mut input = file::FileDesc::new(pipe.input, true);
-            let mut output = file::FileDesc::new(pipe.out, true);
+            let (mut input, mut output) = try!(pipe());
 
             // We may use this in the child, so perform allocations before the
             // fork
@@ -510,7 +545,7 @@ unsafe fn set_cloexec(fd: c_int) {
 
             let pid = fork();
             if pid < 0 {
-                fail!("failure in fork: {}", os::last_os_error());
+                return Err(super::last_error())
             } else if pid > 0 {
                 drop(output);
                 let mut bytes = [0, ..4];
@@ -586,16 +621,24 @@ fn fail(output: &mut file::FileDesc) -> ! {
             // up /dev/null into that file descriptor. Otherwise, the first file
             // descriptor opened up in the child would be numbered as one of the
             // stdio file descriptors, which is likely to wreak havoc.
-            let setup = |src: c_int, dst: c_int| {
-                let src = if src == -1 {
-                    let flags = if dst == libc::STDIN_FILENO {
-                        libc::O_RDONLY
-                    } else {
-                        libc::O_RDWR
-                    };
-                    devnull.with_ref(|p| libc::open(p, flags, 0))
-                } else {
-                    src
+            let setup = |src: Option<file::FileDesc>, dst: c_int| {
+                let src = match src {
+                    None => {
+                        let flags = if dst == libc::STDIN_FILENO {
+                            libc::O_RDONLY
+                        } else {
+                            libc::O_RDWR
+                        };
+                        devnull.with_ref(|p| libc::open(p, flags, 0))
+                    }
+                    Some(obj) => {
+                        let fd = obj.fd();
+                        // Leak the memory and the file descriptor. We're in the
+                        // child now an all our resources are going to be
+                        // cleaned up very soon
+                        mem::forget(obj);
+                        fd
+                    }
                 };
                 src != -1 && retry(|| dup2(src, dst)) != -1
             };
index 6e2009545aad51f490e3ac5dcea14c883017e19b..84d388c113630ecba3187a4609c5e8785ae5d6f7 100644 (file)
 #![allow(missing_doc)]
 
 use prelude::*;
+
 use io::{IoResult, IoError};
 use libc;
+use os;
 use owned::Box;
 use rt::rtio::{RtioPipe, LocalIo};
 
@@ -27,6 +29,11 @@ pub struct PipeStream {
     obj: Box<RtioPipe + Send>,
 }
 
+pub struct PipePair {
+    pub reader: PipeStream,
+    pub writer: PipeStream,
+}
+
 impl PipeStream {
     /// Consumes a file descriptor to return a pipe stream that will have
     /// synchronous, but non-blocking reads/writes. This is useful if the file
@@ -58,6 +65,38 @@ pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
     pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream {
         PipeStream { obj: inner }
     }
+
+    /// Creates a pair of in-memory OS pipes for a unidirectional communication
+    /// stream.
+    ///
+    /// The structure returned contains a reader and writer I/O object. Data
+    /// written to the writer can be read from the reader.
+    ///
+    /// # Errors
+    ///
+    /// This function can fail to succeed if the underlying OS has run out of
+    /// available resources to allocate a new pipe.
+    pub fn pair() -> IoResult<PipePair> {
+        struct Closer { fd: libc::c_int }
+
+        let os::Pipe { reader, writer } = try!(unsafe { os::pipe() });
+        let mut reader = Closer { fd: reader };
+        let mut writer = Closer { fd: writer };
+
+        let io_reader = try!(PipeStream::open(reader.fd));
+        reader.fd = -1;
+        let io_writer = try!(PipeStream::open(writer.fd));
+        writer.fd = -1;
+        return Ok(PipePair { reader: io_reader, writer: io_writer });
+
+        impl Drop for Closer {
+            fn drop(&mut self) {
+                if self.fd != -1 {
+                    let _ = unsafe { libc::close(self.fd) };
+                }
+            }
+        }
+    }
 }
 
 impl Clone for PipeStream {
@@ -84,9 +123,9 @@ mod test {
         use os;
         use io::pipe::PipeStream;
 
-        let os::Pipe { input, out } = os::pipe();
-        let out = PipeStream::open(out);
-        let mut input = PipeStream::open(input);
+        let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
+        let out = PipeStream::open(writer);
+        let mut input = PipeStream::open(reader);
         let (tx, rx) = channel();
         spawn(proc() {
             let mut out = out;
index f6b1c04dd34c604b5beec2a99e601f5caadf62c3..0747e7ccbe3fd98c7e4ba7466ddb981525934cc7 100644 (file)
@@ -32,6 +32,7 @@
 use clone::Clone;
 use collections::Collection;
 use fmt;
+use io::{IoResult, IoError};
 use iter::Iterator;
 use libc::{c_void, c_int};
 use libc;
@@ -513,40 +514,50 @@ pub fn split_paths<T: BytesContainer>(unparsed: T) -> Vec<Path> {
 pub struct Pipe {
     /// A file descriptor representing the reading end of the pipe. Data written
     /// on the `out` file descriptor can be read from this file descriptor.
-    pub input: c_int,
+    pub reader: c_int,
     /// A file descriptor representing the write end of the pipe. Data written
     /// to this file descriptor can be read from the `input` file descriptor.
-    pub out: c_int,
+    pub writer: c_int,
 }
 
-/// Creates a new low-level OS in-memory pipe represented as a Pipe struct.
-#[cfg(unix)]
-pub fn pipe() -> Pipe {
-    unsafe {
-        let mut fds = Pipe {input: 0,
-                            out: 0};
-        assert_eq!(libc::pipe(&mut fds.input), 0);
-        return Pipe {input: fds.input, out: fds.out};
+/// Creates a new low-level OS in-memory pipe.
+///
+/// This function can fail to succeed if there are no more resources available
+/// to allocate a pipe.
+///
+/// This function is also unsafe as there is no destructor associated with the
+/// `Pipe` structure will return. If it is not arranged for the returned file
+/// descriptors to be closed, the file descriptors will leak. For safe handling
+/// of this scenario, use `std::io::PipeStream` instead.
+pub unsafe fn pipe() -> IoResult<Pipe> {
+    return _pipe();
+
+    #[cfg(unix)]
+    unsafe fn _pipe() -> IoResult<Pipe> {
+        let mut fds = [0, ..2];
+        match libc::pipe(fds.as_mut_ptr()) {
+            0 => Ok(Pipe { reader: fds[0], writer: fds[1] }),
+            _ => Err(IoError::last_error()),
+        }
     }
-}
 
-/// Creates a new low-level OS in-memory pipe represented as a Pipe struct.
-#[cfg(windows)]
-pub fn pipe() -> Pipe {
-    unsafe {
+    #[cfg(windows)]
+    unsafe fn _pipe() -> IoResult<Pipe> {
         // Windows pipes work subtly differently than unix pipes, and their
         // inheritance has to be handled in a different way that I do not
         // fully understand. Here we explicitly make the pipe non-inheritable,
         // which means to pass it to a subprocess they need to be duplicated
         // first, as in std::run.
-        let mut fds = Pipe {input: 0,
-                    out: 0};
-        let res = libc::pipe(&mut fds.input, 1024 as ::libc::c_uint,
-                             (libc::O_BINARY | libc::O_NOINHERIT) as c_int);
-        assert_eq!(res, 0);
-        assert!((fds.input != -1 && fds.input != 0 ));
-        assert!((fds.out != -1 && fds.input != 0));
-        return Pipe {input: fds.input, out: fds.out};
+        let mut fds = [0, ..2];
+        match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint,
+                         (libc::O_BINARY | libc::O_NOINHERIT) as c_int) {
+            0 => {
+                assert!(fds[0] != -1 && fds[0] != 0);
+                assert!(fds[1] != -1 && fds[1] != 0);
+                Ok(Pipe { reader: fds[0], writer: fds[1] })
+            }
+            _ => Err(IoError::last_error()),
+        }
     }
 }
 
index 8e2cfa30066495e00b7241d4022bb0b4ae7da155..8c68ef173a5e76fc216b02bc88b043ce798d6370 100644 (file)
 use std::io::Command;
 
 fn test() {
-    let os::Pipe { input, out } = os::pipe();
-    let input = PipeStream::open(input);
-    let mut out = PipeStream::open(out);
-    drop(input);
+    let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
+    let reader = PipeStream::open(reader);
+    let mut writer = PipeStream::open(writer);
+    drop(reader);
 
-    let _ = out.write([1]);
+    let _ = writer.write([1]);
 }
 
 fn main() {