]> git.lizzy.rs Git - rust.git/commitdiff
std: Add timeouts to unix connect/accept
authorAlex Crichton <alex@alexcrichton.com>
Wed, 23 Apr 2014 01:38:59 +0000 (18:38 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Thu, 24 Apr 2014 23:24:09 +0000 (16:24 -0700)
This adds support for connecting to a unix socket with a timeout (a named pipe
on windows), and accepting a connection with a timeout. The goal is to bring
unix pipes/named sockets back in line with TCP support for timeouts.

Similarly to the TCP sockets, all methods are marked #[experimental] due to
uncertainty about the type of the timeout argument.

This internally involved a good bit of refactoring to share as much code as
possible between TCP servers and pipe servers, but the core implementation did
not change drastically as part of this commit.

cc #13523

12 files changed:
src/liblibc/lib.rs
src/libnative/io/c_win32.rs
src/libnative/io/mod.rs
src/libnative/io/net.rs
src/libnative/io/pipe_unix.rs
src/libnative/io/pipe_win32.rs
src/libnative/io/util.rs [new file with mode: 0644]
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/uvio.rs
src/libstd/io/net/unix.rs
src/libstd/rt/rtio.rs

index 98613f885cd4527e20be6b43e98fa1aa0b059fda..bebf95a4a3ba6870ef516438dbde82c5a91c3366 100644 (file)
 #[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
 #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
 #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
-#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
+#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
 #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
 #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
 #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
index dbbb39b3b7b52fd5363ff30a3430857ba46e349b..6c84424e97a0df4fa0c25e74fc97eaa379fc25dd 100644 (file)
@@ -59,4 +59,6 @@ pub fn getsockopt(sockfd: libc::SOCKET,
                       optname: libc::c_int,
                       optval: *mut libc::c_char,
                       optlen: *mut libc::c_int) -> libc::c_int;
+
+    pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
 }
index 19cb5c5f1d4f0ef57d59be45915304b2fc2428b0..944766e8fd070ca72e161c4ccd0f0ab959ca2240 100644 (file)
@@ -44,6 +44,7 @@
 pub mod addrinfo;
 pub mod net;
 pub mod process;
+mod util;
 
 #[cfg(unix)]
 #[path = "file_unix.rs"]
@@ -177,8 +178,9 @@ fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send> {
     fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
         pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
     }
-    fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
-        pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
+        pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
     }
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
                           hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
index 93ec23e32ad427da60e894b85eb94b366ae05a6e..cc41da846b2b2f127293eaacb0c20a48cd7d386f 100644 (file)
 use std::io::net::ip;
 use std::io;
 use std::mem;
-use std::os;
-use std::ptr;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
 
 use super::{IoResult, retry, keep_going};
 use super::c;
+use super::util;
 
 ////////////////////////////////////////////////////////////////////////////////
 // sockaddr and misc bindings
@@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
     }
 }
 
-fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
-                       val: libc::c_int) -> IoResult<T> {
+pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
+                           val: libc::c_int) -> IoResult<T> {
     unsafe {
         let mut slot: T = mem::init();
         let mut len = mem::size_of::<T>() as libc::socklen_t;
@@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
     super::last_error()
 }
 
-fn ms_to_timeval(ms: u64) -> libc::timeval {
-    libc::timeval {
-        tv_sec: (ms / 1000) as libc::time_t,
-        tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
-    }
-}
-
-fn timeout(desc: &'static str) -> io::IoError {
-    io::IoError {
-        kind: io::TimedOut,
-        desc: desc,
-        detail: None,
-    }
-}
-
 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
 #[cfg(unix)]    unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
 
@@ -270,7 +254,7 @@ pub fn connect(addr: ip::SocketAddr,
         let addrp = &addr as *_ as *libc::sockaddr;
         match timeout {
             Some(timeout) => {
-                try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
+                try!(util::connect_timeout(fd, addrp, len, timeout));
                 Ok(ret)
             },
             None => {
@@ -282,84 +266,6 @@ pub fn connect(addr: ip::SocketAddr,
         }
     }
 
-    // See http://developerweb.net/viewtopic.php?id=3196 for where this is
-    // derived from.
-    fn connect_timeout(fd: sock_t,
-                       addrp: *libc::sockaddr,
-                       len: libc::socklen_t,
-                       timeout_ms: u64) -> IoResult<()> {
-        #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
-        #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
-        #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
-        #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
-
-        // Make sure the call to connect() doesn't block
-        try!(set_nonblocking(fd, true));
-
-        let ret = match unsafe { libc::connect(fd, addrp, len) } {
-            // If the connection is in progress, then we need to wait for it to
-            // finish (with a timeout). The current strategy for doing this is
-            // to use select() with a timeout.
-            -1 if os::errno() as int == INPROGRESS as int ||
-                  os::errno() as int == WOULDBLOCK as int => {
-                let mut set: c::fd_set = unsafe { mem::init() };
-                c::fd_set(&mut set, fd);
-                match await(fd, &mut set, timeout_ms) {
-                    0 => Err(timeout("connection timed out")),
-                    -1 => Err(last_error()),
-                    _ => {
-                        let err: libc::c_int = try!(
-                            getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
-                        if err == 0 {
-                            Ok(())
-                        } else {
-                            Err(io::IoError::from_errno(err as uint, true))
-                        }
-                    }
-                }
-            }
-
-            -1 => Err(last_error()),
-            _ => Ok(()),
-        };
-
-        // be sure to turn blocking I/O back on
-        try!(set_nonblocking(fd, false));
-        return ret;
-
-        #[cfg(unix)]
-        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
-            let set = nb as libc::c_int;
-            super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
-        }
-        #[cfg(windows)]
-        fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
-            let mut set = nb as libc::c_ulong;
-            if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
-                Err(last_error())
-            } else {
-                Ok(())
-            }
-        }
-
-        #[cfg(unix)]
-        fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
-            let start = ::io::timer::now();
-            retry(|| unsafe {
-                // Recalculate the timeout each iteration (it is generally
-                // undefined what the value of the 'tv' is after select
-                // returns EINTR).
-                let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
-                c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
-            })
-        }
-        #[cfg(windows)]
-        fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
-            let tv = ms_to_timeval(timeout);
-            unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
-        }
-    }
-
     pub fn fd(&self) -> sock_t {
         // This unsafety is fine because it's just a read-only arc
         unsafe { (*self.inner.get()).fd }
@@ -533,7 +439,7 @@ pub fn fd(&self) -> sock_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
         if self.deadline != 0 {
-            try!(self.accept_deadline());
+            try!(util::accept_deadline(self.fd(), self.deadline));
         }
         unsafe {
             let mut storage: libc::sockaddr_storage = mem::init();
@@ -550,25 +456,6 @@ pub fn native_accept(&mut self) -> IoResult<TcpStream> {
             }
         }
     }
