]> git.lizzy.rs Git - rust.git/commitdiff
Migrate uv net bindings away from ~fn()
authorAlex Crichton <alex@alexcrichton.com>
Tue, 5 Nov 2013 08:27:41 +0000 (00:27 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:11 +0000 (01:37 -0800)
src/librustuv/addrinfo.rs
src/librustuv/lib.rs
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/process.rs
src/librustuv/stream.rs
src/librustuv/timer.rs
src/librustuv/uvio.rs
src/librustuv/uvll.rs

index 36c4defdee9292e658d7562871d379ab2ac19b23..88818cf2b4d22d0f10009d69edd3043b34c7fad1 100644 (file)
 use std::rt::sched::Scheduler;
 
 use net;
-use super::{Loop, UvError, NativeHandle};
-use uvll::UV_GETADDRINFO;
+use super::{Loop, UvError, NativeHandle, Request};
 use uvll;
 
-struct GetAddrInfoRequest {
-    handle: *uvll::uv_getaddrinfo_t,
-}
-
 struct Addrinfo {
     handle: *uvll::addrinfo,
 }
@@ -35,13 +30,9 @@ struct Ctx {
     addrinfo: Option<Addrinfo>,
 }
 
-impl GetAddrInfoRequest {
-    pub fn new() -> GetAddrInfoRequest {
-        GetAddrInfoRequest {
-            handle: unsafe { uvll::malloc_req(uvll::UV_GETADDRINFO) },
-        }
-    }
+pub struct GetAddrInfoRequest;
 
+impl GetAddrInfoRequest {
     pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
                hints: Option<ai::Hint>) -> Result<~[ai::Info], UvError> {
         assert!(node.is_some() || service.is_some());
@@ -85,7 +76,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
             }
         });
         let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
