]> git.lizzy.rs Git - rust.git/blobdiff - src/librustuv/net.rs
Fix usage of libuv for windows
[rust.git] / src / librustuv / net.rs
index 9fd771b973950573957964af38dde1ce49294887..32c9b6c3d172999f6efffb262b81ca3624840784 100644 (file)
@@ -9,6 +9,7 @@
 // except according to those terms.
 
 use std::cast;
+use std::libc;
 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
 use std::ptr;
 use std::rt::BlockedTask;
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::tube::Tube;
 use std::str;
+use std::task;
 use std::vec;
 
 use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
-            uv_error_to_io_error, UvHandle, slice_to_uv_buf};
+            uv_error_to_io_error, UvHandle, slice_to_uv_buf,
+            wait_until_woken_after};
 use uvio::HomingIO;
 use uvll;
+use uvll::sockaddr;
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Generic functions related to dealing with sockaddr things
 ////////////////////////////////////////////////////////////////////////////////
 
-pub enum UvSocketAddr {
-    UvIpv4SocketAddr(*uvll::sockaddr_in),
-    UvIpv6SocketAddr(*uvll::sockaddr_in6),
-}
-
-pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
-    unsafe {
-        assert!((uvll::is_ip4_addr(addr) || uvll::is_ip6_addr(addr)));
-        assert!(!(uvll::is_ip4_addr(addr) && uvll::is_ip6_addr(addr)));
-        match addr {
-            _ if uvll::is_ip4_addr(addr) =>
-                UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
-            _ if uvll::is_ip6_addr(addr) =>
-                UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
-            _ => fail!(),
-        }
-    }
-}
-
-fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
+#[fixed_stack_segment]
+fn socket_addr_as_sockaddr<T>(addr: SocketAddr, f: &fn(*sockaddr) -> T) -> T {
     let malloc = match addr.ip {
-        Ipv4Addr(*) => uvll::malloc_ip4_addr,
-        Ipv6Addr(*) => uvll::malloc_ip6_addr,
-    };
-    let wrap = match addr.ip {
-        Ipv4Addr(*) => UvIpv4SocketAddr,
-        Ipv6Addr(*) => UvIpv6SocketAddr,
-    };
-    let free = match addr.ip {
-        Ipv4Addr(*) => uvll::free_ip4_addr,
-        Ipv6Addr(*) => uvll::free_ip6_addr,
+        Ipv4Addr(*) => uvll::rust_malloc_ip4_addr,
+        Ipv6Addr(*) => uvll::rust_malloc_ip6_addr,
     };
 
-    let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
+    let ip = addr.ip.to_str();
+    let addr = ip.with_c_str(|p| unsafe { malloc(p, addr.port as c_int) });
     do (|| {
-        f(wrap(addr))
+        f(addr)
     }).finally {
-        unsafe { free(addr) };
+        unsafe { libc::free(addr) };
     }
 }
 
-fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
-    let ip_size = match addr {
-        UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
-        UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
-    };
-    let ip_name = {
-        let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
-        unsafe {
+#[fixed_stack_segment]
+pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr {
+    unsafe {
+        let ip_size = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
+            4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/
+        } else if uvll::rust_is_ipv6_sockaddr(addr) == 1 {
+            8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/
+        } else {
+            fail!("unknown address?");
+        };
+        let ip_name = {
+            let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
             let buf_ptr = vec::raw::to_ptr(buf);
-            match addr {
-                UvIpv4SocketAddr(addr) =>
-                    uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t),
-                UvIpv6SocketAddr(addr) =>
-                    uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t),
+            if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
+                uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t);
+            } else {
+                uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t);
             }
+            buf
         };
-        buf
-    };
-    let ip_port = unsafe {
-        let port = match addr {
-            UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
-            UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
+        let ip_port = {
+            let port = if uvll::rust_is_ipv4_sockaddr(addr) == 1 {
+                uvll::rust_ip4_port(addr)
+            } else {
+                uvll::rust_ip6_port(addr)
+            };
+            port as u16
         };
-        port as u16
-    };
-    let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
-    let ip_addr = FromStr::from_str(ip_str).unwrap();
+        let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
+        let ip_addr = FromStr::from_str(ip_str).unwrap();
 
-    // finally run the closure
-    f(SocketAddr { ip: ip_addr, port: ip_port })
-}
-
-pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
-    use std::util;
-    uv_socket_addr_as_socket_addr(addr, util::id)
+        SocketAddr { ip: ip_addr, port: ip_port }
+    }
 }
 
 #[cfg(test)]
@@ -114,7 +91,9 @@ pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
 fn test_ip4_conversion() {
     use std::rt;
     let ip4 = rt::test::next_test_ip4();
-    assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
+    do socket_addr_as_sockaddr(ip4) |addr| {
+        assert_eq!(ip4, sockaddr_to_socket_addr(addr));
+    }
 }
 
 #[cfg(test)]
