]> git.lizzy.rs Git - rust.git/commitdiff
io: Implement process wait timeouts
authorAlex Crichton <alex@alexcrichton.com>
Mon, 5 May 2014 23:58:42 +0000 (16:58 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Wed, 14 May 2014 00:27:42 +0000 (17:27 -0700)
This implements set_timeout() for std::io::Process which will affect wait()
operations on the process. This follows the same pattern as the rest of the
timeouts emerging in std::io::net.

The implementation was super easy for everything except libnative on unix
(backwards from usual!), which required a good bit of signal handling. There's a
doc comment explaining the strategy in libnative. Internally, this also required
refactoring the "helper thread" implementation used by libnative to allow for an
extra helper thread (not just the timer).

This is a breaking change in terms of the io::Process API. It is now possible
for wait() to fail, and subsequently wait_with_output(). These two functions now
return IoResult<T> due to the fact that they can time out.

Additionally, the wait_with_output() function has moved from taking `&mut self`
to taking `self`. If a timeout occurs while waiting with output, the semantics
are undesirable in almost all cases if attempting to re-wait on the process.
Equivalent functionality can still be achieved by dealing with the output
handles manually.

[breaking-change]

cc #13523

23 files changed:
src/compiletest/procsrv.rs
src/compiletest/runtest.rs
src/libcore/str.rs
src/liblibc/lib.rs
src/libnative/io/c_unix.rs
src/libnative/io/helper_thread.rs [new file with mode: 0644]
src/libnative/io/mod.rs
src/libnative/io/process.rs
src/libnative/io/timer_helper.rs [deleted file]
src/libnative/io/timer_unix.rs
src/libnative/io/timer_win32.rs
src/libnative/lib.rs
src/librustc/back/archive.rs
src/librustuv/process.rs
src/libstd/io/process.rs
src/libstd/rt/rtio.rs
src/test/run-pass/backtrace.rs
src/test/run-pass/core-run-destroy.rs
src/test/run-pass/issue-13304.rs
src/test/run-pass/logging-separate-lines.rs
src/test/run-pass/process-detach.rs
src/test/run-pass/process-spawn-with-unicode-params.rs
src/test/run-pass/sigpipe-should-be-ignored.rs

index 7d3aa33aae8533e3f1e8f011a85569bbb63b2430..d3642a939db049a30c9c42a9ea561fe30b33a79c 100644 (file)
@@ -68,7 +68,7 @@ pub fn run(lib_path: &str,
            input: Option<~str>) -> Option<Result> {
 
     let env = env.clone().append(target_env(lib_path, prog).as_slice());
-    let mut opt_process = Process::configure(ProcessConfig {
+    let opt_process = Process::configure(ProcessConfig {
         program: prog,
         args: args,
         env: Some(env.as_slice()),
@@ -76,11 +76,12 @@ pub fn run(lib_path: &str,
     });
 
     match opt_process {
-        Ok(ref mut process) => {
+        Ok(mut process) => {
             for input in input.iter() {
                 process.stdin.get_mut_ref().write(input.as_bytes()).unwrap();
             }
-            let ProcessOutput { status, output, error } = process.wait_with_output();
+            let ProcessOutput { status, output, error } =
+                process.wait_with_output().unwrap();
 
             Some(Result {
                 status: status,
index 29f7a771e86fb84246c39fca78fac7b5a4d03398..d18f1370a9f4769f1e8026d8a42bb29cc4b26ebb 100644 (file)
@@ -502,7 +502,7 @@ fn run_lldb(config: &Config, test_executable: &Path, debugger_script: &Path) ->
         let args = &[lldb_batchmode_script, test_executable_str, debugger_script_str];
         let env = &[("PYTHONPATH".to_owned(), config.lldb_python_dir.clone().unwrap())];
 
-        let mut opt_process = Process::configure(ProcessConfig {
+        let opt_process = Process::configure(ProcessConfig {
             program: "python",
             args: args,
             env: Some(env),
@@ -510,8 +510,9 @@ fn run_lldb(config: &Config, test_executable: &Path, debugger_script: &Path) ->
         });
 
         let (status, out, err) = match opt_process {
-            Ok(ref mut process) => {
-                let ProcessOutput { status, output, error } = process.wait_with_output();
+            Ok(process) => {
+                let ProcessOutput { status, output, error } =
+                    process.wait_with_output().unwrap();
 
                 (status,
                  str::from_utf8(output.as_slice()).unwrap().to_owned(),
index e677c4880b58db0d5d12226ab6368845341f12e0..1481759297868a8be3095241f6c34783f75277ae 100644 (file)
@@ -1725,6 +1725,7 @@ fn trim_right_chars<C: CharEq>(&self, mut to_trim: C) -> &'a str {
     #[inline]
     fn is_char_boundary(&self, index: uint) -> bool {
         if index == self.len() { return true; }
+        if index > self.len() { return false; }
         let b = self[index];
         return b < 128u8 || b >= 192u8;
     }
index a35ebb06437d92533d6d54815f9c3ba5fe4523b9..1edd99c1d7dcc98f4fc1b0934bd35b177368ae4c 100644 (file)
 #[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
 #[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
 #[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
-#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN, WNOHANG};
+#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN};
 #[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
 
 #[cfg(unix)] pub use types::os::common::posix01::{pthread_t, timespec, timezone};
@@ -2473,8 +2473,6 @@ pub mod posix01 {
 
             pub static CLOCK_REALTIME: c_int = 0;
             pub static CLOCK_MONOTONIC: c_int = 1;
-
-            pub static WNOHANG: c_int = 1;
         }
         pub mod posix08 {
         }
@@ -2924,8 +2922,6 @@ pub mod posix01 {
 
             pub static CLOCK_REALTIME: c_int = 0;
             pub static CLOCK_MONOTONIC: c_int = 4;
-
-            pub static WNOHANG: c_int = 1;
         }
         pub mod posix08 {
         }
@@ -3313,8 +3309,6 @@ pub mod posix01 {
             pub static PTHREAD_CREATE_JOINABLE: c_int = 1;
             pub static PTHREAD_CREATE_DETACHED: c_int = 2;
             pub static PTHREAD_STACK_MIN: size_t = 8192;
-
-            pub static WNOHANG: c_int = 1;
         }
         pub mod posix08 {
         }
@@ -3980,16 +3974,6 @@ pub fn signal(signum: c_int,
             }
         }
 
-        pub mod wait {
-            use types::os::arch::c95::{c_int};
-            use types::os::arch::posix88::{pid_t};
-
-            extern {
-                pub fn waitpid(pid: pid_t, status: *mut c_int, options: c_int)
-                               -> pid_t;
-            }
-        }
-
         pub mod glob {
             use types::os::arch::c95::{c_char, c_int};
             use types::os::common::posix01::{glob_t};
index abb22476e5240f4dd0da44aa2e5664fb1acf3b63..767090a10cda222e4584f9880f9973e29dddc8fc 100644 (file)
 
 //! C definitions used by libnative that don't belong in liblibc
 
+#![allow(dead_code)]
+
 pub use self::select::fd_set;
+pub use self::signal::{sigaction, siginfo, sigset_t};
+pub use self::signal::{SA_ONSTACK, SA_RESTART, SA_RESETHAND, SA_NOCLDSTOP};
+pub use self::signal::{SA_NODEFER, SA_NOCLDWAIT, SA_SIGINFO, SIGCHLD};
 
 use libc;
 
@@ -34,6 +39,8 @@
 #[cfg(target_os = "android")]
 pub static MSG_DONTWAIT: libc::c_int = 0x40;
 
+pub static WNOHANG: libc::c_int = 1;
+
 extern {
     pub fn gettimeofday(timeval: *mut libc::timeval,
                         tzp: *libc::c_void) -> libc::c_int;
@@ -49,6 +56,17 @@ pub fn getsockopt(sockfd: libc::c_int,
                       optlen: *mut libc::socklen_t) -> libc::c_int;
     pub fn ioctl(fd: libc::c_int, req: libc::c_ulong, ...) -> libc::c_int;
 
+
+    pub fn waitpid(pid: libc::pid_t, status: *mut libc::c_int,
+                   options: libc::c_int) -> libc::pid_t;
+
+    pub fn sigaction(signum: libc::c_int,
+                     act: *sigaction,
+                     oldact: *mut sigaction) -> libc::c_int;
+
+    pub fn sigaddset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int;
+    pub fn sigdelset(set: *mut sigset_t, signum: libc::c_int) -> libc::c_int;
+    pub fn sigemptyset(set: *mut sigset_t) -> libc::c_int;
 }
 
 #[cfg(target_os = "macos")]
@@ -81,3 +99,94 @@ pub fn fd_set(set: &mut fd_set, fd: i32) {
         set.fds_bits[fd / uint::BITS] |= 1 << (fd % uint::BITS);
     }
 }
+
+#[cfg(target_os = "linux")]
+#[cfg(target_os = "android")]
+mod signal {
+    use libc;
+
+    pub static SA_NOCLDSTOP: libc::c_ulong = 0x00000001;
+    pub static SA_NOCLDWAIT: libc::c_ulong = 0x00000002;
+    pub static SA_NODEFER: libc::c_ulong = 0x40000000;
+    pub static SA_ONSTACK: libc::c_ulong = 0x08000000;
+    pub static SA_RESETHAND: libc::c_ulong = 0x80000000;
+    pub static SA_RESTART: libc::c_ulong = 0x10000000;
+    pub static SA_SIGINFO: libc::c_ulong = 0x00000004;
+    pub static SIGCHLD: libc::c_int = 17;
+
+    // This definition is not as accurate as it could be, {pid, uid, status} is
+    // actually a giant union. Currently we're only interested in these fields,
+    // however.
+    pub struct siginfo {
+        si_signo: libc::c_int,
+        si_errno: libc::c_int,
+        si_code: libc::c_int,
+        pub pid: libc::pid_t,
+        pub uid: libc::uid_t,
+        pub status: libc::c_int,
+    }
+
+    pub struct sigaction {
+        pub sa_handler: extern fn(libc::c_int),
+        pub sa_mask: sigset_t,
+        pub sa_flags: libc::c_ulong,
+        sa_restorer: *mut libc::c_void,
+    }
+
+    #[cfg(target_word_size = "32")]
+    pub struct sigset_t {
+        __val: [libc::c_ulong, ..32],
+    }
+    #[cfg(target_word_size = "64")]
+    pub struct sigset_t {
+        __val: [libc::c_ulong, ..16],
+    }
+}
+
+#[cfg(target_os = "macos")]
+#[cfg(target_os = "freebsd")]
+mod signal {
+    use libc;
+
+    pub static SA_ONSTACK: libc::c_int = 0x0001;
+    pub static SA_RESTART: libc::c_int = 0x0002;
+    pub static SA_RESETHAND: libc::c_int = 0x0004;
+    pub static SA_NOCLDSTOP: libc::c_int = 0x0008;
+    pub static SA_NODEFER: libc::c_int = 0x0010;
+    pub static SA_NOCLDWAIT: libc::c_int = 0x0020;
+    pub static SA_SIGINFO: libc::c_int = 0x0040;
+    pub static SIGCHLD: libc::c_int = 20;
+
+    #[cfg(target_os = "macos")]
+    pub type sigset_t = u32;
+    #[cfg(target_os = "freebsd")]
+    pub struct sigset_t {
+        bits: [u32, ..4],
+    }
+
+    // This structure has more fields, but we're not all that interested in
+    // them.
+    pub struct siginfo {
+        pub si_signo: libc::c_int,
+        pub si_errno: libc::c_int,
+        pub si_code: libc::c_int,
+        pub pid: libc::pid_t,
+        pub uid: libc::uid_t,
+        pub status: libc::c_int,
+    }
+
+    #[cfg(target_os = "macos")]
+    pub struct sigaction {
+        pub sa_handler: extern fn(libc::c_int),
+        sa_tramp: *mut libc::c_void,
+        pub sa_mask: sigset_t,
+        pub sa_flags: libc::c_int,
+    }
+
+    #[cfg(target_os = "freebsd")]
+    pub struct sigaction {
+        pub sa_handler: extern fn(libc::c_int),
+        pub sa_flags: libc::c_int,
+        pub sa_mask: sigset_t,
+    }
+}
diff --git a/src/libnative/io/helper_thread.rs b/src/libnative/io/helper_thread.rs
new file mode 100644 (file)
index 0000000..2260d74
--- /dev/null
@@ -0,0 +1,205 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Implementation of the helper thread for the timer module
+//!
+//! This module contains the management necessary for the timer worker thread.
+//! This thread is responsible for performing the send()s on channels for timers
+//! that are using channels instead of a blocking call.
+//!
+//! The timer thread is lazily initialized, and it's shut down via the
+//! `shutdown` function provided. It must be maintained as an invariant that
+//! `shutdown` is only called when the entire program is finished. No new timers
+//! can be created in the future and there must be no active timers at that
+//! time.
+
+#![macro_escape]
+
+use std::mem;
+use std::rt::bookkeeping;
+use std::rt;
+use std::ty::Unsafe;
+use std::unstable::mutex::StaticNativeMutex;
+
+use task;
+
+/// A structure for management of a helper thread.
+///
+/// This is generally a static structure which tracks the lifetime of a helper
+/// thread.
+///
+/// The fields of this helper are all public, but they should not be used, this
+/// is for static initialization.
+pub struct Helper<M> {
+    /// Internal lock which protects the remaining fields
+    pub lock: StaticNativeMutex,
+
+    // You'll notice that the remaining fields are Unsafe<T>, and this is
+    // because all helper thread operations are done through &self, but we need
+    // these to be mutable (once `lock` is held).
+
+    /// Lazily allocated channel to send messages to the helper thread.
+    pub chan: Unsafe<*mut Sender<M>>,
+
+    /// OS handle used to wake up a blocked helper thread
+    pub signal: Unsafe<uint>,
+
+    /// Flag if this helper thread has booted and been initialized yet.
+    pub initialized: Unsafe<bool>,
+}
+
+macro_rules! helper_init( (static mut $name:ident: Helper<$m:ty>) => (
+    static mut $name: Helper<$m> = Helper {
+        lock: ::std::unstable::mutex::NATIVE_MUTEX_INIT,
+        chan: ::std::ty::Unsafe {
+            value: 0 as *mut Sender<$m>,
+            marker1: ::std::kinds::marker::InvariantType,
+        },
+        signal: ::std::ty::Unsafe {
+            value: 0,
+            marker1: ::std::kinds::marker::InvariantType,
+        },
+        initialized: ::std::ty::Unsafe {
+            value: false,
+            marker1: ::std::kinds::marker::InvariantType,
+        },
+    };
+) )
+
+impl<M: Send> Helper<M> {
+    /// Lazily boots a helper thread, becoming a no-op if the helper has already
+    /// been spawned.
+    ///
+    /// This function will check to see if the thread has been initialized, and
+    /// if it has it returns quickly. If initialization has not happened yet,
+    /// the closure `f` will be run (inside of the initialization lock) and
+    /// passed to the helper thread in a separate task.
+    ///
+    /// This function is safe to be called many times.
+    pub fn boot<T: Send>(&'static self,
+                         f: || -> T,
+                         helper: fn(imp::signal, Receiver<M>, T)) {
+        unsafe {
+            let _guard = self.lock.lock();
+            if !*self.initialized.get() {
+                let (tx, rx) = channel();
+                *self.chan.get() = mem::transmute(box tx);
+                let (receive, send) = imp::new();
+                *self.signal.get() = send as uint;
+
+                let t = f();
+                task::spawn(proc() {
+                    bookkeeping::decrement();
+                    helper(receive, rx, t);
+                    self.lock.lock().signal()
+                });
+
+                rt::at_exit(proc() { self.shutdown() });
+                *self.initialized.get() = true;
+            }
+        }
+    }
+
+    /// Sends a message to a spawned worker thread.
+    ///
+    /// This is only valid if the worker thread has previously booted
+    pub fn send(&'static self, msg: M) {
+        unsafe {
+            let _guard = self.lock.lock();
+
+            // Must send and *then* signal to ensure that the child receives the
+            // message. Otherwise it could wake up and go to sleep before we
+            // send the message.
+            assert!(!self.chan.get().is_null());
+            (**self.chan.get()).send(msg);
+            imp::signal(*self.signal.get() as imp::signal);
+        }
+    }
+
+    fn shutdown(&'static self) {
+        unsafe {
+            // Shut down, but make sure this is done inside our lock to ensure
+            // that we'll always receive the exit signal when the thread
+            // returns.
+            let guard = self.lock.lock();
+
+            // Close the channel by destroying it
+            let chan: Box<Sender<M>> = mem::transmute(*self.chan.get());
+            *self.chan.get() = 0 as *mut Sender<M>;
+            drop(chan);
+            imp::signal(*self.signal.get() as imp::signal);
+
+            // Wait for the child to exit
+            guard.wait();
+            drop(guard);
+
+            // Clean up after ourselves
+            self.lock.destroy();
+            imp::close(*self.signal.get() as imp::signal);
+            *self.signal.get() = 0;
+        }
+    }
+}
+
+#[cfg(unix)]
+mod imp {
+    use libc;
+    use std::os;
+
+    use io::file::FileDesc;
+
+    pub type signal = libc::c_int;
+
+    pub fn new() -> (signal, signal) {
+        let pipe = os::pipe();
+        (pipe.input, pipe.out)
+    }
+
+    pub fn signal(fd: libc::c_int) {
+        FileDesc::new(fd, false).inner_write([0]).unwrap();
+    }
+
+    pub fn close(fd: libc::c_int) {
+        let _fd = FileDesc::new(fd, true);
+    }
+}
+
+#[cfg(windows)]
+mod imp {
+    use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
+    use std::ptr;
+    use libc;
+
+    pub type signal = HANDLE;
+
+    pub fn new() -> (HANDLE, HANDLE) {
+        unsafe {
+            let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
+                                      ptr::null());
+            (handle, handle)
+        }
+    }
+
+    pub fn signal(handle: HANDLE) {
+        assert!(unsafe { SetEvent(handle) != 0 });
+    }
+
+    pub fn close(handle: HANDLE) {
+        assert!(unsafe { CloseHandle(handle) != 0 });
+    }
+
+    extern "system" {
+        fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
+                        bManualReset: BOOL,
+                        bInitialState: BOOL,
+                        lpName: LPCSTR) -> HANDLE;
+        fn SetEvent(hEvent: HANDLE) -> BOOL;
+    }
+}
index f2c2c66e1425fe521be0ff897e2a17c10b98b567..a9aca656319efa7a4263131246c4f3412023c0ad 100644 (file)
@@ -40,6 +40,8 @@
 pub use self::file::FileDesc;
 pub use self::process::Process;
 
+mod helper_thread;
+
 // Native I/O implementations
 pub mod addrinfo;
 pub mod net;
@@ -75,8 +77,6 @@
 #[cfg(unix)]    #[path = "c_unix.rs"]  mod c;
 #[cfg(windows)] #[path = "c_win32.rs"] mod c;
 
-mod timer_helper;
-
 pub type IoResult<T> = Result<T, IoError>;
 
 fn unimpl() -> IoError {
index 14ea1f12a5ca012a085ae53e97a4e12766bd5d3f..799db64e6889775878386af1bf26ceb6b5913662 100644 (file)
@@ -8,20 +8,27 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::io;
 use libc::{pid_t, c_void, c_int};
 use libc;
+use std::io;
+use std::mem;
 use std::os;
 use std::ptr;
 use std::rt::rtio;
 use p = std::io::process;
 
+
 use super::IoResult;
 use super::file;
+use super::util;
 
-#[cfg(windows)] use std::mem;
 #[cfg(windows)] use std::strbuf::StrBuf;
-#[cfg(not(windows))] use super::retry;
+#[cfg(unix)] use super::c;
+#[cfg(unix)] use super::retry;
+#[cfg(unix)] use io::helper_thread::Helper;
+
+#[cfg(unix)]
+helper_init!(static mut HELPER: Helper<Req>)
 
 /**
  * A value representing a child process.
@@ -44,6 +51,14 @@ pub struct Process {
 
     /// Manually delivered signal
     exit_signal: Option<int>,
+
+    /// Deadline after which wait() will return
+    deadline: u64,
+}
+
+#[cfg(unix)]
+enum Req {
+    NewChild(libc::pid_t, Sender<p::ProcessExit>, u64),
 }
 
 impl Process {
@@ -116,6 +131,7 @@ fn get_io(io: p::StdioContainer, ret: &mut Vec<Option<file::FileDesc>>)
                         handle: res.handle,
                         exit_code: None,
                         exit_signal: None,
+                        deadline: 0,
                     },
                     ret_io))
             }
@@ -131,11 +147,15 @@ pub fn kill(pid: libc::pid_t, signum: int) -> IoResult<()> {
 impl rtio::RtioProcess for Process {
     fn id(&self) -> pid_t { self.pid }
 
-    fn wait(&mut self) -> p::ProcessExit {
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
+    }
+
+    fn wait(&mut self) -> IoResult<p::ProcessExit> {
         match self.exit_code {
-            Some(code) => code,
+            Some(code) => Ok(code),
             None => {
-                let code = waitpid(self.pid);
+                let code = try!(waitpid(self.pid, self.deadline));
                 // On windows, waitpid will never return a signal. If a signal
                 // was successfully delivered to the process, however, we can
                 // consider it as having died via a signal.
@@ -145,7 +165,7 @@ fn wait(&mut self) -> p::ProcessExit {
                     Some(..) => code,
                 };
                 self.exit_code = Some(code);
-                code
+                Ok(code)
             }
         }
     }
@@ -762,61 +782,301 @@ pub fn WTERMSIG(status: i32) -> i32 { status & 0o177 }
  * operate on a none-existent process or, even worse, on a newer process
  * with the same id.
  */
-fn waitpid(pid: pid_t) -> p::ProcessExit {
-    return waitpid_os(pid);
-
-    #[cfg(windows)]
-    fn waitpid_os(pid: pid_t) -> p::ProcessExit {
-        use libc::types::os::arch::extra::DWORD;
-        use libc::consts::os::extra::{
-            SYNCHRONIZE,
-            PROCESS_QUERY_INFORMATION,
-            FALSE,
-            STILL_ACTIVE,
-            INFINITE,
-            WAIT_FAILED
-        };
-        use libc::funcs::extra::kernel32::{
-            OpenProcess,
-            GetExitCodeProcess,
-            CloseHandle,
-            WaitForSingleObject
-        };
+#[cfg(windows)]
+fn waitpid(pid: pid_t, deadline: u64) -> IoResult<p::ProcessExit> {
+    use libc::types::os::arch::extra::DWORD;
+    use libc::consts::os::extra::{
+        SYNCHRONIZE,
+        PROCESS_QUERY_INFORMATION,
+        FALSE,
+        STILL_ACTIVE,
+        INFINITE,
+        WAIT_TIMEOUT,
+        WAIT_OBJECT_0,
+    };
+    use libc::funcs::extra::kernel32::{
+        OpenProcess,
+        GetExitCodeProcess,
+        CloseHandle,
+        WaitForSingleObject,
+    };
 
-        unsafe {
+    unsafe {
+        let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION,
+                                  FALSE,
+                                  pid as DWORD);
+        if process.is_null() {
+            return Err(super::last_error())
+        }
 
-            let process = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION,
-                                      FALSE,
-                                      pid as DWORD);
-            if process.is_null() {
-                fail!("failure in OpenProcess: {}", os::last_os_error());
+        loop {
+            let mut status = 0;
+            if GetExitCodeProcess(process, &mut status) == FALSE {
+                let err = Err(super::last_error());
+                assert!(CloseHandle(process) != 0);
+                return err;
             }
-
-            loop {
-                let mut status = 0;
-                if GetExitCodeProcess(process, &mut status) == FALSE {
-                    assert!(CloseHandle(process) != 0);
-                    fail!("failure in GetExitCodeProcess: {}", os::last_os_error());
-                }
-                if status != STILL_ACTIVE {
+            if status != STILL_ACTIVE {
+                assert!(CloseHandle(process) != 0);
+                return Ok(p::ExitStatus(status as int));
+            }
+            let interval = if deadline == 0 {
+                INFINITE
+            } else {
+                let now = ::io::timer::now();
+                if deadline < now {0} else {(deadline - now) as u32}
+            };
+            match WaitForSingleObject(process, interval) {
+                WAIT_OBJECT_0 => {}
+                WAIT_TIMEOUT => {
                     assert!(CloseHandle(process) != 0);
-                    return p::ExitStatus(status as int);
+                    return Err(util::timeout("process wait timed out"))
                 }
-                if WaitForSingleObject(process, INFINITE) == WAIT_FAILED {
+                _ => {
+                    let err = Err(super::last_error());
                     assert!(CloseHandle(process) != 0);
-                    fail!("failure in WaitForSingleObject: {}", os::last_os_error());
+                    return err
                 }
             }
         }
     }
+}
 
-    #[cfg(unix)]
-    fn waitpid_os(pid: pid_t) -> p::ProcessExit {
-        use libc::funcs::posix01::wait;
-        let mut status = 0 as c_int;
-        match retry(|| unsafe { wait::waitpid(pid, &mut status, 0) }) {
+#[cfg(unix)]
+fn waitpid(pid: pid_t, deadline: u64) -> IoResult<p::ProcessExit> {
+    use std::cmp;
+    use std::comm;
+
+    static mut WRITE_FD: libc::c_int = 0;
+
+    let mut status = 0 as c_int;
+    if deadline == 0 {
+        return match retry(|| unsafe { c::waitpid(pid, &mut status, 0) }) {
             -1 => fail!("unknown waitpid error: {}", super::last_error()),
-            _ => translate_status(status),
+            _ => Ok(translate_status(status)),
+        }
+    }
+
+    // On unix, wait() and its friends have no timeout parameters, so there is
+    // no way to time out a thread in wait(). From some googling and some
+    // thinking, it appears that there are a few ways to handle timeouts in
+    // wait(), but the only real reasonable one for a multi-threaded program is
+    // to listen for SIGCHLD.
+    //
+    // With this in mind, the waiting mechanism with a timeout barely uses
+    // waitpid() at all. There are a few times that waitpid() is invoked with
+    // WNOHANG, but otherwise all the necessary blocking is done by waiting for
+    // a SIGCHLD to arrive (and that blocking has a timeout). Note, however,
+    // that waitpid() is still used to actually reap the child.
+    //
+    // Signal handling is super tricky in general, and this is no exception. Due
+    // to the async nature of SIGCHLD, we use the self-pipe trick to transmit
+    // data out of the signal handler to the rest of the application. The first
+    // idea would be to have each thread waiting with a timeout to read this
+    // output file descriptor, but a write() is akin to a signal(), not a
+    // broadcast(), so it would only wake up one thread, and possibly the wrong
+    // thread. Hence a helper thread is used.
+    //
+    // The helper thread here is responsible for farming requests for a
+    // waitpid() with a timeout, and then processing all of the wait requests.
+    // By guaranteeing that only this helper thread is reading half of the
+    // self-pipe, we're sure that we'll never lose a SIGCHLD. This helper thread
+    // is also responsible for select() to wait for incoming messages or
+    // incoming SIGCHLD messages, along with passing an appropriate timeout to
+    // select() to wake things up as necessary.
+    //
+    // The ordering of the following statements is also very purposeful. First,
+    // we must be guaranteed that the helper thread is booted and available to
+    // receive SIGCHLD signals, and then we must also ensure that we do a
+    // nonblocking waitpid() at least once before we go ask the sigchld helper.
+    // This prevents the race where the child exits, we boot the helper, and
+    // then we ask for the child's exit status (never seeing a sigchld).
+    //
+    // The actual communication between the helper thread and this thread is
+    // quite simple, just a channel moving data around.
+
+    unsafe { HELPER.boot(register_sigchld, waitpid_helper) }
+
+    match waitpid_nowait(pid) {
+        Some(ret) => return Ok(ret),
+        None => {}
+    }
+
+    let (tx, rx) = channel();
+    unsafe { HELPER.send(NewChild(pid, tx, deadline)); }
+    return match rx.recv_opt() {
+        Ok(e) => Ok(e),
+        Err(()) => Err(util::timeout("wait timed out")),
+    };
+
+    // Register a new SIGCHLD handler, returning the reading half of the
+    // self-pipe plus the old handler registered (return value of sigaction).
+    fn register_sigchld() -> (libc::c_int, c::sigaction) {
+        unsafe {
+            let mut old: c::sigaction = mem::init();
+            let mut new: c::sigaction = mem::init();
+            new.sa_handler = sigchld_handler;
+            new.sa_flags = c::SA_NOCLDSTOP;
+            assert_eq!(c::sigaction(c::SIGCHLD, &new, &mut old), 0);
+
+            let mut pipes = [0, ..2];
+            assert_eq!(libc::pipe(pipes.as_mut_ptr()), 0);
+            util::set_nonblocking(pipes[0], true).unwrap();
+            util::set_nonblocking(pipes[1], true).unwrap();
+            WRITE_FD = pipes[1];
+            (pipes[0], old)
+        }
+    }
+
+    // Helper thread for processing SIGCHLD messages
+    fn waitpid_helper(input: libc::c_int,
+                      messages: Receiver<Req>,
+                      (read_fd, old): (libc::c_int, c::sigaction)) {
+        util::set_nonblocking(input, true).unwrap();
+        let mut set: c::fd_set = unsafe { mem::init() };
+        let mut tv: libc::timeval;
+        let mut active = Vec::<(libc::pid_t, Sender<p::ProcessExit>, u64)>::new();
+        let max = cmp::max(input, read_fd) + 1;
+
+        'outer: loop {
+            // Figure out the timeout of our syscall-to-happen. If we're waiting
+            // for some processes, then they'll have a timeout, otherwise we
+            // wait indefinitely for a message to arrive.
+            //
+            // FIXME: sure would be nice to not have to scan the entire array
+            let min = active.iter().map(|a| *a.ref2()).enumerate().min_by(|p| {
+                p.val1()
+            });
+            let (p, idx) = match min {
+                Some((idx, deadline)) => {
+                    let now = ::io::timer::now();
+                    let ms = if now < deadline {deadline - now} else {0};
+                    tv = util::ms_to_timeval(ms);
+                    (&tv as *_, idx)
+                }
+                None => (ptr::null(), -1),
+            };
+
+            // Wait for something to happen
+            c::fd_set(&mut set, input);
+            c::fd_set(&mut set, read_fd);
+            match unsafe { c::select(max, &set, ptr::null(), ptr::null(), p) } {
+                // interrupted, retry
+                -1 if os::errno() == libc::EINTR as int => continue,
+
+                // We read something, break out and process
+                1 | 2 => {}
+
+                // Timeout, the pending request is removed
+                0 => {
+                    drop(active.remove(idx));
+                    continue
+                }
+
+                n => fail!("error in select {} ({})", os::errno(), n),
+            }
+
+            // Process any pending messages
+            if drain(input) {
+                loop {
+                    match messages.try_recv() {
+                        Ok(NewChild(pid, tx, deadline)) => {
+                            active.push((pid, tx, deadline));
+                        }
+                        Err(comm::Disconnected) => {
+                            assert!(active.len() == 0);
+                            break 'outer;
+                        }
+                        Err(comm::Empty) => break,
+                    }
+                }
+            }
+
+            // If a child exited (somehow received SIGCHLD), then poll all
+            // children to see if any of them exited.
+            //
+            // We also attempt to be responsible netizens when dealing with
+            // SIGCHLD by invoking any previous SIGCHLD handler instead of just
+            // ignoring any previous SIGCHLD handler. Note that we don't provide
+            // a 1:1 mapping of our handler invocations to the previous handler
+            // invocations because we drain the `read_fd` entirely. This is
+            // probably OK because the kernel is already allowed to coalesce
+            // simultaneous signals, we're just doing some extra coalescing.
+            //
+            // Another point of note is that this likely runs the signal handler
+            // on a different thread than the one that received the signal. I
+            // *think* this is ok at this time.
+            //
+            // The main reason for doing this is to allow stdtest to run native
+            // tests as well. Both libgreen and libnative are running around
+            // with process timeouts, but libgreen should get there first
+            // (currently libuv doesn't handle old signal handlers).
+            if drain(read_fd) {
+                let i: uint = unsafe { mem::transmute(old.sa_handler) };
+                if i != 0 {
+                    assert!(old.sa_flags & c::SA_SIGINFO == 0);
+                    (old.sa_handler)(c::SIGCHLD);
+                }
+
+                // FIXME: sure would be nice to not have to scan the entire
+                //        array...
+                active.retain(|&(pid, ref tx, _)| {
+                    match waitpid_nowait(pid) {
+                        Some(msg) => { tx.send(msg); false }
+                        None => true,
+                    }
+                });
+            }
+        }
+
+        // Once this helper thread is done, we re-register the old sigchld
+        // handler and close our intermediate file descriptors.
+        unsafe {
+            assert_eq!(c::sigaction(c::SIGCHLD, &old, ptr::mut_null()), 0);
+            let _ = libc::close(read_fd);
+            let _ = libc::close(WRITE_FD);
+            WRITE_FD = -1;
+        }
+    }
+
+    // Drain all pending data from the file descriptor, returning if any data
+    // could be drained. This requires that the file descriptor is in
+    // nonblocking mode.
+    fn drain(fd: libc::c_int) -> bool {
+        let mut ret = false;
+        loop {
+            let mut buf = [0u8, ..1];
+            match unsafe {
+                libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void,
+                           buf.len() as libc::size_t)
+            } {
+                n if n > 0 => { ret = true; }
+                0 => return true,
+                -1 if util::wouldblock() => return ret,
+                n => fail!("bad read {} ({})", os::last_os_error(), n),
+            }
+        }
+    }
+
+    // Signal handler for SIGCHLD signals, must be async-signal-safe!
+    //
+    // This function will write to the writing half of the "self pipe" to wake
+    // up the helper thread if it's waiting. Note that this write must be
+    // nonblocking because if it blocks and the reader is the thread we
+    // interrupted, then we'll deadlock.
+    //
+    // When writing, if the write returns EWOULDBLOCK then we choose to ignore
+    // it. At that point we're guaranteed that there's something in the pipe
+    // which will wake up the other end at some point, so we just allow this
+    // signal to be coalesced with the pending signals on the pipe.
+    extern fn sigchld_handler(_signum: libc::c_int) {
+        let mut msg = 1;
+        match unsafe {
+            libc::write(WRITE_FD, &mut msg as *mut _ as *libc::c_void, 1)
+        } {
+            1 => {}
+            -1 if util::wouldblock() => {} // see above comments
+            n => fail!("bad error on write fd: {} {}", n, os::errno()),
         }
     }
 }
@@ -830,10 +1090,9 @@ fn waitpid_os(_pid: pid_t) -> Option<p::ProcessExit> { None }
 
     #[cfg(unix)]
     fn waitpid_os(pid: pid_t) -> Option<p::ProcessExit> {
-        use libc::funcs::posix01::wait;
         let mut status = 0 as c_int;
         match retry(|| unsafe {
-            wait::waitpid(pid, &mut status, libc::WNOHANG)
+            c::waitpid(pid, &mut status, c::WNOHANG)
         }) {
             n if n == pid => Some(translate_status(status)),
             0 => None,
diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs
deleted file mode 100644 (file)
index 95b2620..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Implementation of the helper thread for the timer module
-//!
-//! This module contains the management necessary for the timer worker thread.
-//! This thread is responsible for performing the send()s on channels for timers
-//! that are using channels instead of a blocking call.
-//!
-//! The timer thread is lazily initialized, and it's shut down via the
-//! `shutdown` function provided. It must be maintained as an invariant that
-//! `shutdown` is only called when the entire program is finished. No new timers
-//! can be created in the future and there must be no active timers at that
-//! time.
-
-use std::mem;
-use std::rt::bookkeeping;
-use std::rt;
-use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
-
-use io::timer::{Req, Shutdown};
-use task;
-
-// You'll note that these variables are *not* protected by a lock. These
-// variables are initialized with a Once before any Timer is created and are
-// only torn down after everything else has exited. This means that these
-// variables are read-only during use (after initialization) and both of which
-// are safe to use concurrently.
-static mut HELPER_CHAN: *mut Sender<Req> = 0 as *mut Sender<Req>;
-static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
-
-static mut TIMER_HELPER_EXIT: StaticNativeMutex = NATIVE_MUTEX_INIT;
-
-pub fn boot(helper: fn(imp::signal, Receiver<Req>)) {
-    static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
-    static mut INITIALIZED: bool = false;
-
-    unsafe {
-        let mut _guard = LOCK.lock();
-        if !INITIALIZED {
-            let (tx, rx) = channel();
-            // promote this to a shared channel
-            drop(tx.clone());
-            HELPER_CHAN = mem::transmute(box tx);
-            let (receive, send) = imp::new();
-            HELPER_SIGNAL = send;
-
-            task::spawn(proc() {
-                bookkeeping::decrement();
-                helper(receive, rx);
-                TIMER_HELPER_EXIT.lock().signal()
-            });
-
-            rt::at_exit(proc() { shutdown() });
-            INITIALIZED = true;
-        }
-    }
-}
-
-pub fn send(req: Req) {
-    unsafe {
-        assert!(!HELPER_CHAN.is_null());
-        (*HELPER_CHAN).send(req);
-        imp::signal(HELPER_SIGNAL);
-    }
-}
-
-fn shutdown() {
-    // Request a shutdown, and then wait for the task to exit
-    unsafe {
-        let guard = TIMER_HELPER_EXIT.lock();
-        send(Shutdown);
-        guard.wait();
-        drop(guard);
-        TIMER_HELPER_EXIT.destroy();
-    }
-
-
-    // Clean up after ther helper thread
-    unsafe {
-        imp::close(HELPER_SIGNAL);
-        let _chan: Box<Sender<Req>> = mem::transmute(HELPER_CHAN);
-        HELPER_CHAN = 0 as *mut Sender<Req>;
-        HELPER_SIGNAL = 0 as imp::signal;
-    }
-}
-
-#[cfg(unix)]
-mod imp {
-    use libc;
-    use std::os;
-
-    use io::file::FileDesc;
-
-    pub type signal = libc::c_int;
-
-    pub fn new() -> (signal, signal) {
-        let pipe = os::pipe();
-        (pipe.input, pipe.out)
-    }
-
-    pub fn signal(fd: libc::c_int) {
-        FileDesc::new(fd, false).inner_write([0]).unwrap();
-    }
-
-    pub fn close(fd: libc::c_int) {
-        let _fd = FileDesc::new(fd, true);
-    }
-}
-
-#[cfg(windows)]
-mod imp {
-    use libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
-    use std::ptr;
-    use libc;
-
-    pub type signal = HANDLE;
-
-    pub fn new() -> (HANDLE, HANDLE) {
-        unsafe {
-            let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
-                                      ptr::null());
-            (handle, handle)
-        }
-    }
-
-    pub fn signal(handle: HANDLE) {
-        assert!(unsafe { SetEvent(handle) != 0 });
-    }
-
-    pub fn close(handle: HANDLE) {
-        assert!(unsafe { CloseHandle(handle) != 0 });
-    }
-
-    extern "system" {
-        fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
-                        bManualReset: BOOL,
-                        bInitialState: BOOL,
-                        lpName: LPCSTR) -> HANDLE;
-        fn SetEvent(hEvent: HANDLE) -> BOOL;
-    }
-}
index e008e6fb9e9057184f2f99d68e10731df97fd9aa..2c5b798482777657c11770ffb002ea608d2f6210 100644 (file)
 use std::ptr;
 use std::rt::rtio;
 use std::sync::atomics;
+use std::comm;
 
 use io::IoResult;
 use io::c;
 use io::file::FileDesc;
-use io::timer_helper;
+use io::helper_thread::Helper;
+
+helper_init!(static mut HELPER: Helper<Req>)
 
 pub struct Timer {
     id: uint,
@@ -79,9 +82,6 @@ pub enum Req {
     // Remove a timer based on its id and then send it back on the channel
     // provided
     RemoveTimer(uint, Sender<Box<Inner>>),
-
-    // Shut down the loop and then ACK this channel once it's shut down
-    Shutdown,
 }
 
 // returns the current time (in milliseconds)
@@ -93,7 +93,7 @@ pub fn now() -> u64 {
     }
 }
 
-fn helper(input: libc::c_int, messages: Receiver<Req>) {
+fn helper(input: libc::c_int, messages: Receiver<Req>, _: ()) {
     let mut set: c::fd_set = unsafe { mem::init() };
 
     let mut fd = FileDesc::new(input, true);
@@ -163,7 +163,7 @@ fn signal(active: &mut Vec<Box<Inner>>,
             1 => {
                 loop {
                     match messages.try_recv() {
-                        Ok(Shutdown) => {
+                        Err(comm::Disconnected) => {
                             assert!(active.len() == 0);
                             break 'outer;
                         }
@@ -202,7 +202,7 @@ fn signal(active: &mut Vec<Box<Inner>>,
 
 impl Timer {
     pub fn new() -> IoResult<Timer> {
-        timer_helper::boot(helper);
+        unsafe { HELPER.boot(|| {}, helper); }
 
         static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
         let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
@@ -235,7 +235,7 @@ fn inner(&mut self) -> Box<Inner> {
             Some(i) => i,
             None => {
                 let (tx, rx) = channel();
-                timer_helper::send(RemoveTimer(self.id, tx));
+                unsafe { HELPER.send(RemoveTimer(self.id, tx)); }
                 rx.recv()
             }
         }
@@ -261,7 +261,7 @@ fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
         inner.interval = msecs;
         inner.target = now + msecs;
 
-        timer_helper::send(NewTimer(inner));
+        unsafe { HELPER.send(NewTimer(inner)); }
         return rx;
     }
 
@@ -275,7 +275,7 @@ fn period(&mut self, msecs: u64) -> Receiver<()> {
         inner.interval = msecs;
         inner.target = now + msecs;
 
-        timer_helper::send(NewTimer(inner));
+        unsafe { HELPER.send(NewTimer(inner)); }
         return rx;
     }
 }
index 15e6e62421a5b1810dcfa072db729beb128f6a36..e7130de05c26d3ffd2027b6e8af5858ff21a4f14 100644 (file)
 use libc;
 use std::ptr;
 use std::rt::rtio;
+use std::comm;
 
-use io::timer_helper;
+use io::helper_thread::Helper;
 use io::IoResult;
 
+helper_init!(static mut HELPER: Helper<Req>)
+
 pub struct Timer {
     obj: libc::HANDLE,
     on_worker: bool,
@@ -35,10 +38,9 @@ pub struct Timer {
 pub enum Req {
     NewTimer(libc::HANDLE, Sender<()>, bool),
     RemoveTimer(libc::HANDLE, Sender<()>),
-    Shutdown,
 }
 
-fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
+fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) {
     let mut objs = vec![input];
     let mut chans = vec![];
 
@@ -67,12 +69,12 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
                             None => {}
                         }
                     }
-                    Ok(Shutdown) => {
+                    Err(comm::Disconnected) => {
                         assert_eq!(objs.len(), 1);
                         assert_eq!(chans.len(), 0);
                         break 'outer;
                     }
-                    _ => break
+                    Err(..) => break
                 }
             }
         } else {
@@ -102,7 +104,7 @@ pub fn now() -> u64 {
 
 impl Timer {
     pub fn new() -> IoResult<Timer> {
-        timer_helper::boot(helper);
+        unsafe { HELPER.boot(|| {}, helper) }
 
         let obj = unsafe {
             imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null())
@@ -124,7 +126,7 @@ fn remove(&mut self) {
         if !self.on_worker { return }
 
         let (tx, rx) = channel();
-        timer_helper::send(RemoveTimer(self.obj, tx));
+        unsafe { HELPER.send(RemoveTimer(self.obj, tx)) }
         rx.recv();
 
         self.on_worker = false;
@@ -157,7 +159,7 @@ fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
                                   ptr::mut_null(), 0)
         }, 1);
 
-        timer_helper::send(NewTimer(self.obj, tx, true));
+        unsafe { HELPER.send(NewTimer(self.obj, tx, true)) }
         self.on_worker = true;
         return rx;
     }
@@ -173,7 +175,7 @@ fn period(&mut self, msecs: u64) -> Receiver<()> {
                                   ptr::null(), ptr::mut_null(), 0)
         }, 1);
 
-        timer_helper::send(NewTimer(self.obj, tx, false));
+        unsafe { HELPER.send(NewTimer(self.obj, tx, false)) }
         self.on_worker = true;
 
         return rx;
index 05cf415ec78b0b9d5486244cda7ea2e96fab4386..8ba06133369245a13d950b838df1d71d150303b7 100644 (file)
@@ -55,6 +55,7 @@
 // NB this crate explicitly does *not* allow glob imports, please seriously
 //    consider whether they're needed before adding that feature here (the
 //    answer is that you don't need them)
+#![feature(macro_rules)]
 
 extern crate libc;
 
index 67140ac1920e3ae92a9368e45399351ac280c3f3..04ce4f88831f4d0cd823b8d1c924bf5f54a10c51 100644 (file)
@@ -54,8 +54,8 @@ fn run_ar(sess: &Session, args: &str, cwd: Option<&Path>,
         cwd: cwd.map(|a| &*a),
         .. ProcessConfig::new()
     }) {
-        Ok(mut prog) => {
-            let o = prog.wait_with_output();
+        Ok(prog) => {
+            let o = prog.wait_with_output().unwrap();
             if !o.status.success() {
                 sess.err(format!("{} {} failed with: {}", ar, args.connect(" "),
                                  o.status));
index d671e20868c5c39d64b80193fbad4f416c1f1d57..7afac6801519b652fe9453886c004164ced706f9 100644 (file)
@@ -19,7 +19,8 @@
 use homing::{HomingIO, HomeHandle};
 use pipe::PipeWatcher;
 use super::{UvHandle, UvError, uv_error_to_io_error,
-            wait_until_woken_after, wakeup};
+            wait_until_woken_after, wakeup, Loop};
+use timer::TimerWatcher;
 use uvio::UvIoFactory;
 use uvll;
 
@@ -32,6 +33,16 @@ pub struct Process {
 
     /// Collected from the exit_cb
     exit_status: Option<process::ProcessExit>,
+
+    /// Lazily initialized timeout timer
+    timer: Option<Box<TimerWatcher>>,
+    timeout_state: TimeoutState,
+}
+
+enum TimeoutState {
+    NoTimeout,
+    TimeoutPending,
+    TimeoutElapsed,
 }
 
 impl Process {
@@ -92,6 +103,8 @@ pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
                     home: io_loop.make_handle(),
                     to_wake: None,
                     exit_status: None,
+                    timer: None,
+                    timeout_state: NoTimeout,
                 };
                 match unsafe {
                     uvll::uv_spawn(io_loop.uv_loop(), handle, &options)
@@ -223,21 +236,71 @@ fn kill(&mut self, signal: int) -> Result<(), IoError> {
         }
     }
 
-    fn wait(&mut self) -> process::ProcessExit {
+    fn wait(&mut self) -> Result<process::ProcessExit, IoError> {
         // Make sure (on the home scheduler) that we have an exit status listed
         let _m = self.fire_homing_missile();
         match self.exit_status {
-            Some(..) => {}
-            None => {
-                // If there's no exit code previously listed, then the
-                // process's exit callback has yet to be invoked. We just
-                // need to deschedule ourselves and wait to be reawoken.
+            Some(status) => return Ok(status),
+            None => {}
+        }
+
+        // If there's no exit code previously listed, then the process's exit
+        // callback has yet to be invoked. We just need to deschedule ourselves
+        // and wait to be reawoken.
+        match self.timeout_state {
+            NoTimeout | TimeoutPending => {
                 wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
-                assert!(self.exit_status.is_some());
             }
+            TimeoutElapsed => {}
+        }
+
+        // If there's still no exit status listed, then we timed out, and we
+        // need to return.
+        match self.exit_status {
+            Some(status) => Ok(status),
+            None => Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+        }
+    }
+
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        let _m = self.fire_homing_missile();
+        self.timeout_state = NoTimeout;
+        let ms = match timeout {
+            Some(ms) => ms,
+            None => {
+                match self.timer {
+                    Some(ref mut timer) => timer.stop(),
+                    None => {}
+                }
+                return
+            }
+        };
+        if self.timer.is_none() {
+            let loop_ = Loop::wrap(unsafe {
+                uvll::get_loop_for_uv_handle(self.uv_handle())
+            });
+            let mut timer = box TimerWatcher::new_home(&loop_, self.home().clone());
+            unsafe {
+                timer.set_data(self as *mut _ as *Process);
+            }
+            self.timer = Some(timer);
         }
 
-        self.exit_status.unwrap()
+        let timer = self.timer.get_mut_ref();
+        timer.stop();
+        timer.start(timer_cb, ms, 0);
+        self.timeout_state = TimeoutPending;
+
+        extern fn timer_cb(timer: *uvll::uv_timer_t) {
+            let p: &mut Process = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(timer) as *mut Process)
+            };
+            p.timeout_state = TimeoutElapsed;
+            match p.to_wake.take() {
+                Some(task) => { let _t = task.wake().map(|t| t.reawaken()); }
+                None => {}
+            }
+        }
     }
 }
 
index 529fd25dc50e90dd75f277b9a1eed8464868c9e8..349cac723ff5b3303d487accbcb7283c6fd14599 100644 (file)
@@ -10,6 +10,8 @@
 
 //! Bindings for executing child processes
 
+#![allow(experimental)]
+
 use prelude::*;
 
 use fmt;
@@ -50,7 +52,7 @@
 /// };
 ///
 /// let contents = child.stdout.get_mut_ref().read_to_end();
-/// assert!(child.wait().success());
+/// assert!(child.wait().unwrap().success());
 /// ```
 pub struct Process {
     handle: Box<RtioProcess:Send>,
@@ -284,7 +286,7 @@ pub fn new(prog: &str, args: &[~str]) -> IoResult<Process> {
     /// println!("stderr: {}", str::from_utf8_lossy(output.error.as_slice()));
     /// ```
     pub fn output(prog: &str, args: &[~str]) -> IoResult<ProcessOutput> {
-        Process::new(prog, args).map(|mut p| p.wait_with_output())
+        Process::new(prog, args).and_then(|p| p.wait_with_output())
     }
 
     /// Executes a child process and collects its exit status. This will block
@@ -303,7 +305,7 @@ pub fn output(prog: &str, args: &[~str]) -> IoResult<ProcessOutput> {
     /// println!("process exited with: {}", status);
     /// ```
     pub fn status(prog: &str, args: &[~str]) -> IoResult<ProcessExit> {
-        Process::new(prog, args).map(|mut p| p.wait())
+        Process::new(prog, args).and_then(|mut p| p.wait())
     }
 
     /// Creates a new process with the specified configuration.
@@ -378,17 +380,72 @@ pub fn signal_kill(&mut self) -> IoResult<()> {
     /// after it has been called at least once.
     ///
     /// The stdin handle to the child process will be closed before waiting.
-    pub fn wait(&mut self) -> ProcessExit {
+    ///
+    /// # Errors
+    ///
+    /// This function can fail if a timeout was previously specified via
+    /// `set_timeout` and the timeout expires before the child exits.
+    pub fn wait(&mut self) -> IoResult<ProcessExit> {
         drop(self.stdin.take());
         self.handle.wait()
     }
 
+    /// Sets a timeout, in milliseconds, for future calls to wait().
+    ///
+    /// The argument specified is a relative distance into the future, in
+    /// milliseconds, after which any call to wait() will return immediately
+    /// with a timeout error, and all future calls to wait() will not block.
+    ///
+    /// A value of `None` will clear any previous timeout, and a value of `Some`
+    /// will override any previously set timeout.
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// # #![allow(experimental)]
+    /// use std::io::process::{Process, ProcessExit};
+    /// use std::io::IoResult;
+    ///
+    /// fn run_gracefully(prog: &str) -> IoResult<ProcessExit> {
+    ///     let mut p = try!(Process::new("long-running-process", []));
+    ///
+    ///     // give the process 10 seconds to finish completely
+    ///     p.set_timeout(Some(10_000));
+    ///     match p.wait() {
+    ///         Ok(status) => return Ok(status),
+    ///         Err(..) => {}
+    ///     }
+    ///
+    ///     // Attempt to exit gracefully, but don't wait for it too long
+    ///     try!(p.signal_exit());
+    ///     p.set_timeout(Some(1_000));
+    ///     match p.wait() {
+    ///         Ok(status) => return Ok(status),
+    ///         Err(..) => {}
+    ///     }
+    ///
+    ///     // Well, we did our best, forcefully kill the process
+    ///     try!(p.signal_kill());
+    ///     p.set_timeout(None);
+    ///     p.wait()
+    /// }
+    /// ```
+    #[experimental = "the type of the timeout is likely to change"]
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.handle.set_timeout(timeout_ms)
+    }
+
     /// Simultaneously wait for the child to exit and collect all remaining
     /// output on the stdout/stderr handles, returning a `ProcessOutput`
     /// instance.
     ///
     /// The stdin handle to the child is closed before waiting.
-    pub fn wait_with_output(&mut self) -> ProcessOutput {
+    ///
+    /// # Errors
+    ///
+    /// This function can fail for any of the same reasons that `wait()` can
+    /// fail.
+    pub fn wait_with_output(mut self) -> IoResult<ProcessOutput> {
         drop(self.stdin.take());
         fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
             let (tx, rx) = channel();
@@ -404,11 +461,13 @@ fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
         let stdout = read(self.stdout.take());
         let stderr = read(self.stderr.take());
 
-        let status = self.wait();
+        let status = try!(self.wait());
 
-        ProcessOutput { status: status,
-                        output: stdout.recv().ok().unwrap_or(Vec::new()),
-                        error:  stderr.recv().ok().unwrap_or(Vec::new()) }
+        Ok(ProcessOutput {
+            status: status,
+            output: stdout.recv().ok().unwrap_or(Vec::new()),
+            error:  stderr.recv().ok().unwrap_or(Vec::new()),
+        })
     }
 }
 
@@ -421,7 +480,8 @@ fn drop(&mut self) {
         drop(self.stderr.take());
         drop(mem::replace(&mut self.extra_io, Vec::new()));
 
-        self.wait();
+        self.set_timeout(None);
+        let _ = self.wait().unwrap();
     }
 }
 
@@ -441,7 +501,7 @@ mod tests {
         let p = Process::configure(args);
         assert!(p.is_ok());
         let mut p = p.unwrap();
-        assert!(p.wait().success());
+        assert!(p.wait().unwrap().success());
     })
 
     #[cfg(not(target_os="android"))]
@@ -465,7 +525,7 @@ mod tests {
         let p = Process::configure(args);
         assert!(p.is_ok());
         let mut p = p.unwrap();
-        assert!(p.wait().matches_exit_status(1));
+        assert!(p.wait().unwrap().matches_exit_status(1));
         drop(p.wait().clone());
     })
 
@@ -479,7 +539,7 @@ mod tests {
         let p = Process::configure(args);
         assert!(p.is_ok());
         let mut p = p.unwrap();
-        match p.wait() {
+        match p.wait().unwrap() {
             process::ExitSignal(1) => {},
             result => fail!("not terminated by signal 1 (instead, {})", result),
         }
@@ -495,7 +555,7 @@ pub fn run_output(args: ProcessConfig) -> ~str {
         let mut p = p.unwrap();
         assert!(p.stdout.is_some());
         let ret = read_all(p.stdout.get_mut_ref() as &mut Reader);
-        assert!(p.wait().success());
+        assert!(p.wait().unwrap().success());
         return ret;
     }
 
@@ -536,7 +596,7 @@ pub fn run_output(args: ProcessConfig) -> ~str {
         p.stdin.get_mut_ref().write("foobar".as_bytes()).unwrap();
         drop(p.stdin.take());
         let out = read_all(p.stdout.get_mut_ref() as &mut Reader);
-        assert!(p.wait().success());
+        assert!(p.wait().unwrap().success());
         assert_eq!(out, "foobar\n".to_owned());
     })
 
@@ -548,7 +608,7 @@ pub fn run_output(args: ProcessConfig) -> ~str {
             .. ProcessConfig::new()
         };
         let mut p = Process::configure(args).unwrap();
-        assert!(p.wait().success());
+        assert!(p.wait().unwrap().success());
     })
 
     #[cfg(windows)]
@@ -572,7 +632,7 @@ pub fn run_output(args: ProcessConfig) -> ~str {
             .. ProcessConfig::new()
         };
         let mut p = Process::configure(args).unwrap();
-        assert!(p.wait().success());
+        assert!(p.wait().unwrap().success());
     })
 
     #[cfg(unix, not(target_os="android"))]
@@ -635,21 +695,21 @@ pub fn run_output(args: ProcessConfig) -> ~str {
     #[cfg(not(target_os="android"))]
     iotest!(fn test_finish_once() {
         let mut prog = Process::new("false", []).unwrap();
-        assert!(prog.wait().matches_exit_status(1));
+        assert!(prog.wait().unwrap().matches_exit_status(1));
     })
 
     #[cfg(not(target_os="android"))]
     iotest!(fn test_finish_twice() {
         let mut prog = Process::new("false", []).unwrap();
-        assert!(prog.wait().matches_exit_status(1));
-        assert!(prog.wait().matches_exit_status(1));
+        assert!(prog.wait().unwrap().matches_exit_status(1));
+        assert!(prog.wait().unwrap().matches_exit_status(1));
     })
 
     #[cfg(not(target_os="android"))]
     iotest!(fn test_wait_with_output_once() {
 
-        let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap();
-        let ProcessOutput {status, output, error} = prog.wait_with_output();
+        let prog = Process::new("echo", ["hello".to_owned()]).unwrap();
+        let ProcessOutput {status, output, error} = prog.wait_with_output().unwrap();
         let output_str = str::from_utf8(output.as_slice()).unwrap();
 
         assert!(status.success());
@@ -660,30 +720,6 @@ pub fn run_output(args: ProcessConfig) -> ~str {
         }
     })
 
-    #[cfg(not(target_os="android"))]
-    iotest!(fn test_wait_with_output_twice() {
-        let mut prog = Process::new("echo", ["hello".to_owned()]).unwrap();
-        let ProcessOutput {status, output, error} = prog.wait_with_output();
-
-        let output_str = str::from_utf8(output.as_slice()).unwrap();
-
-        assert!(status.success());
-        assert_eq!(output_str.trim().to_owned(), "hello".to_owned());
-        // FIXME #7224
-        if !running_on_valgrind() {
-            assert_eq!(error, Vec::new());
-        }
-
-        let ProcessOutput {status, output, error} = prog.wait_with_output();
-
-        assert!(status.success());
-        assert_eq!(output, Vec::new());
-        // FIXME #7224
-        if !running_on_valgrind() {
-            assert_eq!(error, Vec::new());
-        }
-    })
-
     #[cfg(unix,not(target_os="android"))]
     pub fn run_pwd(dir: Option<&Path>) -> Process {
         Process::configure(ProcessConfig {
@@ -714,9 +750,10 @@ pub fn run_pwd(dir: Option<&Path>) -> Process {
 
     iotest!(fn test_keep_current_working_dir() {
         use os;
-        let mut prog = run_pwd(None);
+        let prog = run_pwd(None);
 
-        let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
+        let output = str::from_utf8(prog.wait_with_output().unwrap()
+                                        .output.as_slice()).unwrap().to_owned();
         let parent_dir = os::getcwd();
         let child_dir = Path::new(output.trim());
 
@@ -732,9 +769,10 @@ pub fn run_pwd(dir: Option<&Path>) -> Process {
         // test changing to the parent of os::getcwd() because we know
         // the path exists (and os::getcwd() is not expected to be root)
         let parent_dir = os::getcwd().dir_path();
-        let mut prog = run_pwd(Some(&parent_dir));
+        let prog = run_pwd(Some(&parent_dir));
 
-        let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
+        let output = str::from_utf8(prog.wait_with_output().unwrap()
+                                        .output.as_slice()).unwrap().to_owned();
         let child_dir = Path::new(output.trim());
 
         let parent_stat = parent_dir.stat().unwrap();
@@ -777,8 +815,9 @@ pub fn run_env(env: Option<~[(~str, ~str)]>) -> Process {
         use os;
         if running_on_valgrind() { return; }
 
-        let mut prog = run_env(None);
-        let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
+        let prog = run_env(None);
+        let output = str::from_utf8(prog.wait_with_output().unwrap()
+                                        .output.as_slice()).unwrap().to_owned();
 
         let r = os::env();
         for &(ref k, ref v) in r.iter() {
@@ -791,8 +830,10 @@ pub fn run_env(env: Option<~[(~str, ~str)]>) -> Process {
         use os;
         if running_on_valgrind() { return; }
 
-        let mut prog = run_env(None);
-        let output = str::from_utf8(prog.wait_with_output().output.as_slice()).unwrap().to_owned();
+        let prog = run_env(None);
+        let output = str::from_utf8(prog.wait_with_output()
+                                        .unwrap().output.as_slice())
+                                   .unwrap().to_owned();
 
         let r = os::env();
         for &(ref k, ref v) in r.iter() {
@@ -807,8 +848,8 @@ pub fn run_env(env: Option<~[(~str, ~str)]>) -> Process {
     iotest!(fn test_add_to_env() {
         let new_env = box [("RUN_TEST_NEW_ENV".to_owned(), "123".to_owned())];
 
-        let mut prog = run_env(Some(new_env));
-        let result = prog.wait_with_output();
+        let prog = run_env(Some(new_env));
+        let result = prog.wait_with_output().unwrap();
         let output = str::from_utf8_lossy(result.output.as_slice()).into_owned();
 
         assert!(output.contains("RUN_TEST_NEW_ENV=123"),
@@ -830,14 +871,14 @@ pub fn sleeper() -> Process {
     iotest!(fn test_kill() {
         let mut p = sleeper();
         Process::kill(p.id(), PleaseExitSignal).unwrap();
-        assert!(!p.wait().success());
+        assert!(!p.wait().unwrap().success());
     })
 
     iotest!(fn test_exists() {
         let mut p = sleeper();
         assert!(Process::kill(p.id(), 0).is_ok());
         p.signal_kill().unwrap();
-        assert!(!p.wait().success());
+        assert!(!p.wait().unwrap().success());
     })
 
     iotest!(fn test_zero() {
@@ -845,11 +886,42 @@ pub fn sleeper() -> Process {
         p.signal_kill().unwrap();
         for _ in range(0, 20) {
             if p.signal(0).is_err() {
-                assert!(!p.wait().success());
+                assert!(!p.wait().unwrap().success());
                 return
             }
             timer::sleep(100);
         }
         fail!("never saw the child go away");
     })
+
+    iotest!(fn wait_timeout() {
+        let mut p = sleeper();
+        p.set_timeout(Some(10));
+        assert_eq!(p.wait().err().unwrap().kind, TimedOut);
+        assert_eq!(p.wait().err().unwrap().kind, TimedOut);
+        p.signal_kill().unwrap();
+        p.set_timeout(None);
+        assert!(p.wait().is_ok());
+    })
+
+    iotest!(fn wait_timeout2() {
+        let (tx, rx) = channel();
+        let tx2 = tx.clone();
+        spawn(proc() {
+            let mut p = sleeper();
+            p.set_timeout(Some(10));
+            assert_eq!(p.wait().err().unwrap().kind, TimedOut);
+            p.signal_kill().unwrap();
+            tx.send(());
+        });
+        spawn(proc() {
+            let mut p = sleeper();
+            p.set_timeout(Some(10));
+            assert_eq!(p.wait().err().unwrap().kind, TimedOut);
+            p.signal_kill().unwrap();
+            tx2.send(());
+        });
+        rx.recv();
+        rx.recv();
+    })
 }
index d23d327d558815a55ead009cd33a48f4d67f9da8..90f97e59caadad6a4d0960d95fcc2f9d4b2355ee 100644 (file)
@@ -275,7 +275,8 @@ pub trait RtioFileStream {
 pub trait RtioProcess {
     fn id(&self) -> libc::pid_t;
     fn kill(&mut self, signal: int) -> IoResult<()>;
-    fn wait(&mut self) -> ProcessExit;
+    fn wait(&mut self) -> IoResult<ProcessExit>;
+    fn set_timeout(&mut self, timeout: Option<u64>);
 }
 
 pub trait RtioPipe {
index 989453d8570d169ae14b1dafb3d966dd479261c1..53fe91cff3745ee9302ae20f29ada0fb0a79dab6 100644 (file)
@@ -50,7 +50,7 @@ fn runtest(me: &str) {
         env: Some(env.as_slice()),
         .. ProcessConfig::new()
     }).unwrap();
-    let out = p.wait_with_output();
+    let out = p.wait_with_output().unwrap();
     assert!(!out.status.success());
     let s = str::from_utf8(out.error.as_slice()).unwrap();
     assert!(s.contains("stack backtrace") && s.contains("foo::h"),
@@ -62,7 +62,7 @@ fn runtest(me: &str) {
         args: ["fail".to_owned()],
         .. ProcessConfig::new()
     }).unwrap();
-    let out = p.wait_with_output();
+    let out = p.wait_with_output().unwrap();
     assert!(!out.status.success());
     let s = str::from_utf8(out.error.as_slice()).unwrap();
     assert!(!s.contains("stack backtrace") && !s.contains("foo::h"),
@@ -74,7 +74,7 @@ fn runtest(me: &str) {
         args: ["double-fail".to_owned()],
         .. ProcessConfig::new()
     }).unwrap();
-    let out = p.wait_with_output();
+    let out = p.wait_with_output().unwrap();
     assert!(!out.status.success());
     let s = str::from_utf8(out.error.as_slice()).unwrap();
     assert!(s.contains("stack backtrace") && s.contains("double::h"),
@@ -87,7 +87,7 @@ fn runtest(me: &str) {
         env: Some(env.as_slice()),
         .. ProcessConfig::new()
     }).unwrap();
-    let out = p.wait_with_output();
+    let out = p.wait_with_output().unwrap();
     assert!(!out.status.success());
     let s = str::from_utf8(out.error.as_slice()).unwrap();
     let mut i = 0;
index 83d3b51f74a4fb29ddf365128f8e8399be6f1fa0..01a71d862b4ff9396a89b363277e5947e18ad85c 100644 (file)
@@ -120,7 +120,7 @@ pub fn test_destroy_actually_kills(force: bool) {
             () = rx1.recv() => {}
         }
     });
-    match p.wait() {
+    match p.wait().unwrap() {
         ExitStatus(..) => fail!("expected a signal"),
         ExitSignal(..) => tx.send(()),
     }
index f66b943d85f64fbedc624ceb0e81923fc4e74b67..fc1825d22cd94f8b6f99e7fd82a2e19eab49c729 100644 (file)
@@ -52,7 +52,7 @@ fn parent(flavor: ~str) {
     let args = args.as_slice();
     let mut p = io::Process::new(args[0].as_slice(), ["child".to_owned(), flavor]).unwrap();
     p.stdin.get_mut_ref().write_str("test1\ntest2\ntest3").unwrap();
-    let out = p.wait_with_output();
+    let out = p.wait_with_output().unwrap();
     assert!(out.status.success());
     let s = str::from_utf8(out.output.as_slice()).unwrap();
     assert_eq!(s, "test1\n\ntest2\n\ntest3\n");
index a5e632b94a28888c7437d2dae2d2ad10885d0e7c..f87c22bdb57c3d0bf2089514cba17bce176d601e 100644 (file)
@@ -36,7 +36,7 @@ fn main() {
         env: Some(env.as_slice()),
         ..ProcessConfig::new()
     };
-    let p = Process::configure(config).unwrap().wait_with_output();
+    let p = Process::configure(config).unwrap().wait_with_output().unwrap();
     assert!(p.status.success());
     let mut lines = str::from_utf8(p.error.as_slice()).unwrap().lines();
     assert!(lines.next().unwrap().contains("foo"));
index 2a814956631d4378a2d83132e42915b8db0152e6..f41f2619032bc99b60c7d83aaae841b8e0bcafbd 100644 (file)
@@ -54,7 +54,7 @@ fn main() {
     // Wait for the child process to die (terminate it's stdin and the read
     // should fail).
     drop(p.stdin.take());
-    match p.wait() {
+    match p.wait().unwrap() {
         process::ExitStatus(..) => {}
         process::ExitSignal(..) => fail!()
     }
index f9839ed39e752c4830bafbcc2180d2ea33dac960..45af7d5de3481e6b4be3a24b880e1bf5908a4bed 100644 (file)
@@ -62,7 +62,7 @@ fn main() {
             cwd: Some(&cwd),
             env: Some(my_env.append_one(env).as_slice()),
             .. ProcessConfig::new()
-        }).unwrap().wait_with_output();
+        }).unwrap().wait_with_output().unwrap();
 
         // display the output
         assert!(io::stdout().write(p.output.as_slice()).is_ok());
index 34d1f5e66c6782a66f700aad2b67933d8a5a94e5..2b42e3ada542879fa083edf961c3e66ea55f0820 100644 (file)
@@ -31,5 +31,5 @@ fn main() {
     }
 
     let mut p = Process::new(args[0], ["test".to_owned()]).unwrap();
-    assert!(p.wait().success());
+    assert!(p.wait().unwrap().success());
 }