-        let req = GetAddrInfoRequest::new();
+        let req = Request::new(uvll::UV_GETADDRINFO);
 
         return match unsafe {
             uvll::uv_getaddrinfo(loop_.native_handle(), req.handle,
@@ -94,7 +85,8 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
         } {
             0 => {
                 let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
-                unsafe { uvll::set_data_for_req(req.handle, &cx) }
+                req.set_data(&cx);
+                req.defuse();
                 let scheduler: ~Scheduler = Local::take();
                 do scheduler.deschedule_running_task_and_then |_, task| {
                     cx.slot = Some(task);
@@ -112,9 +104,9 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
         extern fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t,
                                  status: c_int,
                                  res: *uvll::addrinfo) {
-            let cx: &mut Ctx = unsafe {
-                cast::transmute(uvll::get_data_for_req(req))
-            };
+            let req = Request::wrap(req);
+            if status == uvll::ECANCELED { return }
+            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
             cx.status = status;
             cx.addrinfo = Some(Addrinfo { handle: res });
 
@@ -124,12 +116,6 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
     }
 }
 
-impl Drop for GetAddrInfoRequest {
-    fn drop(&mut self) {
-        unsafe { uvll::free_req(self.handle) }
-    }
-}
-
 impl Drop for Addrinfo {
     fn drop(&mut self) {
         unsafe { uvll::uv_freeaddrinfo(self.handle) }
index eb2da05506d82589d65c7f72399cb95c956d5ebf..5e79f6e13451f4c7f04a2a7ef617dab1a950954b 100644 (file)
 use std::cast::transmute;
 use std::ptr::null;
 use std::unstable::finally::Finally;
-use std::rt::io::net::ip::SocketAddr;
 
 use std::rt::io::IoError;
 
 //#[cfg(test)] use unstable::run_in_bare_thread;
 
 pub use self::file::{FsRequest, FileWatcher};
-pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
+pub use self::net::{TcpWatcher, TcpListener, TcpAcceptor, UdpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
 pub use self::async::AsyncWatcher;
 pub use self::process::Process;
-pub use self::pipe::PipeWatcher;
+pub use self::pipe::{PipeWatcher, PipeListener, PipeAcceptor};
 pub use self::signal::SignalWatcher;
 pub use self::tty::TtyWatcher;
 
@@ -97,24 +96,6 @@ pub struct Loop {
     priv handle: *uvll::uv_loop_t
 }
 
-pub struct Handle(*uvll::uv_handle_t);
-
-impl Watcher for Handle {}
-impl NativeHandle<*uvll::uv_handle_t> for Handle {
-    fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) }
-    fn native_handle(&self) -> *uvll::uv_handle_t { **self }
-}
-
-/// The trait implemented by uv 'watchers' (handles). Watchers are
-/// non-owning wrappers around the uv handles and are not completely
-/// safe - there may be multiple instances for a single underlying
-/// handle.  Watchers are generally created, then `start`ed, `stop`ed
-/// and `close`ed, but due to their complex life cycle may not be
-/// entirely memory safe if used in unanticipated patterns.
-pub trait Watcher { }
-
-pub trait Request { }
-
 /// A type that wraps a native handle
 pub trait NativeHandle<T> {
     fn from_native_handle(T) -> Self;
@@ -160,32 +141,47 @@ fn close_async_(&mut self) {
     }
 }
 
-pub trait UvRequest<T> {
-    fn uv_request(&self) -> *T;
+pub struct Request {
+    handle: *uvll::uv_req_t,
+}
 
-    // FIXME(#8888) dummy self
-    fn alloc(_: Option<Self>, ty: uvll::uv_req_type) -> *T {
-        unsafe {
-            let handle = uvll::malloc_req(ty);
-            assert!(!handle.is_null());
-            handle as *T
-        }
+impl Request {
+    pub fn new(ty: uvll::uv_req_type) -> Request {
+        Request::wrap(unsafe { uvll::malloc_req(ty) })
     }
 
-    unsafe fn from_uv_request<'a>(h: &'a *T) -> &'a mut Self {
-        cast::transmute(uvll::get_data_for_req(*h))
+    pub fn wrap(handle: *uvll::uv_req_t) -> Request {
+        Request { handle: handle }
     }
 
-    fn install(~self) -> ~Self {
-        unsafe {
-            let myptr = cast::transmute::<&~Self, &*u8>(&self);
-            uvll::set_data_for_req(self.uv_request(), *myptr);
-        }
-        self
+    pub fn set_data<T>(&self, t: *T) {
+        unsafe { uvll::set_data_for_req(self.handle, t) }
     }
 
-    fn delete(&mut self) {
-        unsafe { uvll::free_req(self.uv_request() as *c_void) }
+    pub fn get_data(&self) -> *c_void {
+        unsafe { uvll::get_data_for_req(self.handle) }
+    }
+
+    // This function should be used when the request handle has been given to an
+    // underlying uv function, and the uv function has succeeded. This means
+    // that uv will at some point invoke the callback, and in the meantime we
+    // can't deallocate the handle because libuv could be using it.
+    //
+    // This is still a problem in blocking situations due to linked failure. In
+    // the connection callback the handle should be re-wrapped with the `wrap`
+    // function to ensure its destruction.
+    pub fn defuse(mut self) {
+        self.handle = ptr::null();
+    }
+}
+
+impl Drop for Request {
+    fn drop(&mut self) {
+        unsafe {
+            if self.handle != ptr::null() {
+                uvll::free_req(self.handle)
+            }
+        }
     }
 }
 
@@ -214,110 +210,6 @@ fn native_handle(&self) -> *uvll::uv_loop_t {
     }
 }
 
-// XXX: The uv alloc callback also has a *uv_handle_t arg
-pub type AllocCallback = ~fn(uint) -> Buf;
-pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
-pub type NullCallback = ~fn();
-pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
-pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
-pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
-
-
-/// Callbacks used by StreamWatchers, set as custom data on the foreign handle.
-/// XXX: Would be better not to have all watchers allocate room for all callback types.
-struct WatcherData {
-    read_cb: Option<ReadCallback>,
-    write_cb: Option<ConnectionCallback>,
-    connect_cb: Option<ConnectionCallback>,
-    close_cb: Option<NullCallback>,
-    alloc_cb: Option<AllocCallback>,
-    udp_recv_cb: Option<UdpReceiveCallback>,
-    udp_send_cb: Option<UdpSendCallback>,
-}
-
-pub trait WatcherInterop {
-    fn event_loop(&self) -> Loop;
-    fn install_watcher_data(&mut self);
-    fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
-    fn drop_watcher_data(&mut self);
-    fn close(self, cb: NullCallback);
-    fn close_async(self);
-}
-
-impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
-    /// Get the uv event loop from a Watcher
-    fn event_loop(&self) -> Loop {
-        unsafe {
-            let handle = self.native_handle();
-            let loop_ = uvll::get_loop_for_uv_handle(handle);
-            NativeHandle::from_native_handle(loop_)
-        }
-    }
-
-    fn install_watcher_data(&mut self) {
-        unsafe {
-            let data = ~WatcherData {
-                read_cb: None,
-                write_cb: None,
-                connect_cb: None,
-                close_cb: None,
-                alloc_cb: None,
-                udp_recv_cb: None,
-                udp_send_cb: None,
-            };
-            let data = transmute::<~WatcherData, *c_void>(data);
-            uvll::set_data_for_uv_handle(self.native_handle(), data);
-        }
-    }
-
-    fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
-        unsafe {
-            let data = uvll::get_data_for_uv_handle(self.native_handle());
-            let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
-            return &mut **data;
-        }
-    }
-
-    fn drop_watcher_data(&mut self) {
-        unsafe {
-            let data = uvll::get_data_for_uv_handle(self.native_handle());
-            let _data = transmute::<*c_void, ~WatcherData>(data);
-            uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
-        }
-    }
-
-    fn close(mut self, cb: NullCallback) {
-        {
-            let data = self.get_watcher_data();
-            assert!(data.close_cb.is_none());
-            data.close_cb = Some(cb);
-        }
-
-        unsafe {
-            uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
-        }
-
-        extern fn close_cb(handle: *uvll::uv_handle_t) {
-            let mut h: Handle = NativeHandle::from_native_handle(handle);
-            h.get_watcher_data().close_cb.take_unwrap()();
-            h.drop_watcher_data();
-            unsafe { uvll::free_handle(handle as *c_void) }
-        }
-    }
-
-    fn close_async(self) {
-        unsafe {
-            uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
-        }
-
-        extern fn close_cb(handle: *uvll::uv_handle_t) {
-            let mut h: Handle = NativeHandle::from_native_handle(handle);
-            h.drop_watcher_data();
-            unsafe { uvll::free_handle(handle as *c_void) }
-        }
-    }
-}
-
 // XXX: Need to define the error constants like EOF so they can be
 // compared to the UvError type
 
index e9f3f2bba4c5e855c6dd974183cb0e8b7f25371d..ef64b1e5cc5458ae66730eeec1dc59dff84ee513 100644 (file)
@@ -8,18 +8,32 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use std::cast;
 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
-use std::vec;
+use std::ptr;
+use std::rt::BlockedTask;
+use std::rt::io::IoError;
+use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr};
+use std::rt::local::Local;
+use std::rt::io::net::ip::{SocketAddr, IpAddr};
+use std::rt::rtio;
+use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::tube::Tube;
 use std::str;
-use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use std::vec;
 
 use uvll;
 use uvll::*;
-use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
-            UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
-            status_to_maybe_uv_error, empty_buf};
+use super::{
+            Loop, Request, UvError, Buf, NativeHandle,
+            status_to_io_result,
+            uv_error_to_io_error, UvHandle, slice_to_uv_buf};
+use uvio::HomingIO;
+use stream::StreamWatcher;
 
-pub struct UvAddrInfo(*uvll::addrinfo);
+////////////////////////////////////////////////////////////////////////////////
+/// Generic functions related to dealing with sockaddr things
+////////////////////////////////////////////////////////////////////////////////
 
 pub enum UvSocketAddr {
     UvIpv4SocketAddr(*sockaddr_in),
@@ -113,395 +127,585 @@ fn test_ip6_conversion() {
     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
 }
 
-// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
-// and uv_file_t
-pub struct StreamWatcher(*uvll::uv_stream_t);
-impl Watcher for StreamWatcher { }
+enum SocketNameKind {
+    TcpPeer,
+    Tcp,
+    Udp
+}
 
-impl StreamWatcher {
-    pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
-        unsafe {
-            match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) {
-                0 => {
-                    let data = self.get_watcher_data();
-                    data.alloc_cb = Some(alloc);
-                    data.read_cb = Some(cb);
-                }
-                n => {
-                    cb(*self, 0, empty_buf(), Some(UvError(n)))
-                }
-            }
-        }
+fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
+    let getsockname = match sk {
+        TcpPeer => uvll::tcp_getpeername,
+        Tcp     => uvll::tcp_getsockname,
+        Udp     => uvll::udp_getsockname,
+    };
 
-        extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
-            let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
-            return (*alloc_cb)(suggested_size as uint);
-        }
+    // Allocate a sockaddr_storage
+    // since we don't know if it's ipv4 or ipv6
+    let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
 
-        extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
-            uvdebug!("buf addr: {}", buf.base);
-            uvdebug!("buf len: {}", buf.len);
-            let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
-            let status = status_to_maybe_uv_error(nread as c_int);
-            (*cb)(stream_watcher, nread as int, buf, status);
-        }
-    }
+    let r = unsafe {
+        getsockname(handle, r_addr as *uvll::sockaddr_storage)
+    };
 
-    pub fn read_stop(&mut self) {
-        // It would be nice to drop the alloc and read callbacks here,
-        // but read_stop may be called from inside one of them and we
-        // would end up freeing the in-use environment
-        let handle = self.native_handle();
-        unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+    if r != 0 {
+        return Err(uv_error_to_io_error(UvError(r)));
     }
 
-    pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
-        let req = WriteRequest::new();
-        return unsafe {
-            match uvll::uv_write(req.native_handle(), self.native_handle(),
-                                 [buf], write_cb) {
-                0 => {
-                    let data = self.get_watcher_data();
-                    assert!(data.write_cb.is_none());
-                    data.write_cb = Some(cb);
-                }
-                n => {
-                    req.delete();
-                    cb(*self, Some(UvError(n)))
-                }
-            }
-        };
-
-        extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
-            let write_request: WriteRequest = NativeHandle::from_native_handle(req);
-            let mut stream_watcher = write_request.stream();
-            write_request.delete();
-            let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
-            let status = status_to_maybe_uv_error(status);
-            cb(stream_watcher, status);
+    let addr = unsafe {
+        if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
+            uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
+        } else {
+            uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
         }
-    }
+    };
 
+    unsafe { uvll::free_sockaddr_storage(r_addr); }
 
-    pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
-        {
-            let data = self.get_watcher_data();
-            assert!(data.connect_cb.is_none());
-            data.connect_cb = Some(cb);
-        }
+    Ok(addr)
 