@@ -122,7 +101,9 @@ fn test_ip4_conversion() {
 fn test_ip6_conversion() {
     use std::rt;
     let ip6 = rt::test::next_test_ip6();
-    assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
+    do socket_addr_as_sockaddr(ip6) |addr| {
+        assert_eq!(ip6, sockaddr_to_socket_addr(addr));
+    }
 }
 
 enum SocketNameKind {
@@ -131,37 +112,29 @@ enum SocketNameKind {
     Udp
 }
 
+#[fixed_stack_segment]
 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
-    let getsockname = match sk {
-        TcpPeer => uvll::tcp_getpeername,
-        Tcp     => uvll::tcp_getsockname,
-        Udp     => uvll::udp_getsockname,
-    };
-
-    // Allocate a sockaddr_storage
-    // since we don't know if it's ipv4 or ipv6
-    let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
+    unsafe {
+        let getsockname = match sk {
+            TcpPeer => uvll::uv_tcp_getpeername,
+            Tcp     => uvll::uv_tcp_getsockname,
+            Udp     => uvll::uv_udp_getsockname,
+        };
 
-    let r = unsafe {
-        getsockname(handle, r_addr as *uvll::sockaddr_storage)
-    };
+        // Allocate a sockaddr_storage
+        // since we don't know if it's ipv4 or ipv6
+        let size = uvll::rust_sockaddr_size();
+        let name = libc::malloc(size as size_t);
+        assert!(!name.is_null());
+        let mut namelen = size;
 
-    if r != 0 {
-        return Err(uv_error_to_io_error(UvError(r)));
+        let ret = match getsockname(handle, name, &mut namelen) {
+            0 => Ok(sockaddr_to_socket_addr(name)),
+            n => Err(uv_error_to_io_error(UvError(n)))
+        };
+        libc::free(name);
+        ret
     }
-
-    let addr = unsafe {
-        if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
-            uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
-        } else {
-            uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
-        }
-    };
-
-    unsafe { uvll::free_sockaddr_storage(r_addr); }
-
-    Ok(addr)
-
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -206,46 +179,40 @@ pub fn connect(loop_: &mut Loop, address: SocketAddr)
     {
         struct Ctx { status: c_int, task: Option<BlockedTask> }
 
-        let tcp = TcpWatcher::new(loop_);
-        let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
-            let req = Request::new(uvll::UV_CONNECT);
-            let result = match addr {
-                UvIpv4SocketAddr(addr) => unsafe {
-                    uvll::tcp_connect(req.handle, tcp.handle, addr,
-                                      connect_cb)
-                },
-                UvIpv6SocketAddr(addr) => unsafe {
-                    uvll::tcp_connect6(req.handle, tcp.handle, addr,
-                                       connect_cb)
-                },
-            };
-            match result {
-                0 => {
-                    let mut cx = Ctx { status: 0, task: None };
-                    req.set_data(&cx);
-                    req.defuse();
-                    let scheduler: ~Scheduler = Local::take();
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        cx.task = Some(task);
-                    }
-                    match cx.status {
-                        0 => Ok(()),
-                        n => Err(UvError(n)),
+        return do task::unkillable {
+            let tcp = TcpWatcher::new(loop_);
+            let ret = do socket_addr_as_sockaddr(address) |addr| {
+                let mut req = Request::new(uvll::UV_CONNECT);
+                let result = unsafe {
+                    uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
+                                         connect_cb)
+                };
+                match result {
+                    0 => {
+                        req.defuse(); // uv callback now owns this request
+                        let mut cx = Ctx { status: 0, task: None };
+                        do wait_until_woken_after(&mut cx.task) {
+                            req.set_data(&cx);
+                        }
+                        match cx.status {
+                            0 => Ok(()),
+                            n => Err(UvError(n)),
+                        }
                     }
+                    n => Err(UvError(n))
                 }
-                n => Err(UvError(n))
-            }
-        };
+            };
 
-        return match ret {
-            Ok(()) => Ok(tcp),
-            Err(e) => Err(e),
+            match ret {
+                Ok(()) => Ok(tcp),
+                Err(e) => Err(e),
+            }
         };
 
         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
             let req = Request::wrap(req);
-            if status == uvll::ECANCELED { return }
-            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
             cx.status = status;
             let scheduler: ~Scheduler = Local::take();
             scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
@@ -259,43 +226,43 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 
 impl rtio::RtioSocket for TcpWatcher {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         socket_name(Tcp, self.handle)
     }
 }
 
 impl rtio::RtioTcpStream for TcpWatcher {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         self.stream.read(buf).map_err(uv_error_to_io_error)
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         self.stream.write(buf).map_err(uv_error_to_io_error)
     }
 
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         socket_name(TcpPeer, self.handle)
     }
 
     fn control_congestion(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
         })
     }
 
     fn nodelay(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
         })
     }
 
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
                                    delay_in_seconds as c_uint)
