#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
-#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
+#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
optname: libc::c_int,
optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
+
+ pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
}
pub mod addrinfo;
pub mod net;
pub mod process;
+mod util;
#[cfg(unix)]
#[path = "file_unix.rs"]
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
}
- fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
- pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
+ fn unix_connect(&mut self, path: &CString,
+ timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
+ pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
use std::io::net::ip;
use std::io;
use std::mem;
-use std::os;
-use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use super::{IoResult, retry, keep_going};
use super::c;
+use super::util;
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
}
}
-fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
- val: libc::c_int) -> IoResult<T> {
+pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
+ val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::init();
let mut len = mem::size_of::<T>() as libc::socklen_t;
super::last_error()
}
-fn ms_to_timeval(ms: u64) -> libc::timeval {
- libc::timeval {
- tv_sec: (ms / 1000) as libc::time_t,
- tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
- }
-}
-
-fn timeout(desc: &'static str) -> io::IoError {
- io::IoError {
- kind: io::TimedOut,
- desc: desc,
- detail: None,
- }
-}
-
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
let addrp = &addr as *_ as *libc::sockaddr;
match timeout {
Some(timeout) => {
- try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
+ try!(util::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
}
}
- // See http://developerweb.net/viewtopic.php?id=3196 for where this is
- // derived from.
- fn connect_timeout(fd: sock_t,
- addrp: *libc::sockaddr,
- len: libc::socklen_t,
- timeout_ms: u64) -> IoResult<()> {
- #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
- #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
- #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
- #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
-
- // Make sure the call to connect() doesn't block
- try!(set_nonblocking(fd, true));
-
- let ret = match unsafe { libc::connect(fd, addrp, len) } {
- // If the connection is in progress, then we need to wait for it to
- // finish (with a timeout). The current strategy for doing this is
- // to use select() with a timeout.
- -1 if os::errno() as int == INPROGRESS as int ||
- os::errno() as int == WOULDBLOCK as int => {
- let mut set: c::fd_set = unsafe { mem::init() };
- c::fd_set(&mut set, fd);
- match await(fd, &mut set, timeout_ms) {
- 0 => Err(timeout("connection timed out")),
- -1 => Err(last_error()),
- _ => {
- let err: libc::c_int = try!(
- getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
- if err == 0 {
- Ok(())
- } else {
- Err(io::IoError::from_errno(err as uint, true))
- }
- }
- }
- }
-
- -1 => Err(last_error()),
- _ => Ok(()),
- };
-
- // be sure to turn blocking I/O back on
- try!(set_nonblocking(fd, false));
- return ret;
-
- #[cfg(unix)]
- fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
- let set = nb as libc::c_int;
- super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
- }
- #[cfg(windows)]
- fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
- let mut set = nb as libc::c_ulong;
- if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
- Err(last_error())
- } else {
- Ok(())
- }
- }
-
- #[cfg(unix)]
- fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
- let start = ::io::timer::now();
- retry(|| unsafe {
- // Recalculate the timeout each iteration (it is generally
- // undefined what the value of the 'tv' is after select
- // returns EINTR).
- let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
- c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
- })
- }
- #[cfg(windows)]
- fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
- let tv = ms_to_timeval(timeout);
- unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
- }
- }
-
pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
if self.deadline != 0 {
- try!(self.accept_deadline());
+ try!(util::accept_deadline(self.fd(), self.deadline));
}
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
}
}
}
-
- fn accept_deadline(&mut self) -> IoResult<()> {
- let mut set: c::fd_set = unsafe { mem::init() };
- c::fd_set(&mut set, self.fd());
-
- match retry(|| {
- // If we're past the deadline, then pass a 0 timeout to select() so
- // we can poll the status of the socket.
- let now = ::io::timer::now();
- let ms = if self.deadline > now {0} else {self.deadline - now};
- let tv = ms_to_timeval(ms);
- let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
- unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
- }) {
- -1 => Err(last_error()),
- 0 => Err(timeout("accept timed out")),
- _ => return Ok(()),
- }
- }
}
impl rtio::RtioSocket for TcpAcceptor {
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
- self.deadline = match timeout {
- None => 0,
- Some(t) => ::io::timer::now() + t,
- };
+ self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use libc;
use std::c_str::CString;
use std::cast;
+use std::intrinsics;
use std::io;
-use libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
-use std::intrinsics;
use super::{IoResult, retry, keep_going};
+use super::util;
use super::file::fd_t;
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
return Ok((storage, len));
}
-fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
- len: uint) -> IoResult<CString> {
- match storage.ss_family as libc::c_int {
- libc::AF_UNIX => {
- assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
- let storage: &libc::sockaddr_un = unsafe {
- cast::transmute(storage)
- };
- unsafe {
- Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
- }
- }
- _ => Err(io::standard_error(io::InvalidInput))
- }
-}
-
struct Inner {
fd: fd_t,
}
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
}
-fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
+fn connect(addr: &CString, ty: libc::c_int,
+ timeout: Option<u64>) -> IoResult<Inner> {
let (addr, len) = try!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: try!(unix_socket(ty)) };
- let addrp = &addr as *libc::sockaddr_storage;
- match retry(|| unsafe {
- libc::connect(inner.fd, addrp as *libc::sockaddr,
- len as libc::socklen_t)
- }) {
- -1 => Err(super::last_error()),
- _ => Ok(inner)
+ let addrp = &addr as *_ as *libc::sockaddr;
+ let len = len as libc::socklen_t;
+
+ match timeout {
+ None => {
+ match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
+ -1 => Err(super::last_error()),
+ _ => Ok(inner)
+ }
+ }
+ Some(timeout_ms) => {
+ try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
+ Ok(inner)
+ }
}
}
}
impl UnixStream {
- pub fn connect(addr: &CString) -> IoResult<UnixStream> {
- connect(addr, libc::SOCK_STREAM).map(|inner| {
+ pub fn connect(addr: &CString,
+ timeout: Option<u64>) -> IoResult<UnixStream> {
+ connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
UnixStream { inner: UnsafeArc::new(inner) }
})
}
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
- _ => Ok(UnixAcceptor { listener: self })
+ _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
}
}
}
pub struct UnixAcceptor {
listener: UnixListener,
+ deadline: u64,
}
impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
+ if self.deadline != 0 {
+ try!(util::accept_deadline(self.fd(), self.deadline));
+ }
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
}
+ fn set_timeout(&mut self, timeout: Option<u64>) {
+ self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
+ }
}
impl Drop for UnixListener {
use std::intrinsics;
use super::IoResult;
+use super::c;
+use super::util;
struct Event(libc::HANDLE);
None
}
- pub fn connect(addr: &CString) -> IoResult<UnixStream> {
+ pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
as_utf16_p(addr.as_str().unwrap(), |p| {
+ let start = ::io::timer::now();
loop {
match UnixStream::try_connect(p) {
Some(handle) => {
return Err(super::last_error())
}
- // An example I found on microsoft's website used 20 seconds,
- // libuv uses 30 seconds, hence we make the obvious choice of
- // waiting for 25 seconds.
- if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
- return Err(super::last_error())
+ match timeout {
+ Some(timeout) => {
+ let now = ::io::timer::now();
+ let timed_out = (now - start) >= timeout || unsafe {
+ let ms = (timeout - (now - start)) as libc::DWORD;
+ libc::WaitNamedPipeW(p, ms) == 0
+ };
+ if timed_out {
+ return Err(util::timeout("connect timed out"))
+ }
+ }
+
+ // An example I found on microsoft's website used 20
+ // seconds, libuv uses 30 seconds, hence we make the
+ // obvious choice of waiting for 25 seconds.
+ None => {
+ if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
+ return Err(super::last_error())
+ }
+ }
}
}
})
Ok(UnixAcceptor {
listener: self,
event: try!(Event::new(true, false)),
+ deadline: 0,
})
}
}
pub struct UnixAcceptor {
listener: UnixListener,
event: Event,
+ deadline: u64,
}
impl UnixAcceptor {
overlapped.hEvent = self.event.handle();
if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
let mut err = unsafe { libc::GetLastError() };
+
if err == libc::ERROR_IO_PENDING as libc::DWORD {
+ // If we've got a timeout, use WaitForSingleObject in tandem
+ // with CancelIo to figure out if we should indeed get the
+ // result.
+ if self.deadline != 0 {
+ let now = ::io::timer::now();
+ let timeout = self.deadline < now || unsafe {
+ let ms = (self.deadline - now) as libc::DWORD;
+ let r = libc::WaitForSingleObject(overlapped.hEvent,
+ ms);
+ r != libc::WAIT_OBJECT_0
+ };
+ if timeout {
+ unsafe { let _ = c::CancelIo(handle); }
+ return Err(util::timeout("accept timed out"))
+ }
+ }
+
+ // This will block until the overlapped I/O is completed. The
+ // timeout was previously handled, so this will either block in
+ // the normal case or succeed very quickly in the timeout case.
let ret = unsafe {
let mut transfer = 0;
libc::GetOverlappedResult(handle,
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
}
+ fn set_timeout(&mut self, timeout: Option<u64>) {
+ self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
+ }
}
--- /dev/null
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc;
+use std::io::IoResult;
+use std::io;
+use std::mem;
+use std::ptr;
+
+use super::c;
+use super::net;
+use super::{retry, last_error};
+
+pub fn timeout(desc: &'static str) -> io::IoError {
+ io::IoError {
+ kind: io::TimedOut,
+ desc: desc,
+ detail: None,
+ }
+}
+
+pub fn ms_to_timeval(ms: u64) -> libc::timeval {
+ libc::timeval {
+ tv_sec: (ms / 1000) as libc::time_t,
+ tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
+ }
+}
+
+// See http://developerweb.net/viewtopic.php?id=3196 for where this is
+// derived from.
+pub fn connect_timeout(fd: net::sock_t,
+ addrp: *libc::sockaddr,
+ len: libc::socklen_t,
+ timeout_ms: u64) -> IoResult<()> {
+ use std::os;
+ #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
+ #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
+ #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
+ #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
+
+ // Make sure the call to connect() doesn't block
+ try!(set_nonblocking(fd, true));
+
+ let ret = match unsafe { libc::connect(fd, addrp, len) } {
+ // If the connection is in progress, then we need to wait for it to
+ // finish (with a timeout). The current strategy for doing this is
+ // to use select() with a timeout.
+ -1 if os::errno() as int == INPROGRESS as int ||
+ os::errno() as int == WOULDBLOCK as int => {
+ let mut set: c::fd_set = unsafe { mem::init() };
+ c::fd_set(&mut set, fd);
+ match await(fd, &mut set, timeout_ms) {
+ 0 => Err(timeout("connection timed out")),
+ -1 => Err(last_error()),
+ _ => {
+ let err: libc::c_int = try!(
+ net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
+ if err == 0 {
+ Ok(())
+ } else {
+ Err(io::IoError::from_errno(err as uint, true))
+ }
+ }
+ }
+ }
+
+ -1 => Err(last_error()),
+ _ => Ok(()),
+ };
+
+ // be sure to turn blocking I/O back on
+ try!(set_nonblocking(fd, false));
+ return ret;
+
+ #[cfg(unix)]
+ fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+ let set = nb as libc::c_int;
+ super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
+ }
+
+ #[cfg(windows)]
+ fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
+ let mut set = nb as libc::c_ulong;
+ if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
+ Err(last_error())
+ } else {
+ Ok(())
+ }
+ }
+
+ #[cfg(unix)]
+ fn await(fd: net::sock_t, set: &mut c::fd_set,
+ timeout: u64) -> libc::c_int {
+ let start = ::io::timer::now();
+ retry(|| unsafe {
+ // Recalculate the timeout each iteration (it is generally
+ // undefined what the value of the 'tv' is after select
+ // returns EINTR).
+ let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
+ c::select(fd + 1, ptr::null(), set as *mut _ as *_,
+ ptr::null(), &tv)
+ })
+ }
+ #[cfg(windows)]
+ fn await(_fd: net::sock_t, set: &mut c::fd_set,
+ timeout: u64) -> libc::c_int {
+ let tv = ms_to_timeval(timeout);
+ unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
+ }
+}
+
+pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> {
+ let mut set: c::fd_set = unsafe { mem::init() };
+ c::fd_set(&mut set, fd);
+
+ match retry(|| {
+ // If we're past the deadline, then pass a 0 timeout to select() so
+ // we can poll the status of the socket.
+ let now = ::io::timer::now();
+ let ms = if deadline < now {0} else {deadline - now};
+ let tv = ms_to_timeval(ms);
+ let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
+ unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
+ }) {
+ -1 => Err(last_error()),
+ 0 => Err(timeout("accept timed out")),
+ _ => return Ok(()),
+ }
+}
// except according to those terms.
use std::cast;
-use std::io::IoError;
+use std::io::{IoError, IoResult};
use std::io::net::ip;
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
n => Err(uv_error_to_io_error(UvError(n)))
}
}
-
////////////////////////////////////////////////////////////////////////////////
-/// TCP implementation
+// Helpers for handling timeouts, shared for pipes/tcp
////////////////////////////////////////////////////////////////////////////////
-pub struct TcpWatcher {
- handle: *uvll::uv_tcp_t,
- stream: StreamWatcher,
- home: HomeHandle,
- refcount: Refcount,
-
- // libuv can't support concurrent reads and concurrent writes of the same
- // stream object, so we use these access guards in order to arbitrate among
- // multiple concurrent reads and writes. Note that libuv *can* read and
- // write simultaneously, it just can't read and read simultaneously.
- read_access: Access,
- write_access: Access,
-}
-
-pub struct TcpListener {
- home: HomeHandle,
- handle: *uvll::uv_pipe_t,
- closing_task: Option<BlockedTask>,
- outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
- incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+pub struct ConnectCtx {
+ pub status: c_int,
+ pub task: Option<BlockedTask>,
+ pub timer: Option<~TimerWatcher>,
}
-pub struct TcpAcceptor {
- listener: ~TcpListener,
+pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
-// TCP watchers (clients/streams)
-
-impl TcpWatcher {
- pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
- let handle = io.make_handle();
- TcpWatcher::new_home(&io.loop_, handle)
- }
-
- fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
- let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
- assert_eq!(unsafe {
- uvll::uv_tcp_init(loop_.handle, handle)
- }, 0);
- TcpWatcher {
- home: home,
- handle: handle,
- stream: StreamWatcher::new(handle),
- refcount: Refcount::new(),
- read_access: Access::new(),
- write_access: Access::new(),
- }
- }
-
- pub fn connect(io: &mut UvIoFactory,
- address: ip::SocketAddr,
- timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
- struct Ctx {
- status: c_int,
- task: Option<BlockedTask>,
- timer: Option<~TimerWatcher>,
- }
-
- let tcp = TcpWatcher::new(io);
- let (addr, _len) = addr_to_sockaddr(address);
+impl ConnectCtx {
+ pub fn connect<T>(
+ mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
+ f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
+ ) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
- let result = unsafe {
- let addr_p = &addr as *libc::sockaddr_storage;
- uvll::uv_tcp_connect(req.handle, tcp.handle,
- addr_p as *libc::sockaddr,
- connect_cb)
- };
- return match result {
+ let r = f(&req, &obj, connect_cb);
+ return match r {
0 => {
req.defuse(); // uv callback now owns this request
- let mut cx = Ctx { status: -1, task: None, timer: None };
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
- cx.timer = Some(timer);
+ self.timer = Some(timer);
}
None => {}
}
- wait_until_woken_after(&mut cx.task, &io.loop_, || {
- let data = &cx as *_;
- match cx.timer {
+ wait_until_woken_after(&mut self.task, &io.loop_, || {
+ let data = &self as *_;
+ match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
- match cx.status {
- 0 => Ok(tcp),
- n => { drop(tcp); Err(UvError(n)) }
+ match self.status {
+ 0 => Ok(obj),
+ n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
- let cx: &mut Ctx = unsafe {
- &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
+ let cx: &mut ConnectCtx = unsafe {
+ &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
- let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
+ let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
}
}
+impl AcceptTimeout {
+ pub fn new() -> AcceptTimeout {
+ AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
+ }
+
+ pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
+ match self.timeout_rx {
+ None => c.recv(),
+ Some(ref rx) => {
+ use std::comm::Select;
+
+ // Poll the incoming channel first (don't rely on the order of
+ // select just yet). If someone's pending then we should return
+ // them immediately.
+ match c.try_recv() {
+ Ok(data) => return data,
+ Err(..) => {}
+ }
+
+ // Use select to figure out which channel gets ready first. We
+ // do some custom handling of select to ensure that we never
+ // actually drain the timeout channel (we'll keep seeing the
+ // timeout message in the future).
+ let s = Select::new();
+ let mut timeout = s.handle(rx);
+ let mut data = s.handle(c);
+ unsafe {
+ timeout.add();
+ data.add();
+ }
+ if s.wait() == timeout.id() {
+ Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
+ } else {
+ c.recv()
+ }
+ }
+ }
+ }
+
+ pub fn clear(&mut self) {
+ // Clear any previous timeout by dropping the timer and transmission
+ // channels
+ drop((self.timer.take(),
+ self.timeout_tx.take(),
+ self.timeout_rx.take()))
+ }
+
+ pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
+ &mut self, ms: u64, t: &mut T
+ ) {
+ // If we have a timeout, lazily initialize the timer which will be used
+ // to fire when the timeout runs out.
+ if self.timer.is_none() {
+ let _m = t.fire_homing_missile();
+ let loop_ = Loop::wrap(unsafe {
+ uvll::get_loop_for_uv_handle(t.uv_handle())
+ });
+ let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
+ unsafe {
+ timer.set_data(self as *mut _ as *AcceptTimeout);
+ }
+ self.timer = Some(timer);
+ }
+
+ // Once we've got a timer, stop any previous timeout, reset it for the
+ // current one, and install some new channels to send/receive data on
+ let timer = self.timer.get_mut_ref();
+ timer.stop();
+ timer.start(timer_cb, ms, 0);
+ let (tx, rx) = channel();
+ self.timeout_tx = Some(tx);
+ self.timeout_rx = Some(rx);
+
+ extern fn timer_cb(timer: *uvll::uv_timer_t) {
+ let acceptor: &mut AcceptTimeout = unsafe {
+ &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
+ };
+ // This send can never fail because if this timer is active then the
+ // receiving channel is guaranteed to be alive
+ acceptor.timeout_tx.get_ref().send(());
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// TCP implementation
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct TcpWatcher {
+ handle: *uvll::uv_tcp_t,
+ stream: StreamWatcher,
+ home: HomeHandle,
+ refcount: Refcount,
+
+ // libuv can't support concurrent reads and concurrent writes of the same
+ // stream object, so we use these access guards in order to arbitrate among
+ // multiple concurrent reads and writes. Note that libuv *can* read and
+ // write simultaneously, it just can't read and read simultaneously.
+ read_access: Access,
+ write_access: Access,
+}
+
+pub struct TcpListener {
+ home: HomeHandle,
+ handle: *uvll::uv_pipe_t,
+ closing_task: Option<BlockedTask>,
+ outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+ incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
+}
+
+pub struct TcpAcceptor {
+ listener: ~TcpListener,
+ timeout: AcceptTimeout,
+}
+
+// TCP watchers (clients/streams)
+
+impl TcpWatcher {
+ pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
+ let handle = io.make_handle();
+ TcpWatcher::new_home(&io.loop_, handle)
+ }
+
+ fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
+ let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+ assert_eq!(unsafe {
+ uvll::uv_tcp_init(loop_.handle, handle)
+ }, 0);
+ TcpWatcher {
+ home: home,
+ handle: handle,
+ stream: StreamWatcher::new(handle),
+ refcount: Refcount::new(),
+ read_access: Access::new(),
+ write_access: Access::new(),
+ }
+ }
+
+ pub fn connect(io: &mut UvIoFactory,
+ address: ip::SocketAddr,
+ timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
+ let tcp = TcpWatcher::new(io);
+ let cx = ConnectCtx { status: -1, task: None, timer: None };
+ let (addr, _len) = addr_to_sockaddr(address);
+ let addr_p = &addr as *_ as *libc::sockaddr;
+ cx.connect(tcp, timeout, io, |req, tcp, cb| {
+ unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
+ })
+ }
+}
+
impl HomingIO for TcpWatcher {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor {
listener: self,
- timer: None,
- timeout_tx: None,
- timeout_rx: None,
+ timeout: AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
- match self.timeout_rx {
- None => self.listener.incoming.recv(),
- Some(ref rx) => {
- use std::comm::Select;
-
- // Poll the incoming channel first (don't rely on the order of
- // select just yet). If someone's pending then we should return
- // them immediately.
- match self.listener.incoming.try_recv() {
- Ok(data) => return data,
- Err(..) => {}
- }
-
- // Use select to figure out which channel gets ready first. We
- // do some custom handling of select to ensure that we never
- // actually drain the timeout channel (we'll keep seeing the
- // timeout message in the future).
- let s = Select::new();
- let mut timeout = s.handle(rx);
- let mut data = s.handle(&self.listener.incoming);
- unsafe {
- timeout.add();
- data.add();
- }
- if s.wait() == timeout.id() {
- Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
- } else {
- self.listener.incoming.recv()
- }
- }
- }
+ self.timeout.accept(&self.listener.incoming)
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
}
fn set_timeout(&mut self, ms: Option<u64>) {
- // First, if the timeout is none, clear any previous timeout by dropping
- // the timer and transmission channels
- let ms = match ms {
- None => {
- return drop((self.timer.take(),
- self.timeout_tx.take(),
- self.timeout_rx.take()))
- }
- Some(ms) => ms,
- };
-
- // If we have a timeout, lazily initialize the timer which will be used
- // to fire when the timeout runs out.
- if self.timer.is_none() {
- let _m = self.fire_homing_missile();
- let loop_ = Loop::wrap(unsafe {
- uvll::get_loop_for_uv_handle(self.listener.handle)
- });
- let mut timer = TimerWatcher::new_home(&loop_, self.home().clone());
- unsafe {
- timer.set_data(self as *mut _ as *TcpAcceptor);
- }
- self.timer = Some(timer);
- }
-
- // Once we've got a timer, stop any previous timeout, reset it for the
- // current one, and install some new channels to send/receive data on
- let timer = self.timer.get_mut_ref();
- timer.stop();
- timer.start(timer_cb, ms, 0);
- let (tx, rx) = channel();
- self.timeout_tx = Some(tx);
- self.timeout_rx = Some(rx);
-
- extern fn timer_cb(timer: *uvll::uv_timer_t) {
- let acceptor: &mut TcpAcceptor = unsafe {
- &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor)
- };
- // This send can never fail because if this timer is active then the
- // receiving channel is guaranteed to be alive
- acceptor.timeout_tx.get_ref().send(());
+ match ms {
+ None => self.timeout.clear(),
+ Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
}
}
}
use std::io::IoError;
use libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
-use std::rt::task::BlockedTask;
use access::Access;
use homing::{HomingIO, HomeHandle};
+use net;
use rc::Refcount;
use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
- wait_until_woken_after, wakeup};
+use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
use uvio::UvIoFactory;
use uvll;
pub struct PipeAcceptor {
listener: ~PipeListener,
+ timeout: net::AcceptTimeout,
}
// PipeWatcher implementation and traits
}
}
- pub fn connect(io: &mut UvIoFactory, name: &CString)
+ pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
-> Result<PipeWatcher, UvError>
{
- struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
- let mut cx = Ctx { task: None, result: 0 };
- let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(io, false);
-
- wait_until_woken_after(&mut cx.task, &io.loop_, || {
+ let cx = net::ConnectCtx { status: -1, task: None, timer: None };
+ cx.connect(pipe, timeout, io, |req, pipe, cb| {
unsafe {
- uvll::uv_pipe_connect(req.handle,
- pipe.handle(),
- name.with_ref(|p| p),
- connect_cb)
+ uvll::uv_pipe_connect(req.handle, pipe.handle(),
+ name.with_ref(|p| p), cb)
}
- req.set_data(&cx);
- req.defuse(); // uv callback now owns this request
- });
- return match cx.result {
- 0 => Ok(pipe),
- n => Err(UvError(n))
- };
-
- extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
- let req = Request::wrap(req);
- assert!(status != uvll::ECANCELED);
- let cx: &mut Ctx = unsafe { req.get_data() };
- cx.result = status;
- wakeup(&mut cx.task);
- }
+ 0
+ })
}
pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
impl RtioUnixListener for PipeListener {
fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> {
// create the acceptor object from ourselves
- let mut acceptor = ~PipeAcceptor { listener: self };
+ let mut acceptor = ~PipeAcceptor {
+ listener: self,
+ timeout: net::AcceptTimeout::new(),
+ };
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
impl RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> {
- self.listener.incoming.recv()
+ self.timeout.accept(&self.listener.incoming)
+ }
+
+ fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+ match timeout_ms {
+ None => self.timeout.clear(),
+ Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
+ }
}
}
#[test]
fn connect_err() {
- match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) {
+ match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
+ None) {
Ok(..) => fail!(),
Err(..) => {}
}
assert!(client.write([2]).is_ok());
});
rx.recv();
- let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+ let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
assert!(c.write([1]).is_ok());
let mut buf = [0];
assert!(c.read(buf).unwrap() == 1);
drop(p.accept().unwrap());
});
rx.recv();
- let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap();
+ let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
fail!()
}
}
}
- fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> {
- match PipeWatcher::connect(self, path) {
+ fn unix_connect(&mut self, path: &CString,
+ timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
+ match PipeWatcher::connect(self, path, timeout) {
Ok(p) => Ok(~p as ~rtio::RtioPipe:Send),
Err(e) => Err(uv_error_to_io_error(e)),
}
/// ```
pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
- io.unix_connect(&path.to_c_str()).map(UnixStream::new)
+ io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
+ })
+ }
+
+ /// Connect to a pipe named by `path`. This will attempt to open a
+ /// connection to the underlying socket.
+ ///
+ /// The returned stream will be closed when the object falls out of scope.
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// # #![allow(unused_must_use)]
+ /// use std::io::net::unix::UnixStream;
+ ///
+ /// let server = Path::new("path/to/my/socket");
+ /// let mut stream = UnixStream::connect(&server);
+ /// stream.write([1, 2, 3]);
+ /// ```
+ #[experimental = "the timeout argument is likely to change types"]
+ pub fn connect_timeout<P: ToCStr>(path: &P,
+ timeout_ms: u64) -> IoResult<UnixStream> {
+ LocalIo::maybe_raise(|io| {
+ let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
+ s.map(UnixStream::new)
})
}
}
obj: ~RtioUnixAcceptor:Send,
}
+impl UnixAcceptor {
+ /// Sets a timeout for this acceptor, after which accept() will no longer
+ /// block indefinitely.
+ ///
+ /// The argument specified is the amount of time, in milliseconds, into the
+ /// future after which all invocations of accept() will not block (and any
+ /// pending invocation will return). A value of `None` will clear any
+ /// existing timeout.
+ ///
+ /// When using this method, it is likely necessary to reset the timeout as
+ /// appropriate, the timeout specified is specific to this object, not
+ /// specific to the next request.
+ #[experimental = "the name and arguments to this function are likely \
+ to change"]
+ pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
+ self.obj.set_timeout(timeout_ms)
+ }
+}
+
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> IoResult<UnixStream> {
self.obj.accept().map(UnixStream::new)
}
#[cfg(test)]
+#[allow(experimental)]
mod tests {
use prelude::*;
use super::*;
drop(l.listen().unwrap());
assert!(!path.exists());
} #[cfg(not(windows))])
+
+ iotest!(fn accept_timeout() {
+ let addr = next_test_unix();
+ let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+
+ a.set_timeout(Some(10));
+
+ // Make sure we time out once and future invocations also time out
+ let err = a.accept().err().unwrap();
+ assert_eq!(err.kind, TimedOut);
+ let err = a.accept().err().unwrap();
+ assert_eq!(err.kind, TimedOut);
+
+ // Also make sure that even though the timeout is expired that we will
+ // continue to receive any pending connections.
+ let l = UnixStream::connect(&addr).unwrap();
+ for i in range(0, 1001) {
+ match a.accept() {
+ Ok(..) => break,
+ Err(ref e) if e.kind == TimedOut => {}
+ Err(e) => fail!("error: {}", e),
+ }
+ if i == 1000 { fail!("should have a pending connection") }
+ }
+ drop(l);
+
+ // Unset the timeout and make sure that this always blocks.
+ a.set_timeout(None);
+ let addr2 = addr.clone();
+ spawn(proc() {
+ drop(UnixStream::connect(&addr2));
+ });
+ a.accept().unwrap();
+ })
+
+ iotest!(fn connect_timeout_error() {
+ let addr = next_test_unix();
+ assert!(UnixStream::connect_timeout(&addr, 100).is_err());
+ })
+
+ iotest!(fn connect_timeout_success() {
+ let addr = next_test_unix();
+ let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
+ assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
+ })
}
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>;
fn unix_bind(&mut self, path: &CString)
-> IoResult<~RtioUnixListener:Send>;
- fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>;
+ fn unix_connect(&mut self, path: &CString,
+ timeout: Option<u64>) -> IoResult<~RtioPipe:Send>;
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]>;
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> IoResult<~RtioPipe:Send>;
+ fn set_timeout(&mut self, timeout: Option<u64>);
}
pub trait RtioTTY {