-        return unsafe {
-            static BACKLOG: c_int = 128; // XXX should be configurable
-            match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) {
-                0 => Ok(()),
-                n => Err(UvError(n))
-            }
-        };
+}
 
-        extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
-            uvdebug!("connection_cb");
-            let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
-            let status = status_to_maybe_uv_error(status);
-            (*cb)(stream_watcher, status);
-        }
-    }
+////////////////////////////////////////////////////////////////////////////////
+/// TCP implementation
+////////////////////////////////////////////////////////////////////////////////
 
-    pub fn accept(&mut self, stream: StreamWatcher) {
-        let self_handle = self.native_handle() as *c_void;
-        let stream_handle = stream.native_handle() as *c_void;
-        assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } );
-    }
+pub struct TcpWatcher {
+    handle: *uvll::uv_tcp_t,
+    stream: StreamWatcher,
+    home: SchedHandle,
 }
 
-impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
-    fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
-        StreamWatcher(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_stream_t {
-        match self { &StreamWatcher(ptr) => ptr }
-    }
+pub struct TcpListener {
+    home: SchedHandle,
+    handle: *uvll::uv_pipe_t,
+    priv closing_task: Option<BlockedTask>,
+    priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
+}
+
+pub struct TcpAcceptor {
+    listener: ~TcpListener,
+    priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
 }
 
-pub struct TcpWatcher(*uvll::uv_tcp_t);
-impl Watcher for TcpWatcher { }
+// TCP watchers (clients/streams)
 
 impl TcpWatcher {
     pub fn new(loop_: &Loop) -> TcpWatcher {
-        unsafe {
-            let handle = malloc_handle(UV_TCP);
-            assert!(handle.is_not_null());
-            assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle));
-            let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
-            watcher.install_watcher_data();
-            return watcher;
+        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+        assert_eq!(unsafe {
+            uvll::uv_tcp_init(loop_.native_handle(), handle)
+        }, 0);
+        TcpWatcher {
+            home: get_handle_to_current_scheduler!(),
+            handle: handle,
+            stream: StreamWatcher::new(handle),
         }
     }
 
-    pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
-        do socket_addr_as_uv_socket_addr(address) |addr| {
-            let result = unsafe {
-                match addr {
-                    UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
-                    UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
-                }
+    pub fn connect(loop_: &mut Loop, address: SocketAddr)
+        -> Result<TcpWatcher, UvError>
+    {
+        struct Ctx { status: c_int, task: Option<BlockedTask> }
+
+        let tcp = TcpWatcher::new(loop_);
+        let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
+            let req = Request::new(uvll::UV_CONNECT);
+            let result = match addr {
+                UvIpv4SocketAddr(addr) => unsafe {
+                    uvll::tcp_connect(req.handle, tcp.handle, addr,
+                                      connect_cb)
+                },
+                UvIpv6SocketAddr(addr) => unsafe {
+                    uvll::tcp_connect6(req.handle, tcp.handle, addr,
+                                       connect_cb)
+                },
             };
             match result {
-                0 => Ok(()),
-                _ => Err(UvError(result)),
+                0 => {
+                    req.defuse();
+                    let mut cx = Ctx { status: 0, task: None };
+                    let scheduler: ~Scheduler = Local::take();
+                    do scheduler.deschedule_running_task_and_then |_, task| {
+                        cx.task = Some(task);
+                    }
+                    match cx.status {
+                        0 => Ok(()),
+                        n => Err(UvError(n)),
+                    }
+                }
+                n => Err(UvError(n))
             }
-        }
-    }
+        };
 
-    pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
-        unsafe {
-            assert!(self.get_watcher_data().connect_cb.is_none());
-            self.get_watcher_data().connect_cb = Some(cb);
-
-            let connect_handle = ConnectRequest::new().native_handle();
-            uvdebug!("connect_t: {}", connect_handle);
-            do socket_addr_as_uv_socket_addr(address) |addr| {
-                let result = match addr {
-                    UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
-                                                      self.native_handle(), addr, connect_cb),
-                    UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
-                                                       self.native_handle(), addr, connect_cb),
-                };
-                assert_eq!(0, result);
-            }
+        return match ret {
+            Ok(()) => Ok(tcp),
+            Err(e) => Err(e),
+        };
 
-            extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
-                uvdebug!("connect_t: {}", req);
-                let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
-                let mut stream_watcher = connect_request.stream();
-                connect_request.delete();
-                let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
-                let status = status_to_maybe_uv_error(status);
-                cb(stream_watcher, status);
-            }
+        extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
+            let _req = Request::wrap(req);
+            if status == uvll::ECANCELED { return }
+            let cx: &mut Ctx = unsafe {
+                cast::transmute(uvll::get_data_for_req(req))
+            };
+            cx.status = status;
+            let scheduler: ~Scheduler = Local::take();
+            scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
         }
     }
+}
 
-    pub fn as_stream(&self) -> StreamWatcher {
-        NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
+impl HomingIO for TcpWatcher {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl rtio::RtioSocket for TcpWatcher {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        let _m = self.fire_missiles();
+        socket_name(Tcp, self.handle)
     }
 }
 
-impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
-    fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
-        TcpWatcher(handle)
+impl rtio::RtioTcpStream for TcpWatcher {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+        let _m = self.fire_missiles();
+        self.stream.read(buf).map_err(uv_error_to_io_error)
     }
-    fn native_handle(&self) -> *uvll::uv_tcp_t {
-        match self { &TcpWatcher(ptr) => ptr }
+
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        self.stream.write(buf).map_err(uv_error_to_io_error)
     }
-}
 
-pub struct UdpWatcher(*uvll::uv_udp_t);
-impl Watcher for UdpWatcher { }
+    fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
+        let _m = self.fire_missiles();
+        socket_name(TcpPeer, self.handle)
+    }
 
-impl UdpWatcher {
-    pub fn new(loop_: &Loop) -> UdpWatcher {
-        unsafe {
-            let handle = malloc_handle(UV_UDP);
-            assert!(handle.is_not_null());
-            assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle));
-            let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            watcher.install_watcher_data();
-            return watcher;
-        }
+    fn control_congestion(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
+        })
     }
 
-    pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
-        do socket_addr_as_uv_socket_addr(address) |addr| {
-            let result = unsafe {
-                match addr {
-                    UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
-                    UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
-                }
-            };
-            match result {
-                0 => Ok(()),
-                _ => Err(UvError(result)),
-            }
-        }
+    fn nodelay(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
+        })
     }
 
-    pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
-        {
-            let data = self.get_watcher_data();
-            data.alloc_cb = Some(alloc);
-            data.udp_recv_cb = Some(cb);
-        }
+    fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
+                                   delay_in_seconds as c_uint)
+        })
+    }
 
-        unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
+    fn letdie(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
+        })
+    }
+}
 
-        extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
-            let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
-            return (*alloc_cb)(suggested_size as uint);
-        }
+impl Drop for TcpWatcher {
+    fn drop(&mut self) {
+        let _m = self.fire_missiles();
+        self.stream.close(true);
+    }
+}
 