@@ -303,17 +270,21 @@ fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
     }
 
     fn letdie(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
         })
     }
 }
 
+impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
+    fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
+}
+
 impl Drop for TcpWatcher {
     fn drop(&mut self) {
-        let _m = self.fire_missiles();
-        self.stream.close();
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
@@ -323,25 +294,24 @@ impl TcpListener {
     pub fn bind(loop_: &mut Loop, address: SocketAddr)
         -> Result<~TcpListener, UvError>
     {
-        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
-        assert_eq!(unsafe {
-            uvll::uv_tcp_init(loop_.handle, handle)
-        }, 0);
-        let l = ~TcpListener {
-            home: get_handle_to_current_scheduler!(),
-            handle: handle,
-            closing_task: None,
-            outgoing: Tube::new(),
-        };
-        let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
-            match addr {
-                UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
-                UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
+        do task::unkillable {
+            let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+            assert_eq!(unsafe {
+                uvll::uv_tcp_init(loop_.handle, handle)
+            }, 0);
+            let l = ~TcpListener {
+                home: get_handle_to_current_scheduler!(),
+                handle: handle,
+                closing_task: None,
+                outgoing: Tube::new(),
+            };
+            let res = socket_addr_as_sockaddr(address, |addr| unsafe {
+                uvll::uv_tcp_bind(l.handle, addr)
+            });
+            match res {
+                0 => Ok(l.install()),
+                n => Err(UvError(n))
             }
-        });
-        match res {
-            0 => Ok(l.install()),
-            n => Err(UvError(n))
         }
     }
 }
@@ -356,7 +326,7 @@ fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
 
 impl rtio::RtioSocket for TcpListener {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         socket_name(Tcp, self.handle)
     }
 }
@@ -370,7 +340,7 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
             incoming: incoming,
         };
 
-        let _m = acceptor.fire_missiles();
+        let _m = acceptor.fire_homing_missile();
         // XXX: the 128 backlog should be configurable
         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
@@ -380,6 +350,7 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
 }
 
 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
+    assert!(status != uvll::ECANCELED);
     let msg = match status {
         0 => {
             let loop_ = Loop::wrap(unsafe {
@@ -389,7 +360,6 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
             Ok(~client as ~rtio::RtioTcpStream)
         }
-        uvll::ECANCELED => return,
         n => Err(uv_error_to_io_error(UvError(n)))
     };
 
@@ -399,12 +369,8 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
 
 impl Drop for TcpListener {
     fn drop(&mut self) {
-        let (_m, sched) = self.fire_missiles_sched();
-
-        do sched.deschedule_running_task_and_then |_, task| {
-            self.closing_task = Some(task);
-            unsafe { uvll::uv_close(self.handle, listener_close_cb) }
-        }
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
@@ -424,26 +390,26 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
 
 impl rtio::RtioSocket for TcpAcceptor {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         socket_name(Tcp, self.listener.handle)
     }
 }
 
 impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         self.incoming.recv()
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
         })
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
         })
@@ -463,33 +429,36 @@ impl UdpWatcher {
     pub fn bind(loop_: &Loop, address: SocketAddr)
         -> Result<UdpWatcher, UvError>
     {
-        let udp = UdpWatcher {
-            handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
-            home: get_handle_to_current_scheduler!(),
-        };
-        assert_eq!(unsafe {
-            uvll::uv_udp_init(loop_.handle, udp.handle)
-        }, 0);
-        let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
-            match addr {
-                UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
-                UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
+        do task::unkillable {
+            let udp = UdpWatcher {
+                handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+                home: get_handle_to_current_scheduler!(),
+            };
+            assert_eq!(unsafe {
+                uvll::uv_udp_init(loop_.handle, udp.handle)
+            }, 0);
+            let result = socket_addr_as_sockaddr(address, |addr| unsafe {
+                uvll::uv_udp_bind(udp.handle, addr, 0u32)
+            });
+            match result {
+                0 => Ok(udp),
+                n => Err(UvError(n)),
             }
-        });
-        match result {
-            0 => Ok(udp),
-            n => Err(UvError(n)),
         }
     }
 }
 
+impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
+    fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
+}
+
 impl HomingIO for UdpWatcher {
     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
 
 impl rtio::RtioSocket for UdpWatcher {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         socket_name(Udp, self.handle)
     }
 }
@@ -501,11 +470,11 @@ fn recvfrom(&mut self, buf: &mut [u8])
         struct Ctx {
             task: Option<BlockedTask>,
             buf: Option<Buf>,
-            result: Option<(ssize_t, SocketAddr)>,
+            result: Option<(ssize_t, Option<SocketAddr>)>,
         }
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
 