-
-    fn accept_deadline(&mut self) -> IoResult<()> {
-        let mut set: c::fd_set = unsafe { mem::init() };
-        c::fd_set(&mut set, self.fd());
-
-        match retry(|| {
-            // If we're past the deadline, then pass a 0 timeout to select() so
-            // we can poll the status of the socket.
-            let now = ::io::timer::now();
-            let ms = if self.deadline > now {0} else {self.deadline - now};
-            let tv = ms_to_timeval(ms);
-            let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
-            unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
-        }) {
-            -1 => Err(last_error()),
-            0 => Err(timeout("accept timed out")),
-            _ => return Ok(()),
-        }
-    }
 }
 
 impl rtio::RtioSocket for TcpAcceptor {
@@ -585,10 +472,7 @@ fn accept(&mut self) -> IoResult<~rtio::RtioTcpStream:Send> {
     fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
     fn set_timeout(&mut self, timeout: Option<u64>) {
-        self.deadline = match timeout {
-            None => 0,
-            Some(t) => ::io::timer::now() + t,
-        };
+        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
 }
 
index 5d13a6b5fc5cdd8a36bfe6f099dcbeb2bf9611d4..190cae05d4343c05d6c5981c22d5ebe63bc33019 100644 (file)
@@ -8,16 +8,17 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use libc;
 use std::c_str::CString;
 use std::cast;
+use std::intrinsics;
 use std::io;
-use libc;
 use std::mem;
 use std::rt::rtio;
 use std::sync::arc::UnsafeArc;
-use std::intrinsics;
 
 use super::{IoResult, retry, keep_going};
+use super::util;
 use super::file::fd_t;
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
@@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
     return Ok((storage, len));
 }
 
-fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
-                    len: uint) -> IoResult<CString> {
-    match storage.ss_family as libc::c_int {
-        libc::AF_UNIX => {
-            assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
-            let storage: &libc::sockaddr_un = unsafe {
-                cast::transmute(storage)
-            };
-            unsafe {
-                Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
-            }
-        }
-        _ => Err(io::standard_error(io::InvalidInput))
-    }
-}
-
 struct Inner {
     fd: fd_t,
 }
@@ -76,16 +61,24 @@ impl Drop for Inner {
     fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
 }
 
-fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
+fn connect(addr: &CString, ty: libc::c_int,
+           timeout: Option<u64>) -> IoResult<Inner> {
     let (addr, len) = try!(addr_to_sockaddr_un(addr));
     let inner = Inner { fd: try!(unix_socket(ty)) };
-    let addrp = &addr as *libc::sockaddr_storage;
-    match retry(|| unsafe {
-        libc::connect(inner.fd, addrp as *libc::sockaddr,
-                      len as libc::socklen_t)
-    }) {
-        -1 => Err(super::last_error()),
-        _  => Ok(inner)
+    let addrp = &addr as *_ as *libc::sockaddr;
+    let len = len as libc::socklen_t;
+
+    match timeout {
+        None => {
+            match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
+                -1 => Err(super::last_error()),
+                _  => Ok(inner)
+            }
+        }
+        Some(timeout_ms) => {
+            try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
+            Ok(inner)
+        }
     }
 }
 