-        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
-                          addr: *uvll::sockaddr, flags: c_uint) {
-            // When there's no data to read the recv callback can be a no-op.
-            // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
-            // this we just drop back to kqueue and wait for the next callback.
-            if nread == 0 {
-                return;
+// TCP listeners (unbound servers)
+
+impl TcpListener {
+    pub fn bind(loop_: &mut Loop, address: SocketAddr)
+        -> Result<~TcpListener, UvError>
+    {
+        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+        assert_eq!(unsafe {
+            uvll::uv_tcp_init(loop_.native_handle(), handle)
+        }, 0);
+        let l = ~TcpListener {
+            home: get_handle_to_current_scheduler!(),
+            handle: handle,
+            closing_task: None,
+            outgoing: Tube::new(),
+        };
+        let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+            match addr {
+                UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
+                UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
             }
-
-            uvdebug!("buf addr: {}", buf.base);
-            uvdebug!("buf len: {}", buf.len);
-            let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
-            let status = status_to_maybe_uv_error(nread as c_int);
-            let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
-            (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
+        });
+        match res {
+            0 => Ok(l.install()),
+            n => Err(UvError(n))
         }
     }
+}
+
+impl HomingIO for TcpListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl UvHandle<uvll::uv_tcp_t> for TcpListener {
+    fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
+}
 
-    pub fn recv_stop(&mut self) {
-        unsafe { uvll::uv_udp_recv_stop(self.native_handle()); }
+impl rtio::RtioSocket for TcpListener {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        let _m = self.fire_missiles();
+        socket_name(Tcp, self.handle)
     }
+}
+
+impl rtio::RtioTcpListener for TcpListener {
+    fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
+        // create the acceptor object from ourselves
+        let incoming = self.outgoing.clone();
+        let mut acceptor = ~TcpAcceptor {
+            listener: self,
+            incoming: incoming,
+        };
 
-    pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
-        {
-            let data = self.get_watcher_data();
-            assert!(data.udp_send_cb.is_none());
-            data.udp_send_cb = Some(cb);
+        let _m = acceptor.fire_missiles();
+        // XXX: the 128 backlog should be configurable
+        match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
+            0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
+            n => Err(uv_error_to_io_error(UvError(n))),
         }
+    }
+}
 
-        let req = UdpSendRequest::new();
-        do socket_addr_as_uv_socket_addr(address) |addr| {
-            let result = unsafe {
-                match addr {
-                    UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
-                                                   self.native_handle(), [buf], addr, send_cb),
-                    UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
-                                                    self.native_handle(), [buf], addr, send_cb),
-                }
-            };
-            assert_eq!(0, result);
+extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
+    let msg = match status {
+        0 => {
+            let loop_ = NativeHandle::from_native_handle(unsafe {
+                uvll::get_loop_for_uv_handle(server)
+            });
+            let client = TcpWatcher::new(&loop_);
+            assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
+            Ok(~client as ~rtio::RtioTcpStream)
         }
+        uvll::ECANCELED => return,
+        n => Err(uv_error_to_io_error(UvError(n)))
+    };
 
-        extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
-            let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
-            let mut udp_watcher = send_request.handle();
-            send_request.delete();
-            let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
-            let status = status_to_maybe_uv_error(status);
-            cb(udp_watcher, status);
+    let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
+    tcp.outgoing.send(msg);
+}
+
+impl Drop for TcpListener {
+    fn drop(&mut self) {
+        let (_m, sched) = self.fire_missiles_sched();
+
+        do sched.deschedule_running_task_and_then |_, task| {
+            self.closing_task = Some(task);
+            unsafe { uvll::uv_close(self.handle, listener_close_cb) }
         }
     }
 }
 
-impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
-    fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
-        UdpWatcher(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_udp_t {
-        match self { &UdpWatcher(ptr) => ptr }
-    }
+extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
+    let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
+    unsafe { uvll::free_handle(handle) }
+
+    let sched: ~Scheduler = Local::take();
+    sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
 }
 
-// uv_connect_t is a subclass of uv_req_t
-pub struct ConnectRequest(*uvll::uv_connect_t);
-impl Request for ConnectRequest { }
+// TCP acceptors (bound servers)
+
+impl HomingIO for TcpAcceptor {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
 
-impl ConnectRequest {
+impl rtio::RtioSocket for TcpAcceptor {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        let _m = self.fire_missiles();
+        socket_name(Tcp, self.listener.handle)
+    }
+}
 
-    pub fn new() -> ConnectRequest {
-        let connect_handle = unsafe { malloc_req(UV_CONNECT) };
-        assert!(connect_handle.is_not_null());
-        ConnectRequest(connect_handle as *uvll::uv_connect_t)
+impl rtio::RtioTcpAcceptor for TcpAcceptor {
+    fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
+        let _m = self.fire_missiles();
+        self.incoming.recv()
     }
 
-    fn stream(&self) -> StreamWatcher {
-        unsafe {
-            let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
-            NativeHandle::from_native_handle(stream_handle)
-        }
+    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
+        })
     }
 
-    fn delete(self) {
-        unsafe { free_req(self.native_handle() as *c_void) }
+    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
+        })
     }
 }
 
-impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
-    fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
-        ConnectRequest(handle)
+////////////////////////////////////////////////////////////////////////////////
+/// UDP implementation
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UdpWatcher {
+    handle: *uvll::uv_udp_t,
+    home: SchedHandle,
+}
+
+impl UdpWatcher {
+    pub fn bind(loop_: &Loop, address: SocketAddr)
+        -> Result<UdpWatcher, UvError>
+    {
+        let udp = UdpWatcher {
+            handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+            home: get_handle_to_current_scheduler!(),
+        };
+        assert_eq!(unsafe {
+            uvll::uv_udp_init(loop_.native_handle(), udp.handle)
+        }, 0);
+        let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+            match addr {
+                UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
+                UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
+            }
+        });
+        match result {
+            0 => Ok(udp),
+            n => Err(UvError(n)),
+        }
     }
-    fn native_handle(&self) -> *uvll::uv_connect_t {
-        match self { &ConnectRequest(ptr) => ptr }
+}
+
+impl HomingIO for UdpWatcher {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl rtio::RtioSocket for UdpWatcher {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        let _m = self.fire_missiles();
+        socket_name(Udp, self.handle)
     }
 }
 