-        return match unsafe {
+        let a = match unsafe {
             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
         } {
             0 => {
@@ -514,36 +483,43 @@ struct Ctx {
                     buf: Some(slice_to_uv_buf(buf)),
                     result: None,
                 };
-                unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    cx.task = Some(task);
+                do wait_until_woken_after(&mut cx.task) {
+                    unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
                 }
                 match cx.result.take_unwrap() {
                     (n, _) if n < 0 =>
                         Err(uv_error_to_io_error(UvError(n as c_int))),
-                    (n, addr) => Ok((n as uint, addr))
+                    (n, addr) => Ok((n as uint, addr.unwrap()))
                 }
             }
             n => Err(uv_error_to_io_error(UvError(n)))
         };
+        return a;
 
         extern fn alloc_cb(handle: *uvll::uv_udp_t,
-                           _suggested_size: size_t) -> Buf {
-            let cx: &mut Ctx = unsafe {
-                cast::transmute(uvll::get_data_for_uv_handle(handle))
-            };
-            cx.buf.take().expect("alloc_cb called more than once")
+                           _suggested_size: size_t,
+                           buf: *mut Buf) {
+            unsafe {
+                let cx: &mut Ctx =
+                    cast::transmute(uvll::get_data_for_uv_handle(handle));
+                *buf = cx.buf.take().expect("recv alloc_cb called more than once")
+            }
         }
 
-        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
+        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
                           addr: *uvll::sockaddr, _flags: c_uint) {
+            assert!(nread != uvll::ECANCELED as ssize_t);
+            let cx: &mut Ctx = unsafe {
+                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            };
 
             // When there's no data to read the recv callback can be a no-op.
             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
             // this we just drop back to kqueue and wait for the next callback.
-            if nread == 0 { return }
-            if nread == uvll::ECANCELED as ssize_t { return }
+            if nread == 0 {
+                cx.buf = Some(unsafe { *buf });
+                return
+            }
 
             unsafe {
                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
@@ -552,8 +528,11 @@ struct Ctx {
             let cx: &mut Ctx = unsafe {
                 cast::transmute(uvll::get_data_for_uv_handle(handle))
             };
-            let addr = sockaddr_to_UvSocketAddr(addr);
-            let addr = uv_socket_addr_to_socket_addr(addr);
+            let addr = if addr == ptr::null() {
+                None
+            } else {
+                Some(sockaddr_to_socket_addr(addr))
+            };
             cx.result = Some((nread, addr));
 
             let sched: ~Scheduler = Local::take();
@@ -564,30 +543,21 @@ struct Ctx {
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
         struct Ctx { task: Option<BlockedTask>, result: c_int }
 
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
 
-        let req = Request::new(uvll::UV_UDP_SEND);
+        let mut req = Request::new(uvll::UV_UDP_SEND);
         let buf = slice_to_uv_buf(buf);
-        let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
-            match dst {
-                UvIpv4SocketAddr(dst) =>
-                    uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
-                UvIpv6SocketAddr(dst) =>
-                    uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
-            }
+        let result = socket_addr_as_sockaddr(dst, |dst| unsafe {
+            uvll::uv_udp_send(req.handle, self.handle, [buf], dst, send_cb)
         });
 
         return match result {
             0 => {
+                req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { task: None, result: 0 };
-                req.set_data(&cx);
-                req.defuse();
-
-                let sched: ~Scheduler = Local::take();
-                do sched.deschedule_running_task_and_then |_, task| {
-                    cx.task = Some(task);
+                do wait_until_woken_after(&mut cx.task) {
+                    req.set_data(&cx);
                 }
-
                 match cx.result {
                     0 => Ok(()),
                     n => Err(uv_error_to_io_error(UvError(n)))
@@ -598,7 +568,8 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
             let req = Request::wrap(req);
-            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
             cx.result = status;
 
             let sched: ~Scheduler = Local::take();
@@ -607,7 +578,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
     }
 
     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             do multi.to_str().with_c_str |m_addr| {
                 uvll::uv_udp_set_membership(self.handle,
@@ -618,7 +589,7 @@ fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
     }
 
     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             do multi.to_str().with_c_str |m_addr| {
                 uvll::uv_udp_set_membership(self.handle,
@@ -629,7 +600,7 @@ fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
     }
 
     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_multicast_loop(self.handle,
                                             1 as c_int)
@@ -637,7 +608,7 @@ fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
     }
 
     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_multicast_loop(self.handle,
                                             0 as c_int)
@@ -645,7 +616,7 @@ fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_multicast_ttl(self.handle,
                                            ttl as c_int)
@@ -653,14 +624,14 @@ fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
     }
 
     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
         })
     }
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_broadcast(self.handle,
                                        1 as c_int)
@@ -668,7 +639,7 @@ fn hear_broadcasts(&mut self) -> Result<(), IoError> {
     }
 
     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let _m = self.fire_missiles();
+        let _m = self.fire_homing_missile();
         status_to_io_result(unsafe {
             uvll::uv_udp_set_broadcast(self.handle,
                                        0 as c_int)
@@ -679,23 +650,8 @@ fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
 impl Drop for UdpWatcher {
     fn drop(&mut self) {
         // Send ourselves home to close this handle (blocking while doing so).
-        let (_m, sched) = self.fire_missiles_sched();
-        let mut slot = None;
-        unsafe {
-            uvll::set_data_for_uv_handle(self.handle, &slot);
-            uvll::uv_close(self.handle, close_cb);
-        }
-        do sched.deschedule_running_task_and_then |_, task| {
-            slot = Some(task);
-        }
-
-        extern fn close_cb(handle: *uvll::uv_handle_t) {
-            let slot: &mut Option<BlockedTask> = unsafe {
-                cast::transmute(uvll::get_data_for_uv_handle(handle))
-            };
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(slot.take_unwrap());
-        }
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
@@ -713,397 +669,358 @@ mod test {
     use std::task;
 
     use super::*;
-    use super::super::{Loop, run_uv_loop};
+    use super::super::local_loop;
 
     #[test]
     fn connect_close_ip4() {
-        do run_uv_loop |l| {
-            match TcpWatcher::connect(l, next_test_ip4()) {
-                Ok(*) => fail!(),
-                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
-            }
+        match TcpWatcher::connect(local_loop(), next_test_ip4()) {
+            Ok(*) => fail!(),
+            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
         }
     }
 
     #[test]
     fn connect_close_ip6() {
-        do run_uv_loop |l| {
-            match TcpWatcher::connect(l, next_test_ip6()) {
-                Ok(*) => fail!(),
-                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
-            }
+        match TcpWatcher::connect(local_loop(), next_test_ip6()) {
+            Ok(*) => fail!(),
+            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
         }
     }
 
     #[test]
     fn udp_bind_close_ip4() {
-        do run_uv_loop |l| {
-            match UdpWatcher::bind(l, next_test_ip4()) {
-                Ok(*) => {}
-                Err(*) => fail!()
-            }
+        match UdpWatcher::bind(local_loop(), next_test_ip4()) {
+            Ok(*) => {}
+            Err(*) => fail!()
         }
     }
 
     #[test]
     fn udp_bind_close_ip6() {
-        do run_uv_loop |l| {
-            match UdpWatcher::bind(l, next_test_ip6()) {
-                Ok(*) => {}
-                Err(*) => fail!()
-            }
+        match UdpWatcher::bind(local_loop(), next_test_ip6()) {
+            Ok(*) => {}
+            Err(*) => fail!()
         }
     }
 
     #[test]
     fn listen_ip4() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let addr = next_test_ip4();
-
-            let handle = l.handle;
-            do spawn {
-                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e)
-                };
-                let mut w = match w.listen() {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e),
-                };
-                chan.take().send(());
-                match w.accept() {
-                    Ok(mut stream) => {
-                        let mut buf = [0u8, ..10];
-                        match stream.read(buf) {
-                            Ok(10) => {} e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
-                    }
-                    Err(e) => fail!("{:?}", e)
-                }
-            }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
 
-            port.recv();
-            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+        do spawn {
+            let w = match TcpListener::bind(local_loop(), addr) {
                 Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            let mut w = match w.listen() {
+                Ok(w) => w, Err(e) => fail!("{:?}", e),
+            };
+            chan.take().send(());
+            match w.accept() {
+                Ok(mut stream) => {
+                    let mut buf = [0u8, ..10];
+                    match stream.read(buf) {
+                        Ok(10) => {} e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
+                    }
+                }
+                Err(e) => fail!("{:?}", e)
             }
         }
+
+        port.recv();
+        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
+        }
     }
 
     #[test]
     fn listen_ip6() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let addr = next_test_ip6();
-
-            let handle = l.handle;
-            do spawn {
-                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e)
-                };
-                let mut w = match w.listen() {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e),
-                };
-                chan.take().send(());
-                match w.accept() {
-                    Ok(mut stream) => {
-                        let mut buf = [0u8, ..10];
-                        match stream.read(buf) {
-                            Ok(10) => {} e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
-                    }
-                    Err(e) => fail!("{:?}", e)
-                }
-            }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip6();
 
-            port.recv();
-            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+        do spawn {
+            let w = match TcpListener::bind(local_loop(), addr) {
                 Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            let mut w = match w.listen() {
+                Ok(w) => w, Err(e) => fail!("{:?}", e),
+            };
+            chan.take().send(());
+            match w.accept() {
+                Ok(mut stream) => {
+                    let mut buf = [0u8, ..10];
+                    match stream.read(buf) {
+                        Ok(10) => {} e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
+                    }
+                }
+                Err(e) => fail!("{:?}", e)
             }
         }
+
+        port.recv();
+        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
+        }
     }
 
     #[test]
     fn udp_recv_ip4() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let client = next_test_ip4();
-            let server = next_test_ip4();
-
-            let handle = l.handle;
-            do spawn {
-                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
-                    Ok(mut w) => {
-                        chan.take().send(());
-                        let mut buf = [0u8, ..10];
-                        match w.recvfrom(buf) {
-                            Ok((10, addr)) => assert_eq!(addr, client),
-                            e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let client = next_test_ip4();
+        let server = next_test_ip4();
+
+        do spawn {
+            match UdpWatcher::bind(local_loop(), server) {
+                Ok(mut w) => {
+                    chan.take().send(());
+                    let mut buf = [0u8, ..10];
+                    match w.recvfrom(buf) {
+                        Ok((10, addr)) => assert_eq!(addr, client),
+                        e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
                     }
-                    Err(e) => fail!("{:?}", e)
                 }
+                Err(e) => fail!("{:?}", e)
             }
+        }
 
-            port.recv();
-            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
-                Ok(w) => w, Err(e) => fail!("{:?}", e)
-            };
-            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
-            }
+        port.recv();
+        let mut w = match UdpWatcher::bind(local_loop(), client) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
         }
     }
 
     #[test]
     fn udp_recv_ip6() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let client = next_test_ip6();
-            let server = next_test_ip6();
-
-            let handle = l.handle;
-            do spawn {
-                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
-                    Ok(mut w) => {
-                        chan.take().send(());
-                        let mut buf = [0u8, ..10];
-                        match w.recvfrom(buf) {
-                            Ok((10, addr)) => assert_eq!(addr, client),
-                            e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let client = next_test_ip6();
+        let server = next_test_ip6();
+
+        do spawn {
+            match UdpWatcher::bind(local_loop(), server) {
+                Ok(mut w) => {
+                    chan.take().send(());
+                    let mut buf = [0u8, ..10];
+                    match w.recvfrom(buf) {
+                        Ok((10, addr)) => assert_eq!(addr, client),
+                        e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
                     }
-                    Err(e) => fail!("{:?}", e)
                 }
+                Err(e) => fail!("{:?}", e)
             }
+        }
 
-            port.recv();
-            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
-                Ok(w) => w, Err(e) => fail!("{:?}", e)
-            };
-            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
-            }
+        port.recv();
+        let mut w = match UdpWatcher::bind(local_loop(), client) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
         }
     }
 
     #[test]
     fn test_read_read_read() {
-        do run_uv_loop |l| {
-            let addr = next_test_ip4();
-            static MAX: uint = 500000;
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                chan.take().send(());
-                let mut stream = acceptor.accept().unwrap();
-                let buf = [1, .. 2048];
-                let mut total_bytes_written = 0;
-                while total_bytes_written < MAX {
-                    stream.write(buf);
-                    total_bytes_written += buf.len();
-                }
+        use std::rt::rtio::*;
+        let addr = next_test_ip4();
+        static MAX: uint = 5000;
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        do spawn {
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            chan.take().send(());
+            let mut stream = acceptor.accept().unwrap();
+            let buf = [1, .. 2048];
+            let mut total_bytes_written = 0;
+            while total_bytes_written < MAX {
+                assert!(stream.write(buf).is_ok());
+                uvdebug!("wrote bytes");
+                total_bytes_written += buf.len();
             }
+        }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                port.take().recv();
-                let mut stream = TcpWatcher::connect(l, addr).unwrap();
-                let mut buf = [0, .. 2048];
-                let mut total_bytes_read = 0;
-                while total_bytes_read < MAX {
-                    let nread = stream.read(buf).unwrap();
-                    uvdebug!("read {} bytes", nread);
-                    total_bytes_read += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
+        do spawn {
+            port.take().recv();
+            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+            let mut buf = [0, .. 2048];
+            let mut total_bytes_read = 0;
+            while total_bytes_read < MAX {
+                let nread = stream.read(buf).unwrap();
+                total_bytes_read += nread;
+                for i in range(0u, nread) {
+                    assert_eq!(buf[i], 1);
                 }
-                uvdebug!("read {} bytes total", total_bytes_read);
             }
+            uvdebug!("read {} bytes total", total_bytes_read);
         }
     }
 
     #[test]
-    #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
+    #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
     fn test_udp_twice() {
-        do run_uv_loop |l| {
-            let server_addr = next_test_ip4();
-            let client_addr = next_test_ip4();
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut client = UdpWatcher::bind(l, client_addr).unwrap();
-                port.take().recv();
-                assert!(client.sendto([1], server_addr).is_ok());
-                assert!(client.sendto([2], server_addr).is_ok());
-            }
+        let server_addr = next_test_ip4();
+        let client_addr = next_test_ip4();
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut server = UdpWatcher::bind(l, server_addr).unwrap();
-                chan.take().send(());
-                let mut buf1 = [0];
-                let mut buf2 = [0];
-                let (nread1, src1) = server.recvfrom(buf1).unwrap();
-                let (nread2, src2) = server.recvfrom(buf2).unwrap();
-                assert_eq!(nread1, 1);
-                assert_eq!(nread2, 1);
-                assert_eq!(src1, client_addr);
-                assert_eq!(src2, client_addr);
-                assert_eq!(buf1[0], 1);
-                assert_eq!(buf2[0], 2);
-            }
+        do spawn {
+            let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
+            port.take().recv();
+            assert!(client.sendto([1], server_addr).is_ok());
+            assert!(client.sendto([2], server_addr).is_ok());
         }
+
+        let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
+        chan.take().send(());
+        let mut buf1 = [0];
+        let mut buf2 = [0];
+        let (nread1, src1) = server.recvfrom(buf1).unwrap();
+        let (nread2, src2) = server.recvfrom(buf2).unwrap();
+        assert_eq!(nread1, 1);
+        assert_eq!(nread2, 1);
+        assert_eq!(src1, client_addr);
+        assert_eq!(src2, client_addr);
+        assert_eq!(buf1[0], 1);
+        assert_eq!(buf2[0], 2);
     }
 
     #[test]
     fn test_udp_many_read() {
-        do run_uv_loop |l| {
-            let server_out_addr = next_test_ip4();
-            let server_in_addr = next_test_ip4();
-            let client_out_addr = next_test_ip4();
-            let client_in_addr = next_test_ip4();
-            static MAX: uint = 500_000;
-
-            let (p1, c1) = oneshot();
-            let (p2, c2) = oneshot();
-
-            let first = Cell::new((p1, c2));
-            let second = Cell::new((p2, c1));
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
-                let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
-                let (port, chan) = first.take();
-                chan.send(());
-                port.recv();
-                let msg = [1, .. 2048];
-                let mut total_bytes_sent = 0;
-                let mut buf = [1];
-                while buf[0] == 1 {
-                    // send more data
-                    assert!(server_out.sendto(msg, client_in_addr).is_ok());
-                    total_bytes_sent += msg.len();
-                    // check if the client has received enough
-                    let res = server_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(nread, 1);
-                    assert_eq!(src, client_out_addr);
-                }
-                assert!(total_bytes_sent >= MAX);
+        let server_out_addr = next_test_ip4();
+        let server_in_addr = next_test_ip4();
+        let client_out_addr = next_test_ip4();
+        let client_in_addr = next_test_ip4();
+        static MAX: uint = 500_000;
+
+        let (p1, c1) = oneshot();
+        let (p2, c2) = oneshot();
+
+        let first = Cell::new((p1, c2));
+        let second = Cell::new((p2, c1));
+
+        do spawn {
+            let l = local_loop();
+            let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
+            let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
+            let (port, chan) = first.take();
+            chan.send(());
+            port.recv();
+            let msg = [1, .. 2048];
+            let mut total_bytes_sent = 0;
+            let mut buf = [1];
+            while buf[0] == 1 {
+                // send more data
+                assert!(server_out.sendto(msg, client_in_addr).is_ok());
+                total_bytes_sent += msg.len();
+                // check if the client has received enough
+                let res = server_in.recvfrom(buf);
+                assert!(res.is_ok());
+                let (nread, src) = res.unwrap();
+                assert_eq!(nread, 1);
+                assert_eq!(src, client_out_addr);
             }
+            assert!(total_bytes_sent >= MAX);
+        }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
-                let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
-                let (port, chan) = second.take();
-                port.recv();
-                chan.send(());
-                let mut total_bytes_recv = 0;
-                let mut buf = [0, .. 2048];
-                while total_bytes_recv < MAX {
-                    // ask for more
-                    assert!(client_out.sendto([1], server_in_addr).is_ok());
-                    // wait for data
-                    let res = client_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(src, server_out_addr);
-                    total_bytes_recv += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
+        do spawn {
+            let l = local_loop();
+            let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
+            let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
+            let (port, chan) = second.take();
+            port.recv();
+            chan.send(());
+            let mut total_bytes_recv = 0;
+            let mut buf = [0, .. 2048];
+            while total_bytes_recv < MAX {
+                // ask for more
+                assert!(client_out.sendto([1], server_in_addr).is_ok());
+                // wait for data
+                let res = client_in.recvfrom(buf);
+                assert!(res.is_ok());
+                let (nread, src) = res.unwrap();
+                assert_eq!(src, server_out_addr);
+                total_bytes_recv += nread;
+                for i in range(0u, nread) {
+                    assert_eq!(buf[i], 1);
                 }
-                // tell the server we're done
-                assert!(client_out.sendto([0], server_in_addr).is_ok());
             }
+            // tell the server we're done
+            assert!(client_out.sendto([0], server_in_addr).is_ok());
         }
     }
 
     #[test]
     fn test_read_and_block() {
-        do run_uv_loop |l| {
-            let addr = next_test_ip4();
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                let (port2, chan2) = stream();
-                chan.take().send(port2);
-                let mut stream = acceptor.accept().unwrap();
-                let mut buf = [0, .. 2048];
-
-                let expected = 32;
-                let mut current = 0;
-                let mut reads = 0;
-
-                while current < expected {
-                    let nread = stream.read(buf).unwrap();
-                    for i in range(0u, nread) {
-                        let val = buf[i] as uint;
-                        assert_eq!(val, current % 8);
-                        current += 1;
-                    }
-                    reads += 1;
+        let addr = next_test_ip4();
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
 
-                    chan2.send(());
+        do spawn {
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            let (port2, chan2) = stream();
+            chan.take().send(port2);
+            let mut stream = acceptor.accept().unwrap();
+            let mut buf = [0, .. 2048];
+
+            let expected = 32;
+            let mut current = 0;
+            let mut reads = 0;
+
+            while current < expected {
+                let nread = stream.read(buf).unwrap();
+                for i in range(0u, nread) {
+                    let val = buf[i] as uint;
+                    assert_eq!(val, current % 8);
+                    current += 1;
                 }
+                reads += 1;
 
-                // Make sure we had multiple reads
-                assert!(reads > 1);
+                chan2.send(());
             }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let port2 = port.take().recv();
-                let mut stream = TcpWatcher::connect(l, addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                port2.recv();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                port2.recv();
-            }
+            // Make sure we had multiple reads
+            assert!(reads > 1);
+        }
+
+        do spawn {
+            let port2 = port.take().recv();
+            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            port2.recv();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            port2.recv();
         }
     }
 
@@ -1112,27 +1029,23 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
         let addr = next_test_ip4();
 
         do task::spawn_sched(task::SingleThreaded) {
-            do run_uv_loop |l| {
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                let mut stream = acceptor.accept().unwrap();
-                let mut buf = [0, .. 2048];
-                let nread = stream.read(buf).unwrap();
-                assert_eq!(nread, 8);
-                for i in range(0u, nread) {
-                    assert_eq!(buf[i], i as u8);
-                }
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            let mut stream = acceptor.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
             }
         }
 
         do task::spawn_sched(task::SingleThreaded) {
-            do run_uv_loop |l| {
-                let mut stream = TcpWatcher::connect(l, addr);
-                while stream.is_err() {
-                    stream = TcpWatcher::connect(l, addr);
-                }
-                stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+            let mut stream = TcpWatcher::connect(local_loop(), addr);
+            while stream.is_err() {
+                stream = TcpWatcher::connect(local_loop(), addr);
             }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
         }
     }
 
@@ -1148,17 +1061,13 @@ fn test_homing_closes_correctly() {
 
         do task::spawn_sched(task::SingleThreaded) {
             let chan = Cell::new(chan.take());
-            do run_uv_loop |l| {
-                let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
-                chan.take().send(listener);
-            }
+            let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
+            chan.take().send(listener);
         }
 
         do task::spawn_sched(task::SingleThreaded) {
             let port = Cell::new(port.take());
-            do run_uv_loop |_l| {
-                port.take().recv();
-            }
+            port.take().recv();
         }
     }
 
@@ -1260,4 +1169,119 @@ unsafe fn local_io() -> &'static mut IoFactory {
         }
     }
 
+    #[should_fail] #[test]
+    fn tcp_listener_fail_cleanup() {
+        let addr = next_test_ip4();
+        let w = TcpListener::bind(local_loop(), addr).unwrap();
+        let _w = w.listen().unwrap();
+        fail!();
+    }
+
+    #[should_fail] #[test]
+    fn tcp_stream_fail_cleanup() {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do task::spawn_unlinked { // please no linked failure
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.take().send(());
+            w.accept();
+        }
+        port.recv();
+        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+        fail!();
+    }
+
+    #[should_fail] #[test]
+    fn udp_listener_fail_cleanup() {
+        let addr = next_test_ip4();
+        let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
+        fail!();
+    }
+
+    #[should_fail] #[test]
+    fn udp_fail_other_task() {
+        let addr = next_test_ip4();
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+
+        // force the handle to be created on a different scheduler, failure in
+        // the original task will force a homing operation back to this
+        // scheduler.
+        do task::spawn_sched(task::SingleThreaded) {
+            let w = UdpWatcher::bind(local_loop(), addr).unwrap();
+            chan.take().send(w);
+        }
+
+        let _w = port.recv();
+        fail!();
+    }
+
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure1() {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.take().send(());
+            w.accept();
+        }
+
+        port.recv();
+        fail!();
+    }
+
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure2() {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.take().send(());
+            let mut buf = [0];
+            w.accept().unwrap().read(buf);
+        }
+
+        port.recv();
+        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+
+        fail!();
+    }
+
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure3() {
+        let (port, chan) = stream();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let chan = chan.take();
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.send(());
+            let mut conn = w.accept().unwrap();
+            chan.send(());
+            let buf = [0, ..65536];
+            conn.write(buf);
+        }
+
+        port.recv();
+        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+        port.recv();
+        fail!();
+    }
 }