@@ -110,8 +103,9 @@ pub struct UnixStream {
 }
 
 impl UnixStream {
-    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
-        connect(addr, libc::SOCK_STREAM).map(|inner| {
+    pub fn connect(addr: &CString,
+                   timeout: Option<u64>) -> IoResult<UnixStream> {
+        connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
             UnixStream { inner: UnsafeArc::new(inner) }
         })
     }
@@ -176,7 +170,7 @@ fn fd(&self) -> fd_t { self.inner.fd }
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(UnixAcceptor { listener: self })
+            _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
         }
     }
 }
@@ -189,12 +183,16 @@ fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor:Send> {
 
 pub struct UnixAcceptor {
     listener: UnixListener,
+    deadline: u64,
 }
 
 impl UnixAcceptor {
     fn fd(&self) -> fd_t { self.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+        if self.deadline != 0 {
+            try!(util::accept_deadline(self.fd(), self.deadline));
+        }
         let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
         let storagep = &mut storage as *mut libc::sockaddr_storage;
         let size = mem::size_of::<libc::sockaddr_storage>();
@@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
         self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
     }
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+    }
 }
 
 impl Drop for UnixListener {
index 84b3d887c04989ebfa618688d035ff82aa97d832..a4f09ded0ac110095bef3ef0fadb1bfa6b9138b7 100644 (file)
@@ -93,6 +93,8 @@
 use std::intrinsics;
 
 use super::IoResult;
+use super::c;
+use super::util;
 
 struct Event(libc::HANDLE);
 
@@ -210,8 +212,9 @@ fn try_connect(p: *u16) -> Option<libc::HANDLE> {
         None
     }
 
-    pub fn connect(addr: &CString) -> IoResult<UnixStream> {
+    pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
         as_utf16_p(addr.as_str().unwrap(), |p| {
+            let start = ::io::timer::now();
             loop {
                 match UnixStream::try_connect(p) {
                     Some(handle) => {
@@ -246,11 +249,26 @@ pub fn connect(addr: &CString) -> IoResult<UnixStream> {
                     return Err(super::last_error())
                 }
 
-                // An example I found on microsoft's website used 20 seconds,
-                // libuv uses 30 seconds, hence we make the obvious choice of
-                // waiting for 25 seconds.
-                if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
-                    return Err(super::last_error())
+                match timeout {
+                    Some(timeout) => {
+                        let now = ::io::timer::now();
+                        let timed_out = (now - start) >= timeout || unsafe {
+                            let ms = (timeout - (now - start)) as libc::DWORD;
+                            libc::WaitNamedPipeW(p, ms) == 0
+                        };
+                        if timed_out {
+                            return Err(util::timeout("connect timed out"))
+                        }
+                    }
+
+                    // An example I found on microsoft's website used 20
+                    // seconds, libuv uses 30 seconds, hence we make the
+                    // obvious choice of waiting for 25 seconds.
+                    None => {
+                        if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
+                            return Err(super::last_error())
+                        }
+                    }
                 }
             }
         })
@@ -372,6 +390,7 @@ pub fn native_listen(self) -> IoResult<UnixAcceptor> {
         Ok(UnixAcceptor {
             listener: self,
             event: try!(Event::new(true, false)),
+            deadline: 0,
         })
     }
 }
@@ -391,6 +410,7 @@ fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor:Send> {
 pub struct UnixAcceptor {
     listener: UnixListener,
     event: Event,
+    deadline: u64,
 }
 
 impl UnixAcceptor {
@@ -438,7 +458,28 @@ pub fn native_accept(&mut self) -> IoResult<UnixStream> {
         overlapped.hEvent = self.event.handle();
         if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
             let mut err = unsafe { libc::GetLastError() };
+
             if err == libc::ERROR_IO_PENDING as libc::DWORD {
+                // If we've got a timeout, use WaitForSingleObject in tandem
+                // with CancelIo to figure out if we should indeed get the
+                // result.
+                if self.deadline != 0 {
+                    let now = ::io::timer::now();
+                    let timeout = self.deadline < now || unsafe {
+                        let ms = (self.deadline - now) as libc::DWORD;
+                        let r = libc::WaitForSingleObject(overlapped.hEvent,
+                                                          ms);
+                        r != libc::WAIT_OBJECT_0
+                    };
+                    if timeout {
+                        unsafe { let _ = c::CancelIo(handle); }
+                        return Err(util::timeout("accept timed out"))
+                    }
+                }
+
+                // This will block until the overlapped I/O is completed. The
+                // timeout was previously handled, so this will either block in
+                // the normal case or succeed very quickly in the timeout case.
                 let ret = unsafe {
                     let mut transfer = 0;
                     libc::GetOverlappedResult(handle,
@@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
         self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
     }
+    fn set_timeout(&mut self, timeout: Option<u64>) {
+        self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
+    }
 }
 
diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs
new file mode 100644 (file)
index 0000000..0aaac8f
--- /dev/null
@@ -0,0 +1,136 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc;
+use std::io::IoResult;
+use std::io;
+use std::mem;
+use std::ptr;
+
+use super::c;
+use super::net;
+use super::{retry, last_error};
+
+pub fn timeout(desc: &'static str) -> io::IoError {
+    io::IoError {
+        kind: io::TimedOut,
+        desc: desc,
+        detail: None,
+    }
+}
+
+pub fn ms_to_timeval(ms: u64) -> libc::timeval {
+    libc::timeval {
+        tv_sec: (ms / 1000) as libc::time_t,
+        tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
+    }
+}
+
+// See http://developerweb.net/viewtopic.php?id=3196 for where this is
+// derived from.
+pub fn connect_timeout(fd: net::sock_t,
+                       addrp: *libc::sockaddr,
+                       len: libc::socklen_t,
+                       timeout_ms: u64) -> IoResult<()> {
+    use std::os;
+    #[cfg(unix)]    use INPROGRESS = libc::EINPROGRESS;
+    #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
+    #[cfg(unix)]    use WOULDBLOCK = libc::EWOULDBLOCK;
+    #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
+
+    // Make sure the call to connect() doesn't block
+    try!(set_nonblocking(fd, true));
+
+    let ret = match unsafe { libc::connect(fd, addrp, len) } {
+        // If the connection is in progress, then we need to wait for it to
+        // finish (with a timeout). The current strategy for doing this is
+        // to use select() with a timeout.
+        -1 if os::errno() as int == INPROGRESS as int ||
+              os::errno() as int == WOULDBLOCK as int => {
+            let mut set: c::fd_set = unsafe { mem::init() };
+            c::fd_set(&mut set, fd);
+            match await(fd, &mut set, timeout_ms) {
+                0 => Err(timeout("connection timed out")),
+                -1 => Err(last_error()),
+                _ => {
+                    let err: libc::c_int = try!(
+                        net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
+                    if err == 0 {
+                        Ok(())
+                    } else {
+                        Err(io::IoError::from_errno(err as uint, true))
+                    }
+                }
+            }
+        }
+
+        -1 => Err(last_error()),
+        _ => Ok(()),
+    };
+
+    // be sure to turn blocking I/O back on
+    try!(set_nonblocking(fd, false));
+    return ret;
+
+    #[cfg(unix)]
+    fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+        let set = nb as libc::c_int;
+        super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
+    }
+
+    #[cfg(windows)]
+    fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+        let mut set = nb as libc::c_ulong;
+        if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
+            Err(last_error())
+        } else {
+            Ok(())
+        }
+    }
+
+    #[cfg(unix)]
+    fn await(fd: net::sock_t, set: &mut c::fd_set,
+             timeout: u64) -> libc::c_int {
+        let start = ::io::timer::now();
+        retry(|| unsafe {
+            // Recalculate the timeout each iteration (it is generally
+            // undefined what the value of the 'tv' is after select
+            // returns EINTR).
+            let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
+            c::select(fd + 1, ptr::null(), set as *mut _ as *_,
+                      ptr::null(), &tv)
+        })
+    }
+    #[cfg(windows)]
+    fn await(_fd: net::sock_t, set: &mut c::fd_set,
+             timeout: u64) -> libc::c_int {
+        let tv = ms_to_timeval(timeout);
+        unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
+    }
+}
+
+pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> {
+    let mut set: c::fd_set = unsafe { mem::init() };
+    c::fd_set(&mut set, fd);
+
+    match retry(|| {
+        // If we're past the deadline, then pass a 0 timeout to select() so
+        // we can poll the status of the socket.
+        let now = ::io::timer::now();
+        let ms = if deadline < now {0} else {deadline - now};
+        let tv = ms_to_timeval(ms);
+        let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
+        unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
+    }) {
+        -1 => Err(last_error()),
+        0 => Err(timeout("accept timed out")),
+        _ => return Ok(()),
+    }
+}
index 27a0691193980b308d23ac75ec31b125318770cc..470a343b84ed662678ef5e5f7bc079f6ea19a0ee 100644 (file)
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use std::cast;
-use std::io::IoError;
+use std::io::{IoError, IoResult};
 use std::io::net::ip;
 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
 use libc;
@@ -145,96 +145,43 @@ fn socket_name(sk: SocketNameKind,
         n => Err(uv_error_to_io_error(UvError(n)))
     }
 }
-
 ////////////////////////////////////////////////////////////////////////////////
-/// TCP implementation
+// Helpers for handling timeouts, shared for pipes/tcp
 ////////////////////////////////////////////////////////////////////////////////
 
-pub struct TcpWatcher {
-    handle: *uvll::uv_tcp_t,
-    stream: StreamWatcher,
-    home: HomeHandle,
-    refcount: Refcount,
-
-    // libuv can't support concurrent reads and concurrent writes of the same
-    // stream object, so we use these access guards in order to arbitrate among
-    // multiple concurrent reads and writes. Note that libuv *can* read and
-    // write simultaneously, it just can't read and read simultaneously.
-    read_access: Access,
-    write_access: Access,
-}
-
-pub struct TcpListener {
-    home: HomeHandle,
-    handle: *uvll::uv_pipe_t,
-    closing_task: Option<BlockedTask>,
-    outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
-    incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+pub struct ConnectCtx {
+    pub status: c_int,
+    pub task: Option<BlockedTask>,
+    pub timer: Option<~TimerWatcher>,
 }
 
-pub struct TcpAcceptor {
-    listener: ~TcpListener,
+pub struct AcceptTimeout {
     timer: Option<TimerWatcher>,
     timeout_tx: Option<Sender<()>>,
     timeout_rx: Option<Receiver<()>>,
 }
 
-// TCP watchers (clients/streams)
-
-impl TcpWatcher {
-    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
-        let handle = io.make_handle();
-        TcpWatcher::new_home(&io.loop_, handle)
-    }
-
-    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
-        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
-        assert_eq!(unsafe {
-            uvll::uv_tcp_init(loop_.handle, handle)
-        }, 0);
-        TcpWatcher {
-            home: home,
-            handle: handle,
-            stream: StreamWatcher::new(handle),
-            refcount: Refcount::new(),
-            read_access: Access::new(),
-            write_access: Access::new(),
-        }
-    }
-
-    pub fn connect(io: &mut UvIoFactory,
-                   address: ip::SocketAddr,
-                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
-        struct Ctx {
-            status: c_int,
-            task: Option<BlockedTask>,
-            timer: Option<~TimerWatcher>,
-        }
-
-        let tcp = TcpWatcher::new(io);
-        let (addr, _len) = addr_to_sockaddr(address);
+impl ConnectCtx {
+    pub fn connect<T>(
+        mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
+        f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
+    ) -> Result<T, UvError> {
         let mut req = Request::new(uvll::UV_CONNECT);
-        let result = unsafe {
-            let addr_p = &addr as *libc::sockaddr_storage;
-            uvll::uv_tcp_connect(req.handle, tcp.handle,
-                                 addr_p as *libc::sockaddr,
-                                 connect_cb)
-        };
-        return match result {
+        let r = f(&req, &obj, connect_cb);
+        return match r {
             0 => {
                 req.defuse(); // uv callback now owns this request
-                let mut cx = Ctx { status: -1, task: None, timer: None };
                 match timeout {
                     Some(t) => {
                         let mut timer = TimerWatcher::new(io);
                         timer.start(timer_cb, t, 0);
-                        cx.timer = Some(timer);
+                        self.timer = Some(timer);
                     }
                     None => {}
                 }
-                wait_until_woken_after(&mut cx.task, &io.loop_, || {
-                    let data = &cx as *_;
-                    match cx.timer {
+                wait_until_woken_after(&mut self.task, &io.loop_, || {
+                    let data = &self as *_;
+                    match self.timer {
                         Some(ref mut timer) => unsafe { timer.set_data(data) },
                         None => {}
                     }
@@ -247,9 +194,9 @@ struct Ctx {
                 // If we failed because of a timeout, drop the TcpWatcher as
                 // soon as possible because it's data is now set to null and we
                 // want to cancel the callback ASAP.
-                match cx.status {
-                    0 => Ok(tcp),
-                    n => { drop(tcp); Err(UvError(n)) }
+                match self.status {
+                    0 => Ok(obj),
+                    n => { drop(obj); Err(UvError(n)) }
                 }
             }
             n => Err(UvError(n))
@@ -258,8 +205,8 @@ struct Ctx {
         extern fn timer_cb(handle: *uvll::uv_timer_t) {
             // Don't close the corresponding tcp request, just wake up the task
             // and let RAII take care of the pending watcher.
-            let cx: &mut Ctx = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
+            let cx: &mut ConnectCtx = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
             };
             cx.status = uvll::ECANCELED;
             wakeup(&mut cx.task);
@@ -279,7 +226,7 @@ struct Ctx {
             let data = unsafe { uvll::get_data_for_req(req.handle) };
             if data.is_null() { return }
 
-            let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
+            let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
             cx.status = status;
             match cx.timer {
                 Some(ref mut t) => t.stop(),
@@ -299,6 +246,157 @@ struct Ctx {
     }
 }
 
+impl AcceptTimeout {
+    pub fn new() -> AcceptTimeout {
+        AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
+    }
+
+    pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
+        match self.timeout_rx {
+            None => c.recv(),
+            Some(ref rx) => {
+                use std::comm::Select;
+
+                // Poll the incoming channel first (don't rely on the order of
+                // select just yet). If someone's pending then we should return
+                // them immediately.
+                match c.try_recv() {
+                    Ok(data) => return data,
+                    Err(..) => {}
+                }
+
+                // Use select to figure out which channel gets ready first. We
+                // do some custom handling of select to ensure that we never
+                // actually drain the timeout channel (we'll keep seeing the
+                // timeout message in the future).
+                let s = Select::new();
+                let mut timeout = s.handle(rx);
+                let mut data = s.handle(c);
+                unsafe {
+                    timeout.add();
+                    data.add();
+                }
+                if s.wait() == timeout.id() {
+                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+                } else {
+                    c.recv()
+                }
+            }
+        }
+    }
+
+    pub fn clear(&mut self) {
+        // Clear any previous timeout by dropping the timer and transmission
+        // channels
+        drop((self.timer.take(),
+              self.timeout_tx.take(),
+              self.timeout_rx.take()))
+    }
+
+    pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
+        &mut self, ms: u64, t: &mut T
+    ) {
+        // If we have a timeout, lazily initialize the timer which will be used
+        // to fire when the timeout runs out.
+        if self.timer.is_none() {
+            let _m = t.fire_homing_missile();
+            let loop_ = Loop::wrap(unsafe {
+                uvll::get_loop_for_uv_handle(t.uv_handle())
+            });
+            let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
+            unsafe {
+                timer.set_data(self as *mut _ as *AcceptTimeout);
+            }
+            self.timer = Some(timer);
+        }
+
+        // Once we've got a timer, stop any previous timeout, reset it for the
+        // current one, and install some new channels to send/receive data on
+        let timer = self.timer.get_mut_ref();
+        timer.stop();
+        timer.start(timer_cb, ms, 0);
+        let (tx, rx) = channel();
+        self.timeout_tx = Some(tx);
+        self.timeout_rx = Some(rx);
+
+        extern fn timer_cb(timer: *uvll::uv_timer_t) {
+            let acceptor: &mut AcceptTimeout = unsafe {
+                &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
+            };
+            // This send can never fail because if this timer is active then the
+            // receiving channel is guaranteed to be alive
+            acceptor.timeout_tx.get_ref().send(());
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// TCP implementation
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpWatcher {
+    handle: *uvll::uv_tcp_t,
+    stream: StreamWatcher,
+    home: HomeHandle,
+    refcount: Refcount,
+
+    // libuv can't support concurrent reads and concurrent writes of the same
+    // stream object, so we use these access guards in order to arbitrate among
+    // multiple concurrent reads and writes. Note that libuv *can* read and
+    // write simultaneously, it just can't read and read simultaneously.
+    read_access: Access,
+    write_access: Access,
+}
+
+pub struct TcpListener {
+    home: HomeHandle,
+    handle: *uvll::uv_pipe_t,
+    closing_task: Option<BlockedTask>,
+    outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+    incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+}
+
+pub struct TcpAcceptor {
+    listener: ~TcpListener,
+    timeout: AcceptTimeout,
+}
+
+// TCP watchers (clients/streams)
+
+impl TcpWatcher {
+    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
+        let handle = io.make_handle();
+        TcpWatcher::new_home(&io.loop_, handle)
+    }
+
+    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
+        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+        assert_eq!(unsafe {
+            uvll::uv_tcp_init(loop_.handle, handle)
+        }, 0);
+        TcpWatcher {
+            home: home,
+            handle: handle,
+            stream: StreamWatcher::new(handle),
+            refcount: Refcount::new(),
+            read_access: Access::new(),
+            write_access: Access::new(),
+        }
+    }
+
+    pub fn connect(io: &mut UvIoFactory,
+                   address: ip::SocketAddr,
+                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
+        let tcp = TcpWatcher::new(io);
+        let cx = ConnectCtx { status: -1, task: None, timer: None };
+        let (addr, _len) = addr_to_sockaddr(address);
+        let addr_p = &addr as *_ as *libc::sockaddr;
+        cx.connect(tcp, timeout, io, |req, tcp, cb| {
+            unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
+        })
+    }
+}
+
 impl HomingIO for TcpWatcher {
     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 }
@@ -463,9 +561,7 @@ fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
         // create the acceptor object from ourselves
         let mut acceptor = ~TcpAcceptor {
             listener: self,
-            timer: None,
-            timeout_tx: None,
-            timeout_rx: None,
+            timeout: AcceptTimeout::new(),
         };
 
         let _m = acceptor.fire_homing_missile();
@@ -516,37 +612,7 @@ fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
 
 impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
-        match self.timeout_rx {
-            None => self.listener.incoming.recv(),
-            Some(ref rx) => {
-                use std::comm::Select;
-
-                // Poll the incoming channel first (don't rely on the order of
-                // select just yet). If someone's pending then we should return
-                // them immediately.
-                match self.listener.incoming.try_recv() {
-                    Ok(data) => return data,
-                    Err(..) => {}
-                }
-
-                // Use select to figure out which channel gets ready first. We
-                // do some custom handling of select to ensure that we never
-                // actually drain the timeout channel (we'll keep seeing the
-                // timeout message in the future).
-                let s = Select::new();
-                let mut timeout = s.handle(rx);
-                let mut data = s.handle(&self.listener.incoming);
-                unsafe {
-                    timeout.add();
-                    data.add();
-                }
-                if s.wait() == timeout.id() {
-                    Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
-                } else {
-                    self.listener.incoming.recv()
-                }
-            }
-        }
+        self.timeout.accept(&self.listener.incoming)
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
@@ -564,47 +630,9 @@ fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
     }
 
     fn set_timeout(&mut self, ms: Option<u64>) {
-        // First, if the timeout is none, clear any previous timeout by dropping
-        // the timer and transmission channels
-        let ms = match ms {
-            None => {
-                return drop((self.timer.take(),
-                             self.timeout_tx.take(),
-                             self.timeout_rx.take()))
-            }
-            Some(ms) => ms,
-        };
-
-        // If we have a timeout, lazily initialize the timer which will be used
-        // to fire when the timeout runs out.
-        if self.timer.is_none() {
-            let _m = self.fire_homing_missile();
-            let loop_ = Loop::wrap(unsafe {
-                uvll::get_loop_for_uv_handle(self.listener.handle)
-            });
-            let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
-            unsafe {
-                timer.set_data(self as *mut _ as *TcpAcceptor);
-            }
-            self.timer = Some(timer);
-        }
-
-        // Once we've got a timer, stop any previous timeout, reset it for the
-        // current one, and install some new channels to send/receive data on
-        let timer = self.timer.get_mut_ref();
-        timer.stop();
-        timer.start(timer_cb, ms, 0);
-        let (tx, rx) = channel();
-        self.timeout_tx = Some(tx);
-        self.timeout_rx = Some(rx);
-
-        extern fn timer_cb(timer: *uvll::uv_timer_t) {
-            let acceptor: &mut TcpAcceptor = unsafe {
-                &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
-            };
-            // This send can never fail because if this timer is active then the
-            // receiving channel is guaranteed to be alive
-            acceptor.timeout_tx.get_ref().send(());
+        match ms {
+            None => self.timeout.clear(),
+            Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
         }
     }
 }
index 6ee684ff9bdc082bf142d167b2534d761bfa72a0..7277be1616b7182653891d5e3bdbcae80beb71af 100644 (file)
 use std::io::IoError;
 use libc;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
-use std::rt::task::BlockedTask;
 
 use access::Access;
 use homing::{HomingIO, HomeHandle};
+use net;
 use rc::Refcount;
 use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
-            wait_until_woken_after, wakeup};
+use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -43,6 +42,7 @@ pub struct PipeListener {
 
 pub struct PipeAcceptor {
     listener: ~PipeListener,
+    timeout: net::AcceptTimeout,
 }
 
 // PipeWatcher implementation and traits
@@ -84,36 +84,18 @@ pub fn open(io: &mut UvIoFactory, file: libc::c_int)
         }
     }
 
-    pub fn connect(io: &mut UvIoFactory, name: &CString)
+    pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
         -> Result<PipeWatcher, UvError>
     {
-        struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
-        let mut cx = Ctx { task: None, result: 0 };
-        let mut req = Request::new(uvll::UV_CONNECT);
         let pipe = PipeWatcher::new(io, false);
-
-        wait_until_woken_after(&mut cx.task, &io.loop_, || {
+        let cx = net::ConnectCtx { status: -1, task: None, timer: None };
+        cx.connect(pipe, timeout, io, |req, pipe, cb| {
             unsafe {
-                uvll::uv_pipe_connect(req.handle,
-                                      pipe.handle(),
-                                      name.with_ref(|p| p),
-                                      connect_cb)
+                uvll::uv_pipe_connect(req.handle, pipe.handle(),
+                                      name.with_ref(|p| p), cb)
             }
-            req.set_data(&cx);
-            req.defuse(); // uv callback now owns this request
-        });
-        return match cx.result {
-            0 => Ok(pipe),
-            n => Err(UvError(n))
-        };
-
-        extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
-            let req = Request::wrap(req);
-            assert!(status != uvll::ECANCELED);
-            let cx: &mut Ctx = unsafe { req.get_data() };
-            cx.result = status;
-            wakeup(&mut cx.task);
-        }
+            0
+        })
     }
 
     pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
@@ -199,7 +181,10 @@ pub fn bind(io: &mut UvIoFactory, name: &CString)
 impl RtioUnixListener for PipeListener {
     fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> {
         // create the acceptor object from ourselves
-        let mut acceptor = ~PipeAcceptor { listener: self };
+        let mut acceptor = ~PipeAcceptor {
+            listener: self,
+            timeout: net::AcceptTimeout::new(),
+        };
 
         let _m = acceptor.fire_homing_missile();
         // FIXME: the 128 backlog should be configurable
@@ -247,7 +232,14 @@ fn drop(&mut self) {
 
 impl RtioUnixAcceptor for PipeAcceptor {
     fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> {
-        self.listener.incoming.recv()
+        self.timeout.accept(&self.listener.incoming)
+    }
+
+    fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        match timeout_ms {
+            None => self.timeout.clear(),
+            Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
+        }
     }
 }
 
@@ -265,7 +257,8 @@ mod tests {
 
     #[test]
     fn connect_err() {
-        match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) {
+        match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
+                                   None) {
             Ok(..) => fail!(),
             Err(..) => {}
         }
@@ -312,7 +305,7 @@ fn connect() {
             assert!(client.write([2]).is_ok());
         });
         rx.recv();
-        let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+        let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
         assert!(c.write([1]).is_ok());
         let mut buf = [0];
         assert!(c.read(buf).unwrap() == 1);
@@ -332,7 +325,7 @@ fn connect_fail() {
             drop(p.accept().unwrap());
         });
         rx.recv();
-        let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+        let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
         fail!()
 
     }
index 3127a01d70e4698644fb7f9cc6e19623048f61a6..81d7ac6601e2337322e24ed617249f38b0479505 100644 (file)
@@ -291,8 +291,9 @@ fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener:Send,
         }
     }
 
-    fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> {
-        match PipeWatcher::connect(self, path) {
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
+        match PipeWatcher::connect(self, path, timeout) {
             Ok(p) => Ok(~p as ~rtio::RtioPipe:Send),
             Err(e) => Err(uv_error_to_io_error(e)),
         }
index bf56817702021e8f95c4bcd589a2b580c329de59..b75b797e9744f28a38d682ee5f9e4c69ee150993 100644 (file)
@@ -61,7 +61,31 @@ fn new(obj: ~RtioPipe:Send) -> UnixStream {
     /// ```
     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
         LocalIo::maybe_raise(|io| {
-            io.unix_connect(&path.to_c_str()).map(UnixStream::new)
+            io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
+        })
+    }
+
+    /// Connect to a pipe named by `path`. This will attempt to open a
+    /// connection to the underlying socket.
+    ///
+    /// The returned stream will be closed when the object falls out of scope.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// # #![allow(unused_must_use)]
+    /// use std::io::net::unix::UnixStream;
+    ///
+    /// let server = Path::new("path/to/my/socket");
+    /// let mut stream = UnixStream::connect(&server);
+    /// stream.write([1, 2, 3]);
+    /// ```
+    #[experimental = "the timeout argument is likely to change types"]
+    pub fn connect_timeout<P: ToCStr>(path: &P,
+                                      timeout_ms: u64) -> IoResult<UnixStream> {
+        LocalIo::maybe_raise(|io| {
+            let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
+            s.map(UnixStream::new)
         })
     }
 }
@@ -128,6 +152,25 @@ pub struct UnixAcceptor {
     obj: ~RtioUnixAcceptor:Send,
 }
 
+impl UnixAcceptor {
+    /// Sets a timeout for this acceptor, after which accept() will no longer
+    /// block indefinitely.
+    ///
+    /// The argument specified is the amount of time, in milliseconds, into the
+    /// future after which all invocations of accept() will not block (and any
+    /// pending invocation will return). A value of `None` will clear any
+    /// existing timeout.
+    ///
+    /// When using this method, it is likely necessary to reset the timeout as
+    /// appropriate, the timeout specified is specific to this object, not
+    /// specific to the next request.
+    #[experimental = "the name and arguments to this function are likely \
+                      to change"]
+    pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+        self.obj.set_timeout(timeout_ms)
+    }
+}
+
 impl Acceptor<UnixStream> for UnixAcceptor {
     fn accept(&mut self) -> IoResult<UnixStream> {
         self.obj.accept().map(UnixStream::new)
@@ -135,6 +178,7 @@ fn accept(&mut self) -> IoResult<UnixStream> {
 }
 
 #[cfg(test)]
+#[allow(experimental)]
 mod tests {
     use prelude::*;
     use super::*;
@@ -371,4 +415,49 @@ pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
         drop(l.listen().unwrap());
         assert!(!path.exists());
     } #[cfg(not(windows))])
+
+    iotest!(fn accept_timeout() {
+        let addr = next_test_unix();
+        let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+
+        a.set_timeout(Some(10));
+
+        // Make sure we time out once and future invocations also time out
+        let err = a.accept().err().unwrap();
+        assert_eq!(err.kind, TimedOut);
+        let err = a.accept().err().unwrap();
+        assert_eq!(err.kind, TimedOut);
+
+        // Also make sure that even though the timeout is expired that we will
+        // continue to receive any pending connections.
+        let l = UnixStream::connect(&addr).unwrap();
+        for i in range(0, 1001) {
+            match a.accept() {
+                Ok(..) => break,
+                Err(ref e) if e.kind == TimedOut => {}
+                Err(e) => fail!("error: {}", e),
+            }
+            if i == 1000 { fail!("should have a pending connection") }
+        }
+        drop(l);
+
+        // Unset the timeout and make sure that this always blocks.
+        a.set_timeout(None);
+        let addr2 = addr.clone();
+        spawn(proc() {
+            drop(UnixStream::connect(&addr2));
+        });
+        a.accept().unwrap();
+    })
+
+    iotest!(fn connect_timeout_error() {
+        let addr = next_test_unix();
+        assert!(UnixStream::connect_timeout(&addr, 100).is_err());
+    })
+
+    iotest!(fn connect_timeout_success() {
+        let addr = next_test_unix();
+        let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+        assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
+    })
 }
index 5dd148346695d8606770fb6dd762c6443f570284..f3c7fdaf7105b2115cc8a1f236363c9eff68d915 100644 (file)
@@ -152,7 +152,8 @@ fn tcp_connect(&mut self, addr: SocketAddr,
     fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
     fn unix_bind(&mut self, path: &CString)
         -> IoResult<~RtioUnixListener:Send>;
-    fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>;
+    fn unix_connect(&mut self, path: &CString,
+                    timeout: Option<u64>) -> IoResult<~RtioPipe:Send>;
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
                           hint: Option<ai::Hint>) -> IoResult<~[ai::Info]>;
 
@@ -274,6 +275,7 @@ pub trait RtioUnixListener {
 
 pub trait RtioUnixAcceptor {
     fn accept(&mut self) -> IoResult<~RtioPipe:Send>;
+    fn set_timeout(&mut self, timeout: Option<u64>);
 }
 
 pub trait RtioTTY {