-pub struct WriteRequest(*uvll::uv_write_t);
+impl rtio::RtioUdpSocket for UdpWatcher {
+    fn recvfrom(&mut self, buf: &mut [u8])
+        -> Result<(uint, SocketAddr), IoError>
+    {
+        struct Ctx {
+            task: Option<BlockedTask>,
+            buf: Option<Buf>,
+            result: Option<(ssize_t, SocketAddr)>,
+        }
+        let _m = self.fire_missiles();
+
+        return match unsafe {
+            uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
+        } {
+            0 => {
+                let mut cx = Ctx {
+                    task: None,
+                    buf: Some(slice_to_uv_buf(buf)),
+                    result: None,
+                };
+                unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_, task| {
+                    cx.task = Some(task);
+                }
+                match cx.result.take_unwrap() {
+                    (n, _) if n < 0 =>
+                        Err(uv_error_to_io_error(UvError(n as c_int))),
+                    (n, addr) => Ok((n as uint, addr))
+                }
+            }
+            n => Err(uv_error_to_io_error(UvError(n)))
+        };
+
+        extern fn alloc_cb(handle: *uvll::uv_udp_t,
+                           _suggested_size: size_t) -> Buf {
+            let cx: &mut Ctx = unsafe {
+                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            };
+            cx.buf.take().expect("alloc_cb called more than once")
+        }
+
+        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
+                          addr: *uvll::sockaddr, _flags: c_uint) {
 
-impl Request for WriteRequest { }
+            // When there's no data to read the recv callback can be a no-op.
+            // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
+            // this we just drop back to kqueue and wait for the next callback.
+            if nread == 0 { return }
+            if nread == uvll::ECANCELED as ssize_t { return }
 
-impl WriteRequest {
-    pub fn new() -> WriteRequest {
-        let write_handle = unsafe { malloc_req(UV_WRITE) };
-        assert!(write_handle.is_not_null());
-        WriteRequest(write_handle as *uvll::uv_write_t)
+            unsafe {
+                assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
+            }
+
+            let cx: &mut Ctx = unsafe {
+                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            };
+            let addr = sockaddr_to_UvSocketAddr(addr);
+            let addr = uv_socket_addr_to_socket_addr(addr);
+            cx.result = Some((nread, addr));
+
+            let sched: ~Scheduler = Local::take();
+            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+        }
     }
 
-    pub fn stream(&self) -> StreamWatcher {
-        unsafe {
-            let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
-            NativeHandle::from_native_handle(stream_handle)
+    fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
+        struct Ctx { task: Option<BlockedTask>, result: c_int }
+
+        let _m = self.fire_missiles();
+
+        let req = Request::new(uvll::UV_UDP_SEND);
+        let buf = slice_to_uv_buf(buf);
+        let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
+            match dst {
+                UvIpv4SocketAddr(dst) =>
+                    uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
+                UvIpv6SocketAddr(dst) =>
+                    uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
+            }
+        });
+
+        return match result {
+            0 => {
+                let mut cx = Ctx { task: None, result: 0 };
+                req.set_data(&cx);
+                req.defuse();
+
+                let sched: ~Scheduler = Local::take();
+                do sched.deschedule_running_task_and_then |_, task| {
+                    cx.task = Some(task);
+                }
+
+                match cx.result {
+                    0 => Ok(()),
+                    n => Err(uv_error_to_io_error(UvError(n)))
+                }
+            }
+            n => Err(uv_error_to_io_error(UvError(n)))
+        };
+
+        extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
+            let req = Request::wrap(req);
+            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            cx.result = status;
+
+            let sched: ~Scheduler = Local::take();
+            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
         }
     }
 
-    pub fn delete(self) {
-        unsafe { free_req(self.native_handle() as *c_void) }
+    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            do multi.to_str().with_c_str |m_addr| {
+                uvll::uv_udp_set_membership(self.handle,
+                                            m_addr, ptr::null(),
+                                            uvll::UV_JOIN_GROUP)
+            }
+        })
     }
-}
 
-impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
-    fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
-        WriteRequest(handle)
+    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            do multi.to_str().with_c_str |m_addr| {
+                uvll::uv_udp_set_membership(self.handle,
+                                            m_addr, ptr::null(),
+                                            uvll::UV_LEAVE_GROUP)
+            }
+        })
     }
-    fn native_handle(&self) -> *uvll::uv_write_t {
-        match self { &WriteRequest(ptr) => ptr }
+
+    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_loop(self.handle,
+                                            1 as c_int)
+        })
     }
-}
 
-pub struct UdpSendRequest(*uvll::uv_udp_send_t);
-impl Request for UdpSendRequest { }
+    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_loop(self.handle,
+                                            0 as c_int)
+        })
+    }
 
-impl UdpSendRequest {
-    pub fn new() -> UdpSendRequest {
-        let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
-        assert!(send_handle.is_not_null());
-        UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
+    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_multicast_ttl(self.handle,
+                                           ttl as c_int)
+        })
     }
 
-    pub fn handle(&self) -> UdpWatcher {
-        let send_request_handle = unsafe {
-            uvll::get_udp_handle_from_send_req(self.native_handle())
-        };
-        NativeHandle::from_native_handle(send_request_handle)
+    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
+        })
     }
 
-    pub fn delete(self) {
-        unsafe { free_req(self.native_handle() as *c_void) }
+    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_broadcast(self.handle,
+                                       1 as c_int)
+        })
     }
-}
 
-impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
-    fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
-        UdpSendRequest(handle)
+    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
+        let _m = self.fire_missiles();
+        status_to_io_result(unsafe {
+            uvll::uv_udp_set_broadcast(self.handle,
+                                       0 as c_int)
+        })
     }
-    fn native_handle(&self) -> *uvll::uv_udp_send_t {
-        match self { &UdpSendRequest(ptr) => ptr }
+}
+
+impl Drop for UdpWatcher {
+    fn drop(&mut self) {
+        // Send ourselves home to close this handle (blocking while doing so).
+        let (_m, sched) = self.fire_missiles_sched();
+        let mut slot = None;
+        unsafe {
+            uvll::set_data_for_uv_handle(self.handle, &slot);
+            uvll::uv_close(self.handle, close_cb);
+        }
+        do sched.deschedule_running_task_and_then |_, task| {
+            slot = Some(task);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_handle_t) {
+            let slot: &mut Option<BlockedTask> = unsafe {
+                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            };
+            let sched: ~Scheduler = Local::take();
+            sched.resume_blocked_task_immediately(slot.take_unwrap());
+        }
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+/// UV request support
+////////////////////////////////////////////////////////////////////////////////
+
 #[cfg(test)]
 mod test {
     use super::*;
index a857308a81b35a18baf89163c4456d25b65af974..2a41dd9efe19bccf099ad5f6216c18292225a79d 100644 (file)
@@ -19,7 +19,7 @@
 use std::rt::tube::Tube;
 
 use stream::StreamWatcher;
-use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
+use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle, Request};
 use uvio::HomingIO;
 use uvll;
 
@@ -79,23 +79,26 @@ struct Ctx {
             result: Option<Result<PipeWatcher, UvError>>,
         }
         let mut cx = Ctx { task: None, result: None };
-        let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
-        unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
+        let req = Request::new(uvll::UV_CONNECT);
+        unsafe {
+            uvll::set_data_for_req(req.handle, &cx as *Ctx);
+            uvll::uv_pipe_connect(req.handle,
+                                  PipeWatcher::alloc(loop_, false),
+                                  name.with_ref(|p| p),
+                                  connect_cb)
+        }
+        req.defuse();
 
         let sched: ~Scheduler = Local::take();
         do sched.deschedule_running_task_and_then |_, task| {
             cx.task = Some(task);
-            unsafe {
-                uvll::uv_pipe_connect(req,
-                                      PipeWatcher::alloc(loop_, false),
-                                      name.with_ref(|p| p),
-                                      connect_cb)
-            }
         }
         assert!(cx.task.is_none());
         return cx.result.take().expect("pipe connect needs a result");
 
         extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+            let _req = Request::wrap(req);
+            if status == uvll::ECANCELED { return }
             unsafe {
                 let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
                 let stream = uvll::get_stream_handle_from_connect_req(req);
@@ -106,7 +109,6 @@ struct Ctx {
                         Err(UvError(n))
                     }
                 });
-                uvll::free_req(req);
 
                 let sched: ~Scheduler = Local::take();
                 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
@@ -201,6 +203,7 @@ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
             assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
             Ok(~PipeWatcher::new(client) as ~RtioPipe)
         }
+        uvll::ECANCELED => return,
         n => Err(uv_error_to_io_error(UvError(n)))
     };
 
index 50964d7a84c63aff6546cacb948b492ae990d40f..7b44c350f13de7996055ad0e5d198d34a03ae1c2 100644 (file)
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::cell::Cell;
 use std::libc::c_int;
 use std::libc;
 use std::ptr;
@@ -58,8 +57,7 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
             }
         }
 
-        let ret_io = Cell::new(ret_io);
-        do with_argv(config.program, config.args) |argv| {
+        let ret = do with_argv(config.program, config.args) |argv| {
             do with_env(config.env) |envp| {
                 let options = uvll::uv_process_options_t {
                     exit_cb: on_exit,
@@ -89,7 +87,7 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
                             exit_status: None,
                             term_signal: None,
                         };
-                        Ok((process.install(), ret_io.take()))
+                        Ok(process.install())
                     }
                     err => {
                         unsafe { uvll::free_handle(handle) }
@@ -97,6 +95,11 @@ pub fn spawn(loop_: &Loop, config: ProcessConfig)
                     }
                 }
             }
+        };
+
+        match ret {
+            Ok(p) => Ok((p, ret_io)),
+            Err(e) => Err(e),
         }
     }
 }
index ad0deebd45711abbc052cb00262ef35e4289a047..01bc02a50be900e269779258394f4a4b2da45cc4 100644 (file)
@@ -15,7 +15,7 @@
 use std::rt::local::Local;
 use std::rt::sched::Scheduler;
 
-use super::{UvError, Buf, slice_to_uv_buf};
+use super::{UvError, Buf, slice_to_uv_buf, Request};
 use uvll;
 
 // This is a helper structure which is intended to get embedded into other
@@ -29,17 +29,17 @@ pub struct StreamWatcher {
     // every call to uv_write(). Ideally this would be a stack-allocated
     // structure, but currently we don't have mappings for all the structures
     // defined in libuv, so we're foced to malloc this.
-    priv last_write_req: Option<*uvll::uv_write_t>,
+    priv last_write_req: Option<Request>,
 }
 
 struct ReadContext {
     buf: Option<Buf>,
-    result: Option<Result<uint, UvError>>,
+    result: ssize_t,
     task: Option<BlockedTask>,
 }
 
 struct WriteContext {
-    result: Option<Result<(), UvError>>,
+    result: c_int,
     task: Option<BlockedTask>,
 }
 
@@ -72,7 +72,7 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
             0 => {
                 let mut rcx = ReadContext {
                     buf: Some(slice_to_uv_buf(buf)),
-                    result: None,
+                    result: 0,
                     task: None,
                 };
                 unsafe {
@@ -82,7 +82,10 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
                 do scheduler.deschedule_running_task_and_then |_sched, task| {
                     rcx.task = Some(task);
                 }
-                rcx.result.take().expect("no result in read stream?")
+                match rcx.result {
+                    n if n < 0 => Err(UvError(n as c_int)),
+                    n => Ok(n as uint),
+                }
             }
             n => Err(UvError(n))
         }
@@ -91,27 +94,29 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
     pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
         // Prepare the write request, either using a cached one or allocating a
         // new one
-        let req = match self.last_write_req {
-            Some(req) => req,
-            None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
-        };
-        self.last_write_req = Some(req);
-        let mut wcx = WriteContext { result: None, task: None, };
-        unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
+        if self.last_write_req.is_none() {
+            self.last_write_req = Some(Request::new(uvll::UV_WRITE));
+        }
+        let req = self.last_write_req.get_ref();
 
         // Send off the request, but be careful to not block until we're sure
         // that the write reqeust is queued. If the reqeust couldn't be queued,
         // then we should return immediately with an error.
         match unsafe {
-            uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
+            uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
+                           write_cb)
         } {
             0 => {
+                let mut wcx = WriteContext { result: 0, task: None, };
+                req.set_data(&wcx);
                 let scheduler: ~Scheduler = Local::take();
                 do scheduler.deschedule_running_task_and_then |_sched, task| {
                     wcx.task = Some(task);
                 }
-                assert!(wcx.task.is_none());
-                wcx.result.take().expect("no result in write stream?")
+                match wcx.result {
+                    0 => Ok(()),
+                    n => Err(UvError(n)),
+                }
             }
             n => Err(UvError(n)),
         }
@@ -124,12 +129,6 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
     // synchronously (the task is blocked) or asynchronously (the task is not
     // block, but the handle is still deallocated).
     pub fn close(&mut self, synchronous: bool) {
-        // clean up the cached write request if we have one
-        match self.last_write_req {
-            Some(req) => unsafe { uvll::free_req(req) },
-            None => {}
-        }
-
         if synchronous {
             let mut closing_task = None;
             unsafe {
@@ -186,31 +185,24 @@ pub fn close(&mut self, synchronous: bool) {
     // XXX: Is there a performance impact to calling
     // stop here?
     unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+    rcx.result = nread;
 
-    assert!(rcx.result.is_none());
-    rcx.result = Some(match nread {
-        n if n < 0 => Err(UvError(n as c_int)),
-        n => Ok(n as uint),
-    });
-
-    let task = rcx.task.take().expect("read_cb needs a task");
     let scheduler: ~Scheduler = Local::take();
-    scheduler.resume_blocked_task_immediately(task);
+    scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
 }
 
 // Unlike reading, the WriteContext is stored in the uv_write_t request. Like
 // reading, however, all this does is wake up the blocked task after squirreling
 // away the error code as a result.
 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
+    if status == uvll::ECANCELED { return }
     // Remember to not free the request because it is re-used between writes on
     // the same stream.
-    unsafe {
-        let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
-        wcx.result = Some(match status {
-            0 => Ok(()),
-            n => Err(UvError(n)),
-        });
-        let sched: ~Scheduler = Local::take();
-        sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
-    }
+    let req = Request::wrap(req);
+    let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) };
+    wcx.result = status;
+
+    let sched: ~Scheduler = Local::take();
+    sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+    req.defuse();
 }
index 1732e84be4e7079123d2771047a02de585ee887f..46731993bc7b6c69397f85037d33ef7cc0ae6d59 100644 (file)
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use std::cell::Cell;
 use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred};
 use std::libc::c_int;
 use std::rt::BlockedTask;
@@ -77,10 +76,9 @@ fn sleep(&mut self, msecs: u64) {
 
     fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
         let (port, chan) = oneshot();
-        let chan = Cell::new(chan);
 
         let _m = self.fire_missiles();
-        self.action = Some(SendOnce(chan.take()));
+        self.action = Some(SendOnce(chan));
         self.start(msecs, 0);
 
         return port;
@@ -88,10 +86,9 @@ fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
 
     fn period(&mut self, msecs: u64) -> Port<()> {
         let (port, chan) = stream();
-        let chan = Cell::new(chan);
 
         let _m = self.fire_missiles();
-        self.action = Some(SendMany(chan.take()));
+        self.action = Some(SendMany(chan));
         self.start(msecs, msecs);
 
         return port;
index 1c6e59d9f2eacbda35b60571c482145dd67c1c5f..d0a160ba8ce9e94fe29adbc7720c7cdda94619c7 100644 (file)
 use std::c_str::CString;
 use std::cast::transmute;
 use std::cast;
-use std::cell::Cell;
-use std::clone::Clone;
 use std::comm::{SharedChan, GenericChan};
 use std::libc;
-use std::libc::{c_int, c_uint, c_void};
-use std::ptr;
+use std::libc::c_int;
 use std::str;
 use std::rt::io;
 use std::rt::io::IoError;
-use std::rt::io::net::ip::{SocketAddr, IpAddr};
-use std::rt::io::{standard_error, OtherIoError};
+use std::rt::io::net::ip::SocketAddr;
 use std::rt::io::process::ProcessConfig;
 use std::rt::local::Local;
 use std::rt::rtio::*;
 use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
 use std::rt::task::Task;
 use std::path::Path;
 use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
@@ -45,9 +40,7 @@
 
 use super::*;
 use idle::IdleWatcher;
-use net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
 use addrinfo::GetAddrInfoRequest;
-use pipe::PipeListener;
 
 // XXX we should not be calling uvll functions in here.
 
@@ -137,47 +130,6 @@ fn drop(&mut self) {
     }
 }
 
-enum SocketNameKind {
-    TcpPeer,
-    Tcp,
-    Udp
-}
-
-fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
-                                                 handle: U) -> Result<SocketAddr, IoError> {
-    let getsockname = match sk {
-        TcpPeer => uvll::tcp_getpeername,
-        Tcp     => uvll::tcp_getsockname,
-        Udp     => uvll::udp_getsockname,
-    };
-
-    // Allocate a sockaddr_storage
-    // since we don't know if it's ipv4 or ipv6
-    let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
-
-    let r = unsafe {
-        getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
-    };
-
-    if r != 0 {
-        let status = status_to_maybe_uv_error(r);
-        return Err(uv_error_to_io_error(status.unwrap()));
-    }
-
-    let addr = unsafe {
-        if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
-            net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
-        } else {
-            net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
-        }
-    };
-
-    unsafe { uvll::free_sockaddr_storage(r_addr); }
-
-    Ok(addr)
-
-}
-
 // Obviously an Event Loop is always home.
 pub struct UvEventLoop {
     priv uvio: UvIoFactory
@@ -251,97 +203,26 @@ impl IoFactory for UvIoFactory {
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
-    fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> {
-        // Create a cell in the task to hold the result. We will fill
-        // the cell before resuming the task.
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell;
-
-        // Block this task and take ownership, switch to scheduler context
-        do task::unkillable { // FIXME(#8674)
-            let scheduler: ~Scheduler = Local::take();
-            do scheduler.deschedule_running_task_and_then |_, task| {
-
-                let mut tcp = TcpWatcher::new(self.uv_loop());
-                let task_cell = Cell::new(task);
-
-                // Wait for a connection
-                do tcp.connect(addr) |stream, status| {
-                    match status {
-                        None => {
-                            let tcp = NativeHandle::from_native_handle(stream.native_handle());
-                            let home = get_handle_to_current_scheduler!();
-                            let res = Ok(~UvTcpStream { watcher: tcp, home: home }
-                                                as ~RtioTcpStream);
-
-                            // Store the stream in the task's stack
-                            unsafe { (*result_cell_ptr).put_back(res); }
-
-                            // Context switch
-                            let scheduler: ~Scheduler = Local::take();
-                            scheduler.resume_blocked_task_immediately(task_cell.take());
-                        }
-                        Some(_) => {
-                            let task_cell = Cell::new(task_cell.take());
-                            do stream.close {
-                                let res = Err(uv_error_to_io_error(status.unwrap()));
-                                unsafe { (*result_cell_ptr).put_back(res); }
-                                let scheduler: ~Scheduler = Local::take();
-                                scheduler.resume_blocked_task_immediately(task_cell.take());
-                            }
-                        }
-                    }
-                }
-            }
+    fn tcp_connect(&mut self, addr: SocketAddr)
+        -> Result<~RtioTcpStream, IoError>
+    {
+        match TcpWatcher::connect(self.uv_loop(), addr) {
+            Ok(t) => Ok(~t as ~RtioTcpStream),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
-
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
     }
 
     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
-        let mut watcher = TcpWatcher::new(self.uv_loop());
-        match watcher.bind(addr) {
-            Ok(_) => {
-                let home = get_handle_to_current_scheduler!();
-                Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener)
-            }
-            Err(uverr) => {
-                do task::unkillable { // FIXME(#8674)
-                    let scheduler: ~Scheduler = Local::take();
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        let task_cell = Cell::new(task);
-                        do watcher.as_stream().close {
-                            let scheduler: ~Scheduler = Local::take();
-                            scheduler.resume_blocked_task_immediately(task_cell.take());
-                        }
-                    }
-                    Err(uv_error_to_io_error(uverr))
-                }
-            }
+        match TcpListener::bind(self.uv_loop(), addr) {
+            Ok(t) => Ok(t as ~RtioTcpListener),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
-        let mut watcher = UdpWatcher::new(self.uv_loop());
-        match watcher.bind(addr) {
-            Ok(_) => {
-                let home = get_handle_to_current_scheduler!();
-                Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket)
-            }
-            Err(uverr) => {
-                do task::unkillable { // FIXME(#8674)
-                    let scheduler: ~Scheduler = Local::take();
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        let task_cell = Cell::new(task);
-                        do watcher.close {
-                            let scheduler: ~Scheduler = Local::take();
-                            scheduler.resume_blocked_task_immediately(task_cell.take());
-                        }
-                    }
-                    Err(uv_error_to_io_error(uverr))
-                }
-            }
+        match UdpWatcher::bind(self.uv_loop(), addr) {
+            Ok(u) => Ok(~u as ~RtioUdpSocket),
+            Err(e) => Err(uv_error_to_io_error(e)),
         }
     }
 
@@ -487,416 +368,6 @@ fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
     }
 }
 
-pub struct UvTcpListener {
-    priv watcher : TcpWatcher,
-    priv home: SchedHandle,
-}
-
-impl HomingIO for UvTcpListener {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl UvTcpListener {
-    fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
-        UvTcpListener { watcher: watcher, home: home }
-    }
-}
-
-impl Drop for UvTcpListener {
-    fn drop(&mut self) {
-        let (_m, sched) = self.fire_homing_missile_sched();
-        do sched.deschedule_running_task_and_then |_, task| {
-            let task = Cell::new(task);
-            do self.watcher.as_stream().close {
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task.take());
-            }
-        }
-    }
-}
-
-impl RtioSocket for UvTcpListener {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_homing_missile();
-        socket_name(Tcp, self.watcher)
-    }
-}
-
-impl RtioTcpListener for UvTcpListener {
-    fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
-        let _m = self.fire_homing_missile();
-        let acceptor = ~UvTcpAcceptor::new(*self);
-        let incoming = Cell::new(acceptor.incoming.clone());
-        let mut stream = acceptor.listener.watcher.as_stream();
-        let res = do stream.listen |mut server, status| {
-            do incoming.with_mut_ref |incoming| {
-                let inc = match status {
-                    Some(_) => Err(standard_error(OtherIoError)),
-                    None => {
-                        let inc = TcpWatcher::new(&server.event_loop());
-                        // first accept call in the callback guarenteed to succeed
-                        server.accept(inc.as_stream());
-                        let home = get_handle_to_current_scheduler!();
-                        Ok(~UvTcpStream { watcher: inc, home: home }
-                                as ~RtioTcpStream)
-                    }
-                };
-                incoming.send(inc);
-            }
-        };
-        match res {
-            Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
-            Err(e) => Err(uv_error_to_io_error(e)),
-        }
-    }
-}
-
-pub struct UvTcpAcceptor {
-    priv listener: UvTcpListener,
-    priv incoming: Tube<Result<~RtioTcpStream, IoError>>,
-}
-
-impl HomingIO for UvTcpAcceptor {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
-}
-
-impl UvTcpAcceptor {
-    fn new(listener: UvTcpListener) -> UvTcpAcceptor {
-        UvTcpAcceptor { listener: listener, incoming: Tube::new() }
-    }
-}
-
-impl RtioSocket for UvTcpAcceptor {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_homing_missile();
-        socket_name(Tcp, self.listener.watcher)
-    }
-}
-
-fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
-    let r = unsafe {
-        uvll::uv_tcp_simultaneous_accepts(stream.native_handle(), a as c_int)
-    };
-    status_to_io_result(r)
-}
-
-impl RtioTcpAcceptor for UvTcpAcceptor {
-    fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
-        let _m = self.fire_homing_missile();
-        self.incoming.recv()
-    }
-
-    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        accept_simultaneously(self.listener.watcher.as_stream(), 1)
-    }
-
-    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        accept_simultaneously(self.listener.watcher.as_stream(), 0)
-    }
-}
-
-fn read_stream(mut watcher: StreamWatcher,
-               scheduler: ~Scheduler,
-               buf: &mut [u8]) -> Result<uint, IoError> {
-    let result_cell = Cell::new_empty();
-    let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
-
-    let uv_buf = slice_to_uv_buf(buf);
-    do scheduler.deschedule_running_task_and_then |_sched, task| {
-        let task_cell = Cell::new(task);
-        // XXX: We shouldn't reallocate these callbacks every
-        // call to read
-        let alloc: AllocCallback = |_| uv_buf;
-        do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
-
-            // Stop reading so that no read callbacks are
-            // triggered before the user calls `read` again.
-            // XXX: Is there a performance impact to calling
-            // stop here?
-            watcher.read_stop();
-
-            let result = if status.is_none() {
-                assert!(nread >= 0);
-                Ok(nread as uint)
-            } else {
-                Err(uv_error_to_io_error(status.unwrap()))
-            };
-
-            unsafe { (*result_cell_ptr).put_back(result); }
-
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.resume_blocked_task_immediately(task_cell.take());
-        }
-    }
-
-    assert!(!result_cell.is_empty());
-    result_cell.take()
-}
-
-fn write_stream(mut watcher: StreamWatcher,
-                scheduler: ~Scheduler,
-                buf: &[u8]) -> Result<(), IoError> {
-    let result_cell = Cell::new_empty();
-    let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-    let buf_ptr: *&[u8] = &buf;
-    do scheduler.deschedule_running_task_and_then |_, task| {
-        let task_cell = Cell::new(task);
-        let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-        do watcher.write(buf) |_watcher, status| {
-            let result = if status.is_none() {
-                Ok(())
-            } else {
-                Err(uv_error_to_io_error(status.unwrap()))
-            };
-
-            unsafe { (*result_cell_ptr).put_back(result); }
-
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.resume_blocked_task_immediately(task_cell.take());
-        }
-    }
-
-    assert!(!result_cell.is_empty());
-    result_cell.take()
-}
-
-pub struct UvTcpStream {
-    priv watcher: TcpWatcher,
-    priv home: SchedHandle,
-}
-
-impl HomingIO for UvTcpStream {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvTcpStream {
-    fn drop(&mut self) {
-        let (_m, sched) = self.fire_homing_missile_sched();
-        do sched.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.watcher.as_stream().close {
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
-
-impl RtioSocket for UvTcpStream {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_homing_missile();
-        socket_name(Tcp, self.watcher)
-    }
-}
-
-impl RtioTcpStream for UvTcpStream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        read_stream(self.watcher.as_stream(), scheduler, buf)
-    }
-
-    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        write_stream(self.watcher.as_stream(), scheduler, buf)
-    }
-
-    fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_homing_missile();
-        socket_name(TcpPeer, self.watcher)
-    }
-
-    fn control_congestion(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
-        })
-    }
-
-    fn nodelay(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
-        })
-    }
-
-    fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
-                                   delay_in_seconds as c_uint)
-        })
-    }
-
-    fn letdie(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_tcp_keepalive(self.watcher.native_handle(),
-                                   0 as c_int, 0 as c_uint)
-        })
-    }
-}
-
-pub struct UvUdpSocket {
-    priv watcher: UdpWatcher,
-    priv home: SchedHandle,
-}
-
-impl HomingIO for UvUdpSocket {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvUdpSocket {
-    fn drop(&mut self) {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.watcher.close {
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
-
-impl RtioSocket for UvUdpSocket {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_homing_missile();
-        socket_name(Udp, self.watcher)
-    }
-}
-
-impl RtioUdpSocket for UvUdpSocket {
-    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
-                let _ = flags; // /XXX add handling for partials?
-
-                watcher.recv_stop();
-
-                let result = match status {
-                    None => {
-                        assert!(nread >= 0);
-                        Ok((nread as uint, addr))
-                    }
-                    Some(err) => Err(uv_error_to_io_error(err)),
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
-
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-
-        assert!(!result_cell.is_empty());
-        result_cell.take()
-    }
-
-    fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.watcher.send(buf, dst) |_watcher, status| {
-
-                let result = match status {
-                    None => Ok(()),
-                    Some(err) => Err(uv_error_to_io_error(err)),
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
-
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-
-        assert!(!result_cell.is_empty());
-        result_cell.take()
-    }
-
-    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::uv_udp_set_membership(self.watcher.native_handle(),
-                                            m_addr, ptr::null(),
-                                            uvll::UV_JOIN_GROUP)
-            }
-        })
-    }
-
-    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::uv_udp_set_membership(self.watcher.native_handle(),
-                                            m_addr, ptr::null(),
-                                            uvll::UV_LEAVE_GROUP)
-            }
-        })
-    }
-
-    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
-                                            1 as c_int)
-        })
-    }
-
-    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
-                                            0 as c_int)
-        })
-    }
-
-    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
-                                           ttl as c_int)
-        })
-    }
-
-    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
-        })
-    }
-
-    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
-                                       1 as c_int)
-        })
-    }
-
-    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_homing_missile();
-        status_to_io_result(unsafe {
-            uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
-                                       0 as c_int)
-        })
-    }
-}
-
 // this function is full of lies
 unsafe fn local_io() -> &'static mut IoFactory {
     do Local::borrow |sched: &mut Scheduler| {
index a32f03732d664ed91285e12ca072c4e52c5ecec3..42e0f58d87d41bdfc3ab1ba4ea67cf660d30ca97 100644 (file)
@@ -53,6 +53,7 @@ pub mod errors {
     pub static ENOTCONN: c_int = -4054;
     pub static EPIPE: c_int = -4048;
     pub static ECONNABORTED: c_int = -4080;
+    pub static ECANCELED: c_int = -4082;
 }
 #[cfg(not(windows))]
 pub mod errors {
@@ -65,6 +66,7 @@ pub mod errors {
     pub static ENOTCONN: c_int = -libc::ENOTCONN;
     pub static EPIPE: c_int = -libc::EPIPE;
     pub static ECONNABORTED: c_int = -libc::ECONNABORTED;
+    pub static ECANCELED : c_int = -libc::ECANCELED;
 }
 
 pub static PROCESS_SETUID: c_int = 1 << 0;
@@ -127,6 +129,7 @@ pub struct uv_stdio_container_t {
 }
 
 pub type uv_handle_t = c_void;
+pub type uv_req_t = c_void;
 pub type uv_loop_t = c_void;
 pub type uv_idle_t = c_void;
 pub type uv_tcp_t = c_void;