]> git.lizzy.rs Git - rust.git/commitdiff
Make the uv bindings resilient to linked failure
authorAlex Crichton <alex@alexcrichton.com>
Thu, 7 Nov 2013 23:13:06 +0000 (15:13 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:11 +0000 (01:37 -0800)
In the ideal world, uv I/O could be canceled safely at any time. In reality,
however, we are unable to do this. Right now linked failure is fairly flaky as
implemented in the runtime, making it very difficult to test whether the linked
failure mechanisms inside of the uv bindings are ready for this kind of
interaction.

Right now, all constructors will execute in a task::unkillable block, and all
homing I/O operations will prevent linked failure in the duration of the homing
operation. What this means is that tasks which perform I/O are still susceptible
to linked failure, but the I/O operations themselves will never get interrupted.
Instead, the linked failure will be received at the edge of the I/O operation.

12 files changed:
src/librustuv/addrinfo.rs
src/librustuv/async.rs
src/librustuv/file.rs
src/librustuv/idle.rs
src/librustuv/lib.rs
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/process.rs
src/librustuv/stream.rs
src/librustuv/timer.rs
src/librustuv/tty.rs
src/librustuv/uvio.rs

index d5bfd729eb56a57ca9e4e7750c3f9f2c09cdb147..56f6eda53575c5728400b1e4386c41895fb35793 100644 (file)
@@ -9,7 +9,6 @@
 // except according to those terms.
 
 use ai = std::rt::io::net::addrinfo;
-use std::cast;
 use std::libc::c_int;
 use std::ptr::null;
 use std::rt::BlockedTask;
@@ -17,7 +16,7 @@
 use std::rt::sched::Scheduler;
 
 use net;
-use super::{Loop, UvError, Request};
+use super::{Loop, UvError, Request, wait_until_woken_after};
 use uvll;
 
 struct Addrinfo {
@@ -76,7 +75,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
             }
         });
         let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
-        let req = Request::new(uvll::UV_GETADDRINFO);
+        let mut req = Request::new(uvll::UV_GETADDRINFO);
 
         return match unsafe {
             uvll::uv_getaddrinfo(loop_.handle, req.handle,
@@ -84,12 +83,11 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
                                  hint_ptr)
         } {
             0 => {
+                req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
-                req.set_data(&cx);
-                req.defuse();
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    cx.slot = Some(task);
+
+                do wait_until_woken_after(&mut cx.slot) {
+                    req.set_data(&cx);
                 }
 
                 match cx.status {
@@ -105,8 +103,8 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
                                  status: c_int,
                                  res: *uvll::addrinfo) {
             let req = Request::wrap(req);
-            if status == uvll::ECANCELED { return }
-            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
             cx.status = status;
             cx.addrinfo = Some(Addrinfo { handle: res });
 
@@ -191,25 +189,23 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
 mod test {
     use std::rt::io::net::ip::{SocketAddr, Ipv4Addr};
     use super::*;
-    use super::super::run_uv_loop;
+    use super::super::local_loop;
 
     #[test]
     fn getaddrinfo_test() {
-        do run_uv_loop |l| {
-            match GetAddrInfoRequest::run(l, Some("localhost"), None, None) {
-                Ok(infos) => {
-                    let mut found_local = false;
-                    let local_addr = &SocketAddr {
-                        ip: Ipv4Addr(127, 0, 0, 1),
-                        port: 0
-                    };
-                    for addr in infos.iter() {
-                        found_local = found_local || addr.address == *local_addr;
-                    }
-                    assert!(found_local);
+        match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) {
+            Ok(infos) => {
+                let mut found_local = false;
+                let local_addr = &SocketAddr {
+                    ip: Ipv4Addr(127, 0, 0, 1),
+                    port: 0
+                };
+                for addr in infos.iter() {
+                    found_local = found_local || addr.address == *local_addr;
                 }
-                Err(e) => fail!("{:?}", e),
+                assert!(found_local);
             }
+            Err(e) => fail!("{:?}", e),
         }
     }
 }
index 334e154a397f45fe3ce463f5e16cb6ceceb91d41..04e7bce5bd18190fb7fcbe799ab332f39c8d39ab 100644 (file)
@@ -131,11 +131,12 @@ mod test_remote {
     use std::rt::tube::Tube;
 
     use super::*;
-    use super::super::run_uv_loop;
+    use super::super::local_loop;
 
-    // Make sure that we can fire watchers in remote threads
+    // Make sure that we can fire watchers in remote threads and that they
+    // actually trigger what they say they will.
     #[test]
-    fn test_uv_remote() {
+    fn smoke_test() {
         struct MyCallback(Option<Tube<int>>);
         impl Callback for MyCallback {
             fn call(&mut self) {
@@ -147,35 +148,15 @@ fn call(&mut self) {
             }
         }
 
-        do run_uv_loop |l| {
-            let mut tube = Tube::new();
-            let cb = ~MyCallback(Some(tube.clone()));
-            let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback));
-
-            let thread = do Thread::start {
-                watcher.take().fire();
-            };
+        let mut tube = Tube::new();
+        let cb = ~MyCallback(Some(tube.clone()));
+        let watcher = Cell::new(AsyncWatcher::new(local_loop(), cb as ~Callback));
 
-            assert_eq!(tube.recv(), 1);
-            thread.join();
-        }
-    }
-
-    #[test]
-    fn smoke_test() {
-        static mut hits: uint = 0;
+        let thread = do Thread::start {
+            watcher.take().fire();
+        };
 
-        struct MyCallback;
-        impl Callback for MyCallback {
-            fn call(&mut self) {
-                unsafe { hits += 1; }
-            }
-        }
-
-        do run_uv_loop |l| {
-            let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback);
-            watcher.fire();
-        }
-        assert!(unsafe { hits > 0 });
+        assert_eq!(tube.recv(), 1);
+        thread.join();
     }
 }
index ac89ef38e8ecb3455bd8574b8a98155400f6c998..bdb1429f5b625b1ebd2b8d0ba07633d0bb176a39 100644 (file)
 use std::libc::{c_int, c_char, c_void, c_uint};
 use std::libc;
 use std::rt::BlockedTask;
-use std::rt::io;
 use std::rt::io::{FileStat, IoError};
-use std::rt::rtio;
+use std::rt::io;
 use std::rt::local::Local;
+use std::rt::rtio;
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::vec;
 
-use super::{Loop, UvError, uv_error_to_io_error};
+use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
 use uvio::HomingIO;
 use uvll;
 
@@ -305,10 +305,8 @@ fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int)
         0 => {
             req.fired = true;
             let mut slot = None;
-            unsafe { uvll::set_data_for_req(req.req, &slot) }
-            let sched: ~Scheduler = Local::take();
-            do sched.deschedule_running_task_and_then |_, task| {
-                slot = Some(task);
+            do wait_until_woken_after(&mut slot) {
+                unsafe { uvll::set_data_for_req(req.req, &slot) }
             }
             match req.get_result() {
                 n if n < 0 => Err(UvError(n)),
@@ -454,123 +452,113 @@ mod test {
     use std::str;
     use std::vec;
     use super::*;
-    use super::super::{run_uv_loop};
+    use l = super::super::local_loop;
 
     #[test]
     fn file_test_full_simple_sync() {
-        do run_uv_loop |l| {
-            let create_flags = O_RDWR | O_CREAT;
-            let read_flags = O_RDONLY;
-            let mode = S_IWUSR | S_IRUSR;
-            let path_str = "./tmp/file_full_simple_sync.txt";
-
-            {
-                // open/create
-                let result = FsRequest::open(l, &path_str.to_c_str(),
-                                             create_flags as int, mode as int);
-                assert!(result.is_ok());
-                let result = result.unwrap();
-                let fd = result.fd;
-
-                // write
-                let result = FsRequest::write(l, fd, "hello".as_bytes(), -1);
-                assert!(result.is_ok());
-            }
+        let create_flags = O_RDWR | O_CREAT;
+        let read_flags = O_RDONLY;
+        let mode = S_IWUSR | S_IRUSR;
+        let path_str = "./tmp/file_full_simple_sync.txt";
+
+        {
+            // open/create
+            let result = FsRequest::open(l(), &path_str.to_c_str(),
+                                         create_flags as int, mode as int);
+            assert!(result.is_ok());
+            let result = result.unwrap();
+            let fd = result.fd;
 
-            {
-                // re-open
-                let result = FsRequest::open(l, &path_str.to_c_str(),
-                                             read_flags as int, 0);
-                assert!(result.is_ok());
-                let result = result.unwrap();
-                let fd = result.fd;
-
-                // read
-                let mut read_mem = vec::from_elem(1000, 0u8);
-                let result = FsRequest::read(l, fd, read_mem, 0);
-                assert!(result.is_ok());
-
-                let nread = result.unwrap();
-                assert!(nread > 0);
-                let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
-                assert_eq!(read_str, ~"hello");
-            }
-            // unlink
-            let result = FsRequest::unlink(l, &path_str.to_c_str());
+            // write
+            let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1);
             assert!(result.is_ok());
         }
+
+        {
+            // re-open
+            let result = FsRequest::open(l(), &path_str.to_c_str(),
+                                         read_flags as int, 0);
+            assert!(result.is_ok());
+            let result = result.unwrap();
+            let fd = result.fd;
+
+            // read
+            let mut read_mem = vec::from_elem(1000, 0u8);
+            let result = FsRequest::read(l(), fd, read_mem, 0);
+            assert!(result.is_ok());
+
+            let nread = result.unwrap();
+            assert!(nread > 0);
+            let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
+            assert_eq!(read_str, ~"hello");
+        }
+        // unlink
+        let result = FsRequest::unlink(l(), &path_str.to_c_str());
+        assert!(result.is_ok());
     }
 
     #[test]
     fn file_test_stat() {
-        do run_uv_loop |l| {
-            let path = &"./tmp/file_test_stat_simple".to_c_str();
-            let create_flags = (O_RDWR | O_CREAT) as int;
-            let mode = (S_IWUSR | S_IRUSR) as int;
+        let path = &"./tmp/file_test_stat_simple".to_c_str();
+        let create_flags = (O_RDWR | O_CREAT) as int;
+        let mode = (S_IWUSR | S_IRUSR) as int;
 
-            let result = FsRequest::open(l, path, create_flags, mode);
-            assert!(result.is_ok());
-            let file = result.unwrap();
+        let result = FsRequest::open(l(), path, create_flags, mode);
+        assert!(result.is_ok());
+        let file = result.unwrap();
 
-            let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0);
-            assert!(result.is_ok());
+        let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0);
+        assert!(result.is_ok());
 
-            let result = FsRequest::stat(l, path);
-            assert!(result.is_ok());
-            assert_eq!(result.unwrap().size, 5);
+        let result = FsRequest::stat(l(), path);
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap().size, 5);
 
-            fn free<T>(_: T) {}
-            free(file);
+        fn free<T>(_: T) {}
+        free(file);
 
-            let result = FsRequest::unlink(l, path);
-            assert!(result.is_ok());
-        }
+        let result = FsRequest::unlink(l(), path);
+        assert!(result.is_ok());
     }
 
     #[test]
     fn file_test_mk_rm_dir() {
-        do run_uv_loop |l| {
-            let path = &"./tmp/mk_rm_dir".to_c_str();
-            let mode = S_IWUSR | S_IRUSR;
+        let path = &"./tmp/mk_rm_dir".to_c_str();
+        let mode = S_IWUSR | S_IRUSR;
 
-            let result = FsRequest::mkdir(l, path, mode);
-            assert!(result.is_ok());
+        let result = FsRequest::mkdir(l(), path, mode);
+        assert!(result.is_ok());
 
-            let result = FsRequest::stat(l, path);
-            assert!(result.is_ok());
-            assert!(result.unwrap().kind == io::TypeDirectory);
+        let result = FsRequest::stat(l(), path);
+        assert!(result.is_ok());
+        assert!(result.unwrap().kind == io::TypeDirectory);
 
-            let result = FsRequest::rmdir(l, path);
-            assert!(result.is_ok());
+        let result = FsRequest::rmdir(l(), path);
+        assert!(result.is_ok());
 
-            let result = FsRequest::stat(l, path);
-            assert!(result.is_err());
-        }
+        let result = FsRequest::stat(l(), path);
+        assert!(result.is_err());
     }
 
     #[test]
     fn file_test_mkdir_chokes_on_double_create() {
-        do run_uv_loop |l| {
-            let path = &"./tmp/double_create_dir".to_c_str();
-            let mode = S_IWUSR | S_IRUSR;
-
-            let result = FsRequest::stat(l, path);
-            assert!(result.is_err(), "{:?}", result);
-            let result = FsRequest::mkdir(l, path, mode as c_int);
-            assert!(result.is_ok(), "{:?}", result);
-            let result = FsRequest::mkdir(l, path, mode as c_int);
-            assert!(result.is_err(), "{:?}", result);
-            let result = FsRequest::rmdir(l, path);
-            assert!(result.is_ok(), "{:?}", result);
-        }
+        let path = &"./tmp/double_create_dir".to_c_str();
+        let mode = S_IWUSR | S_IRUSR;
+
+        let result = FsRequest::stat(l(), path);
+        assert!(result.is_err(), "{:?}", result);
+        let result = FsRequest::mkdir(l(), path, mode as c_int);
+        assert!(result.is_ok(), "{:?}", result);
+        let result = FsRequest::mkdir(l(), path, mode as c_int);
+        assert!(result.is_err(), "{:?}", result);
+        let result = FsRequest::rmdir(l(), path);
+        assert!(result.is_ok(), "{:?}", result);
     }
 
     #[test]
     fn file_test_rmdir_chokes_on_nonexistant_path() {
-        do run_uv_loop |l| {
-            let path = &"./tmp/never_existed_dir".to_c_str();
-            let result = FsRequest::rmdir(l, path);
-            assert!(result.is_err());
-        }
+        let path = &"./tmp/never_existed_dir".to_c_str();
+        let result = FsRequest::rmdir(l(), path);
+        assert!(result.is_err());
     }
 }
index b3527ce9fb4215eac74f88afbffe80ac5af23e9d..83fc53dce1cd7f7f399e94100096e3cd211a97e2 100644 (file)
@@ -83,7 +83,6 @@ fn uv_handle(&self) -> *uvll::uv_idle_t { self.handle }
 }
 
 extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
-    if status == uvll::ECANCELED { return }
     assert_eq!(status, 0);
     let idle: &mut IdleWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
     idle.callback.call();
@@ -101,7 +100,7 @@ mod test {
     use super::*;
     use std::rt::tube::Tube;
     use std::rt::rtio::{Callback, PausibleIdleCallback};
-    use super::super::run_uv_loop;
+    use super::super::local_loop;
 
     struct MyCallback(Tube<int>, int);
     impl Callback for MyCallback {
@@ -114,55 +113,47 @@ fn call(&mut self) {
 
     #[test]
     fn not_used() {
-        do run_uv_loop |l| {
-            let cb = ~MyCallback(Tube::new(), 1);
-            let _idle = IdleWatcher::new(l, cb as ~Callback);
-        }
+        let cb = ~MyCallback(Tube::new(), 1);
+        let _idle = IdleWatcher::new(local_loop(), cb as ~Callback);
     }
 
     #[test]
     fn smoke_test() {
-        do run_uv_loop |l| {
-            let mut tube = Tube::new();
-            let cb = ~MyCallback(tube.clone(), 1);
-            let mut idle = IdleWatcher::new(l, cb as ~Callback);
-            idle.resume();
-            tube.recv();
-        }
+        let mut tube = Tube::new();
+        let cb = ~MyCallback(tube.clone(), 1);
+        let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        idle.resume();
+        tube.recv();
     }
 
     #[test]
     fn fun_combinations_of_methods() {
-        do run_uv_loop |l| {
-            let mut tube = Tube::new();
-            let cb = ~MyCallback(tube.clone(), 1);
-            let mut idle = IdleWatcher::new(l, cb as ~Callback);
-            idle.resume();
-            tube.recv();
-            idle.pause();
-            idle.resume();
-            idle.resume();
-            tube.recv();
-            idle.pause();
-            idle.pause();
-            idle.resume();
-            tube.recv();
-        }
+        let mut tube = Tube::new();
+        let cb = ~MyCallback(tube.clone(), 1);
+        let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        idle.resume();
+        tube.recv();
+        idle.pause();
+        idle.resume();
+        idle.resume();
+        tube.recv();
+        idle.pause();
+        idle.pause();
+        idle.resume();
+        tube.recv();
     }
 
     #[test]
     fn pause_pauses() {
-        do run_uv_loop |l| {
-            let mut tube = Tube::new();
-            let cb = ~MyCallback(tube.clone(), 1);
-            let mut idle1 = IdleWatcher::new(l, cb as ~Callback);
-            let cb = ~MyCallback(tube.clone(), 2);
-            let mut idle2 = IdleWatcher::new(l, cb as ~Callback);
-            idle2.resume();
-            assert_eq!(tube.recv(), 2);
-            idle2.pause();
-            idle1.resume();
-            assert_eq!(tube.recv(), 1);
-        }
+        let mut tube = Tube::new();
+        let cb = ~MyCallback(tube.clone(), 1);
+        let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let cb = ~MyCallback(tube.clone(), 2);
+        let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback);
+        idle2.resume();
+        assert_eq!(tube.recv(), 2);
+        idle2.pause();
+        idle1.resume();
+        assert_eq!(tube.recv(), 1);
     }
 }
index 5bedba08fb0eeab494ad3f7211c45735787ba8f4..4da5ad4275f792e0f32297c08e06a204053cfa64 100644 (file)
 
 #[feature(macro_rules, globs)];
 
+use std::cast::transmute;
 use std::cast;
-use std::str::raw::from_c_str;
-use std::vec;
+use std::libc::{c_int, malloc, free};
+use std::ptr::null;
 use std::ptr;
+use std::rt::BlockedTask;
+use std::rt::local::Local;
+use std::rt::sched::Scheduler;
+use std::str::raw::from_c_str;
 use std::str;
-use std::libc::{c_void, c_int, malloc, free};
-use std::cast::transmute;
-use std::ptr::null;
+use std::task;
 use std::unstable::finally::Finally;
+use std::vec;
 
 use std::rt::io::IoError;
 
@@ -124,27 +128,90 @@ fn close_async_(&mut self) {
             uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb)
         }
     }
+
+    fn close(&mut self) {
+        let mut slot = None;
+
+        unsafe {
+            uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
+            uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
+
+            do wait_until_woken_after(&mut slot) {
+                uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
+            }
+        }
+
+        extern fn close_cb(handle: *uvll::uv_handle_t) {
+            unsafe {
+                let data = uvll::get_data_for_uv_handle(handle);
+                uvll::free_handle(handle);
+                if data == ptr::null() { return }
+                let slot: &mut Option<BlockedTask> = cast::transmute(data);
+                let sched: ~Scheduler = Local::take();
+                sched.resume_blocked_task_immediately(slot.take_unwrap());
+            }
+        }
+    }
+}
+
+pub struct ForbidUnwind {
+    msg: &'static str,
+    failing_before: bool,
+}
+
+impl ForbidUnwind {
+    fn new(s: &'static str) -> ForbidUnwind {
+        ForbidUnwind {
+            msg: s, failing_before: task::failing(),
+        }
+    }
+}
+
+impl Drop for ForbidUnwind {
+    fn drop(&mut self) {
+        assert!(self.failing_before == task::failing(),
+                "failing sadface {}", self.msg);
+    }
+}
+
+fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: &fn()) {
+    let _f = ForbidUnwind::new("wait_until_woken_after");
+    unsafe {
+        assert!((*slot).is_none());
+        let sched: ~Scheduler = Local::take();
+        do sched.deschedule_running_task_and_then |_, task| {
+            f();
+            *slot = Some(task);
+        }
+    }
 }
 
 pub struct Request {
     handle: *uvll::uv_req_t,
+    priv defused: bool,
 }
 
 impl Request {
     pub fn new(ty: uvll::uv_req_type) -> Request {
-        Request::wrap(unsafe { uvll::malloc_req(ty) })
+        unsafe {
+            let handle = uvll::malloc_req(ty);
+            uvll::set_data_for_req(handle, null::<()>());
+            Request::wrap(handle)
+        }
     }
 
     pub fn wrap(handle: *uvll::uv_req_t) -> Request {
-        Request { handle: handle }
+        Request { handle: handle, defused: false }
     }
 
     pub fn set_data<T>(&self, t: *T) {
         unsafe { uvll::set_data_for_req(self.handle, t) }
     }
 
-    pub fn get_data(&self) -> *c_void {
-        unsafe { uvll::get_data_for_req(self.handle) }
+    pub unsafe fn get_data<T>(&self) -> &'static mut T {
+        let data = uvll::get_data_for_req(self.handle);
+        assert!(data != null());
+        cast::transmute(data)
     }
 
     // This function should be used when the request handle has been given to an
@@ -155,17 +222,15 @@ pub fn get_data(&self) -> *c_void {
     // This is still a problem in blocking situations due to linked failure. In
     // the connection callback the handle should be re-wrapped with the `wrap`
     // function to ensure its destruction.
-    pub fn defuse(mut self) {
-        self.handle = ptr::null();
+    pub fn defuse(&mut self) {
+        self.defused = true;
     }
 }
 
 impl Drop for Request {
     fn drop(&mut self) {
-        unsafe {
-            if self.handle != ptr::null() {
-                uvll::free_req(self.handle)
-            }
+        if !self.defused {
+            unsafe { uvll::free_req(self.handle) }
         }
     }
 }
@@ -300,23 +365,18 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
     uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
 }
 
-fn run_uv_loop(f: proc(&mut Loop)) {
-    use std::rt::local::Local;
-    use std::rt::test::run_in_uv_task;
-    use std::rt::sched::Scheduler;
-    use std::cell::Cell;
-
-    let f = Cell::new(f);
-    do run_in_uv_task {
-        let mut io = None;
-        do Local::borrow |sched: &mut Scheduler| {
-            sched.event_loop.io(|i| unsafe {
+#[cfg(test)]
+fn local_loop() -> &'static mut Loop {
+    unsafe {
+        cast::transmute(do Local::borrow |sched: &mut Scheduler| {
+            let mut io = None;
+            do sched.event_loop.io |i| {
                 let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) =
                     cast::transmute(i);
                 io = Some(uvio);
-            });
-        }
-        f.take()(io.unwrap().uv_loop());
+            }
+            io.unwrap()
+        }.uv_loop())
     }
 }
 
index 5d228cd78486b23f75d715d16287dcf0ba99499d..bf5f6c88527e220cafd454d6c4b7e8c95d454611 100644 (file)
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::tube::Tube;
 use std::str;
+use std::task;
 use std::vec;
 
 use stream::StreamWatcher;
 use super::{Loop, Request, UvError, Buf, status_to_io_result,
-            uv_error_to_io_error, UvHandle, slice_to_uv_buf};
+            uv_error_to_io_error, UvHandle, slice_to_uv_buf,
+            wait_until_woken_after};
 use uvio::HomingIO;
 use uvll;
 
@@ -206,46 +208,46 @@ pub fn connect(loop_: &mut Loop, address: SocketAddr)
     {
         struct Ctx { status: c_int, task: Option<BlockedTask> }
 
-        let tcp = TcpWatcher::new(loop_);
-        let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
-            let req = Request::new(uvll::UV_CONNECT);
-            let result = match addr {
-                UvIpv4SocketAddr(addr) => unsafe {
-                    uvll::tcp_connect(req.handle, tcp.handle, addr,
-                                      connect_cb)
-                },
-                UvIpv6SocketAddr(addr) => unsafe {
-                    uvll::tcp_connect6(req.handle, tcp.handle, addr,
-                                       connect_cb)
-                },
-            };
-            match result {
-                0 => {
-                    let mut cx = Ctx { status: 0, task: None };
-                    req.set_data(&cx);
-                    req.defuse();
-                    let scheduler: ~Scheduler = Local::take();
-                    do scheduler.deschedule_running_task_and_then |_, task| {
-                        cx.task = Some(task);
-                    }
-                    match cx.status {
-                        0 => Ok(()),
-                        n => Err(UvError(n)),
+        return do task::unkillable {
+            let tcp = TcpWatcher::new(loop_);
+            let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
+                let mut req = Request::new(uvll::UV_CONNECT);
+                let result = match addr {
+                    UvIpv4SocketAddr(addr) => unsafe {
+                        uvll::tcp_connect(req.handle, tcp.handle, addr,
+                                          connect_cb)
+                    },
+                    UvIpv6SocketAddr(addr) => unsafe {
+                        uvll::tcp_connect6(req.handle, tcp.handle, addr,
+                                           connect_cb)
+                    },
+                };
+                match result {
+                    0 => {
+                        req.defuse(); // uv callback now owns this request
+                        let mut cx = Ctx { status: 0, task: None };
+                        do wait_until_woken_after(&mut cx.task) {
+                            req.set_data(&cx);
+                        }
+                        match cx.status {
+                            0 => Ok(()),
+                            n => Err(UvError(n)),
+                        }
                     }
+                    n => Err(UvError(n))
                 }
-                n => Err(UvError(n))
-            }
-        };
+            };
 
-        return match ret {
-            Ok(()) => Ok(tcp),
-            Err(e) => Err(e),
+            match ret {
+                Ok(()) => Ok(tcp),
+                Err(e) => Err(e),
+            }
         };
 
         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
             let req = Request::wrap(req);
-            if status == uvll::ECANCELED { return }
-            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
             cx.status = status;
             let scheduler: ~Scheduler = Local::take();
             scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
@@ -310,10 +312,14 @@ fn letdie(&mut self) -> Result<(), IoError> {
     }
 }
 
+impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
+    fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
+}
+
 impl Drop for TcpWatcher {
     fn drop(&mut self) {
         let _m = self.fire_homing_missile();
-        self.stream.close();
+        self.close();
     }
 }
 
@@ -323,25 +329,27 @@ impl TcpListener {
     pub fn bind(loop_: &mut Loop, address: SocketAddr)
         -> Result<~TcpListener, UvError>
     {
-        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
-        assert_eq!(unsafe {
-            uvll::uv_tcp_init(loop_.handle, handle)
-        }, 0);
-        let l = ~TcpListener {
-            home: get_handle_to_current_scheduler!(),
-            handle: handle,
-            closing_task: None,
-            outgoing: Tube::new(),
-        };
-        let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
-            match addr {
-                UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
-                UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
+        do task::unkillable {
+            let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+            assert_eq!(unsafe {
+                uvll::uv_tcp_init(loop_.handle, handle)
+            }, 0);
+            let l = ~TcpListener {
+                home: get_handle_to_current_scheduler!(),
+                handle: handle,
+                closing_task: None,
+                outgoing: Tube::new(),
+            };
+            let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+                match addr {
+                    UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
+                    UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
+                }
+            });
+            match res {
+                0 => Ok(l.install()),
+                n => Err(UvError(n))
             }
-        });
-        match res {
-            0 => Ok(l.install()),
-            n => Err(UvError(n))
         }
     }
 }
@@ -380,6 +388,7 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
 }
 
 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
+    assert!(status != uvll::ECANCELED);
     let msg = match status {
         0 => {
             let loop_ = Loop::wrap(unsafe {
@@ -389,7 +398,6 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
             Ok(~client as ~rtio::RtioTcpStream)
         }
-        uvll::ECANCELED => return,
         n => Err(uv_error_to_io_error(UvError(n)))
     };
 
@@ -399,12 +407,8 @@ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
 
 impl Drop for TcpListener {
     fn drop(&mut self) {
-        let (_m, sched) = self.fire_homing_missile_sched();
-
-        do sched.deschedule_running_task_and_then |_, task| {
-            self.closing_task = Some(task);
-            unsafe { uvll::uv_close(self.handle, listener_close_cb) }
-        }
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
@@ -463,26 +467,34 @@ impl UdpWatcher {
     pub fn bind(loop_: &Loop, address: SocketAddr)
         -> Result<UdpWatcher, UvError>
     {
-        let udp = UdpWatcher {
-            handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
-            home: get_handle_to_current_scheduler!(),
-        };
-        assert_eq!(unsafe {
-            uvll::uv_udp_init(loop_.handle, udp.handle)
-        }, 0);
-        let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
-            match addr {
-                UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
-                UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
+        do task::unkillable {
+            let udp = UdpWatcher {
+                handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+                home: get_handle_to_current_scheduler!(),
+            };
+            assert_eq!(unsafe {
+                uvll::uv_udp_init(loop_.handle, udp.handle)
+            }, 0);
+            let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+                match addr {
+                    UvIpv4SocketAddr(addr) =>
+                        uvll::udp_bind(udp.handle, addr, 0u32),
+                    UvIpv6SocketAddr(addr) =>
+                        uvll::udp_bind6(udp.handle, addr, 0u32),
+                }
+            });
+            match result {
+                0 => Ok(udp),
+                n => Err(UvError(n)),
             }
-        });
-        match result {
-            0 => Ok(udp),
-            n => Err(UvError(n)),
         }
     }
 }
 
+impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
+    fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
+}
+
 impl HomingIO for UdpWatcher {
     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
@@ -505,7 +517,7 @@ struct Ctx {
         }
         let _m = self.fire_homing_missile();
 
-        return match unsafe {
+        let a = match unsafe {
             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
         } {
             0 => {
@@ -514,10 +526,8 @@ struct Ctx {
                     buf: Some(slice_to_uv_buf(buf)),
                     result: None,
                 };
-                unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    cx.task = Some(task);
+                do wait_until_woken_after(&mut cx.task) {
+                    unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
                 }
                 match cx.result.take_unwrap() {
                     (n, _) if n < 0 =>
@@ -527,23 +537,30 @@ struct Ctx {
             }
             n => Err(uv_error_to_io_error(UvError(n)))
         };
+        return a;
 
         extern fn alloc_cb(handle: *uvll::uv_udp_t,
                            _suggested_size: size_t) -> Buf {
             let cx: &mut Ctx = unsafe {
                 cast::transmute(uvll::get_data_for_uv_handle(handle))
             };
-            cx.buf.take().expect("alloc_cb called more than once")
+            cx.buf.take().expect("recv alloc_cb called more than once")
         }
 
-        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
+        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
                           addr: *uvll::sockaddr, _flags: c_uint) {
+            assert!(nread != uvll::ECANCELED as ssize_t);
+            let cx: &mut Ctx = unsafe {
+                cast::transmute(uvll::get_data_for_uv_handle(handle))
+            };
 
             // When there's no data to read the recv callback can be a no-op.
             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
             // this we just drop back to kqueue and wait for the next callback.
-            if nread == 0 { return }
-            if nread == uvll::ECANCELED as ssize_t { return }
+            if nread == 0 {
+                cx.buf = Some(buf);
+                return
+            }
 
             unsafe {
                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
@@ -566,7 +583,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         let _m = self.fire_homing_missile();
 
-        let req = Request::new(uvll::UV_UDP_SEND);
+        let mut req = Request::new(uvll::UV_UDP_SEND);
         let buf = slice_to_uv_buf(buf);
         let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
             match dst {
@@ -579,15 +596,11 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         return match result {
             0 => {
+                req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { task: None, result: 0 };
-                req.set_data(&cx);
-                req.defuse();
-
-                let sched: ~Scheduler = Local::take();
-                do sched.deschedule_running_task_and_then |_, task| {
-                    cx.task = Some(task);
+                do wait_until_woken_after(&mut cx.task) {
+                    req.set_data(&cx);
                 }
-
                 match cx.result {
                     0 => Ok(()),
                     n => Err(uv_error_to_io_error(UvError(n)))
@@ -598,7 +611,8 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
             let req = Request::wrap(req);
-            let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
             cx.result = status;
 
             let sched: ~Scheduler = Local::take();
@@ -679,24 +693,8 @@ fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
 impl Drop for UdpWatcher {
     fn drop(&mut self) {
         // Send ourselves home to close this handle (blocking while doing so).
-        let (_m, sched) = self.fire_homing_missile_sched();
-        let mut slot = None;
-        unsafe {
-            uvll::set_data_for_uv_handle(self.handle, &slot);
-            uvll::uv_close(self.handle, close_cb);
-        }
-        do sched.deschedule_running_task_and_then |_, task| {
-            slot = Some(task);
-        }
-
-        extern fn close_cb(handle: *uvll::uv_handle_t) {
-            let slot: &mut Option<BlockedTask> = unsafe {
-                cast::transmute(uvll::get_data_for_uv_handle(handle))
-            };
-            unsafe { uvll::free_handle(handle) }
-            let sched: ~Scheduler = Local::take();
-            sched.resume_blocked_task_immediately(slot.take_unwrap());
-        }
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
@@ -714,397 +712,357 @@ mod test {
     use std::task;
 
     use super::*;
-    use super::super::{Loop, run_uv_loop};
+    use super::super::local_loop;
 
     #[test]
     fn connect_close_ip4() {
-        do run_uv_loop |l| {
-            match TcpWatcher::connect(l, next_test_ip4()) {
-                Ok(*) => fail!(),
-                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
-            }
+        match TcpWatcher::connect(local_loop(), next_test_ip4()) {
+            Ok(*) => fail!(),
+            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
         }
     }
 
     #[test]
     fn connect_close_ip6() {
-        do run_uv_loop |l| {
-            match TcpWatcher::connect(l, next_test_ip6()) {
-                Ok(*) => fail!(),
-                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
-            }
+        match TcpWatcher::connect(local_loop(), next_test_ip6()) {
+            Ok(*) => fail!(),
+            Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
         }
     }
 
     #[test]
     fn udp_bind_close_ip4() {
-        do run_uv_loop |l| {
-            match UdpWatcher::bind(l, next_test_ip4()) {
-                Ok(*) => {}
-                Err(*) => fail!()
-            }
+        match UdpWatcher::bind(local_loop(), next_test_ip4()) {
+            Ok(*) => {}
+            Err(*) => fail!()
         }
     }
 
     #[test]
     fn udp_bind_close_ip6() {
-        do run_uv_loop |l| {
-            match UdpWatcher::bind(l, next_test_ip6()) {
-                Ok(*) => {}
-                Err(*) => fail!()
-            }
+        match UdpWatcher::bind(local_loop(), next_test_ip6()) {
+            Ok(*) => {}
+            Err(*) => fail!()
         }
     }
 
     #[test]
     fn listen_ip4() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let addr = next_test_ip4();
-
-            let handle = l.handle;
-            do spawn {
-                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e)
-                };
-                let mut w = match w.listen() {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e),
-                };
-                chan.take().send(());
-                match w.accept() {
-                    Ok(mut stream) => {
-                        let mut buf = [0u8, ..10];
-                        match stream.read(buf) {
-                            Ok(10) => {} e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
-                    }
-                    Err(e) => fail!("{:?}", e)
-                }
-            }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
 
-            port.recv();
-            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+        do spawn {
+            let w = match TcpListener::bind(local_loop(), addr) {
                 Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            let mut w = match w.listen() {
+                Ok(w) => w, Err(e) => fail!("{:?}", e),
+            };
+            chan.take().send(());
+            match w.accept() {
+                Ok(mut stream) => {
+                    let mut buf = [0u8, ..10];
+                    match stream.read(buf) {
+                        Ok(10) => {} e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
+                    }
+                }
+                Err(e) => fail!("{:?}", e)
             }
         }
+
+        port.recv();
+        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
+        }
     }
 
     #[test]
     fn listen_ip6() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let addr = next_test_ip6();
-
-            let handle = l.handle;
-            do spawn {
-                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e)
-                };
-                let mut w = match w.listen() {
-                    Ok(w) => w, Err(e) => fail!("{:?}", e),
-                };
-                chan.take().send(());
-                match w.accept() {
-                    Ok(mut stream) => {
-                        let mut buf = [0u8, ..10];
-                        match stream.read(buf) {
-                            Ok(10) => {} e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
-                    }
-                    Err(e) => fail!("{:?}", e)
-                }
-            }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip6();
 
-            port.recv();
-            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+        do spawn {
+            let w = match TcpListener::bind(local_loop(), addr) {
                 Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            let mut w = match w.listen() {
+                Ok(w) => w, Err(e) => fail!("{:?}", e),
+            };
+            chan.take().send(());
+            match w.accept() {
+                Ok(mut stream) => {
+                    let mut buf = [0u8, ..10];
+                    match stream.read(buf) {
+                        Ok(10) => {} e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
+                    }
+                }
+                Err(e) => fail!("{:?}", e)
             }
         }
+
+        port.recv();
+        let mut w = match TcpWatcher::connect(local_loop(), addr) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
+        }
     }
 
     #[test]
     fn udp_recv_ip4() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let client = next_test_ip4();
-            let server = next_test_ip4();
-
-            let handle = l.handle;
-            do spawn {
-                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
-                    Ok(mut w) => {
-                        chan.take().send(());
-                        let mut buf = [0u8, ..10];
-                        match w.recvfrom(buf) {
-                            Ok((10, addr)) => assert_eq!(addr, client),
-                            e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let client = next_test_ip4();
+        let server = next_test_ip4();
+
+        do spawn {
+            match UdpWatcher::bind(local_loop(), server) {
+                Ok(mut w) => {
+                    chan.take().send(());
+                    let mut buf = [0u8, ..10];
+                    match w.recvfrom(buf) {
+                        Ok((10, addr)) => assert_eq!(addr, client),
+                        e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
                     }
-                    Err(e) => fail!("{:?}", e)
                 }
+                Err(e) => fail!("{:?}", e)
             }
+        }
 
-            port.recv();
-            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
-                Ok(w) => w, Err(e) => fail!("{:?}", e)
-            };
-            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
-            }
+        port.recv();
+        let mut w = match UdpWatcher::bind(local_loop(), client) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
         }
     }
 
     #[test]
     fn udp_recv_ip6() {
-        do run_uv_loop |l| {
-            let (port, chan) = oneshot();
-            let chan = Cell::new(chan);
-            let client = next_test_ip6();
-            let server = next_test_ip6();
-
-            let handle = l.handle;
-            do spawn {
-                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
-                    Ok(mut w) => {
-                        chan.take().send(());
-                        let mut buf = [0u8, ..10];
-                        match w.recvfrom(buf) {
-                            Ok((10, addr)) => assert_eq!(addr, client),
-                            e => fail!("{:?}", e),
-                        }
-                        for i in range(0, 10u8) {
-                            assert_eq!(buf[i], i + 1);
-                        }
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let client = next_test_ip6();
+        let server = next_test_ip6();
+
+        do spawn {
+            match UdpWatcher::bind(local_loop(), server) {
+                Ok(mut w) => {
+                    chan.take().send(());
+                    let mut buf = [0u8, ..10];
+                    match w.recvfrom(buf) {
+                        Ok((10, addr)) => assert_eq!(addr, client),
+                        e => fail!("{:?}", e),
+                    }
+                    for i in range(0, 10u8) {
+                        assert_eq!(buf[i], i + 1);
                     }
-                    Err(e) => fail!("{:?}", e)
                 }
+                Err(e) => fail!("{:?}", e)
             }
+        }
 
-            port.recv();
-            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
-                Ok(w) => w, Err(e) => fail!("{:?}", e)
-            };
-            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
-                Ok(()) => {}, Err(e) => fail!("{:?}", e)
-            }
+        port.recv();
+        let mut w = match UdpWatcher::bind(local_loop(), client) {
+            Ok(w) => w, Err(e) => fail!("{:?}", e)
+        };
+        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+            Ok(()) => {}, Err(e) => fail!("{:?}", e)
         }
     }
 
     #[test]
     fn test_read_read_read() {
-        do run_uv_loop |l| {
-            let addr = next_test_ip4();
-            static MAX: uint = 500000;
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                chan.take().send(());
-                let mut stream = acceptor.accept().unwrap();
-                let buf = [1, .. 2048];
-                let mut total_bytes_written = 0;
-                while total_bytes_written < MAX {
-                    stream.write(buf);
-                    total_bytes_written += buf.len();
-                }
+        use std::rt::rtio::*;
+        let addr = next_test_ip4();
+        static MAX: uint = 5000;
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        do spawn {
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            chan.take().send(());
+            let mut stream = acceptor.accept().unwrap();
+            let buf = [1, .. 2048];
+            let mut total_bytes_written = 0;
+            while total_bytes_written < MAX {
+                assert!(stream.write(buf).is_ok());
+                uvdebug!("wrote bytes");
+                total_bytes_written += buf.len();
             }
+        }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                port.take().recv();
-                let mut stream = TcpWatcher::connect(l, addr).unwrap();
-                let mut buf = [0, .. 2048];
-                let mut total_bytes_read = 0;
-                while total_bytes_read < MAX {
-                    let nread = stream.read(buf).unwrap();
-                    uvdebug!("read {} bytes", nread);
-                    total_bytes_read += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
+        do spawn {
+            port.take().recv();
+            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+            let mut buf = [0, .. 2048];
+            let mut total_bytes_read = 0;
+            while total_bytes_read < MAX {
+                let nread = stream.read(buf).unwrap();
+                total_bytes_read += nread;
+                for i in range(0u, nread) {
+                    assert_eq!(buf[i], 1);
                 }
-                uvdebug!("read {} bytes total", total_bytes_read);
             }
+            uvdebug!("read {} bytes total", total_bytes_read);
         }
     }
 
     #[test]
-    #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
     fn test_udp_twice() {
-        do run_uv_loop |l| {
-            let server_addr = next_test_ip4();
-            let client_addr = next_test_ip4();
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut client = UdpWatcher::bind(l, client_addr).unwrap();
-                port.take().recv();
-                assert!(client.sendto([1], server_addr).is_ok());
-                assert!(client.sendto([2], server_addr).is_ok());
-            }
+        let server_addr = next_test_ip4();
+        let client_addr = next_test_ip4();
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut server = UdpWatcher::bind(l, server_addr).unwrap();
-                chan.take().send(());
-                let mut buf1 = [0];
-                let mut buf2 = [0];
-                let (nread1, src1) = server.recvfrom(buf1).unwrap();
-                let (nread2, src2) = server.recvfrom(buf2).unwrap();
-                assert_eq!(nread1, 1);
-                assert_eq!(nread2, 1);
-                assert_eq!(src1, client_addr);
-                assert_eq!(src2, client_addr);
-                assert_eq!(buf1[0], 1);
-                assert_eq!(buf2[0], 2);
-            }
+        do spawn {
+            let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
+            port.take().recv();
+            assert!(client.sendto([1], server_addr).is_ok());
+            assert!(client.sendto([2], server_addr).is_ok());
         }
+
+        let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
+        chan.take().send(());
+        let mut buf1 = [0];
+        let mut buf2 = [0];
+        let (nread1, src1) = server.recvfrom(buf1).unwrap();
+        let (nread2, src2) = server.recvfrom(buf2).unwrap();
+        assert_eq!(nread1, 1);
+        assert_eq!(nread2, 1);
+        assert_eq!(src1, client_addr);
+        assert_eq!(src2, client_addr);
+        assert_eq!(buf1[0], 1);
+        assert_eq!(buf2[0], 2);
     }
 
     #[test]
     fn test_udp_many_read() {
-        do run_uv_loop |l| {
-            let server_out_addr = next_test_ip4();
-            let server_in_addr = next_test_ip4();
-            let client_out_addr = next_test_ip4();
-            let client_in_addr = next_test_ip4();
-            static MAX: uint = 500_000;
-
-            let (p1, c1) = oneshot();
-            let (p2, c2) = oneshot();
-
-            let first = Cell::new((p1, c2));
-            let second = Cell::new((p2, c1));
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
-                let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
-                let (port, chan) = first.take();
-                chan.send(());
-                port.recv();
-                let msg = [1, .. 2048];
-                let mut total_bytes_sent = 0;
-                let mut buf = [1];
-                while buf[0] == 1 {
-                    // send more data
-                    assert!(server_out.sendto(msg, client_in_addr).is_ok());
-                    total_bytes_sent += msg.len();
-                    // check if the client has received enough
-                    let res = server_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(nread, 1);
-                    assert_eq!(src, client_out_addr);
-                }
-                assert!(total_bytes_sent >= MAX);
+        let server_out_addr = next_test_ip4();
+        let server_in_addr = next_test_ip4();
+        let client_out_addr = next_test_ip4();
+        let client_in_addr = next_test_ip4();
+        static MAX: uint = 500_000;
+
+        let (p1, c1) = oneshot();
+        let (p2, c2) = oneshot();
+
+        let first = Cell::new((p1, c2));
+        let second = Cell::new((p2, c1));
+
+        do spawn {
+            let l = local_loop();
+            let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
+            let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
+            let (port, chan) = first.take();
+            chan.send(());
+            port.recv();
+            let msg = [1, .. 2048];
+            let mut total_bytes_sent = 0;
+            let mut buf = [1];
+            while buf[0] == 1 {
+                // send more data
+                assert!(server_out.sendto(msg, client_in_addr).is_ok());
+                total_bytes_sent += msg.len();
+                // check if the client has received enough
+                let res = server_in.recvfrom(buf);
+                assert!(res.is_ok());
+                let (nread, src) = res.unwrap();
+                assert_eq!(nread, 1);
+                assert_eq!(src, client_out_addr);
             }
+            assert!(total_bytes_sent >= MAX);
+        }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
-                let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
-                let (port, chan) = second.take();
-                port.recv();
-                chan.send(());
-                let mut total_bytes_recv = 0;
-                let mut buf = [0, .. 2048];
-                while total_bytes_recv < MAX {
-                    // ask for more
-                    assert!(client_out.sendto([1], server_in_addr).is_ok());
-                    // wait for data
-                    let res = client_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(src, server_out_addr);
-                    total_bytes_recv += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
+        do spawn {
+            let l = local_loop();
+            let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
+            let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
+            let (port, chan) = second.take();
+            port.recv();
+            chan.send(());
+            let mut total_bytes_recv = 0;
+            let mut buf = [0, .. 2048];
+            while total_bytes_recv < MAX {
+                // ask for more
+                assert!(client_out.sendto([1], server_in_addr).is_ok());
+                // wait for data
+                let res = client_in.recvfrom(buf);
+                assert!(res.is_ok());
+                let (nread, src) = res.unwrap();
+                assert_eq!(src, server_out_addr);
+                total_bytes_recv += nread;
+                for i in range(0u, nread) {
+                    assert_eq!(buf[i], 1);
                 }
-                // tell the server we're done
-                assert!(client_out.sendto([0], server_in_addr).is_ok());
             }
+            // tell the server we're done
+            assert!(client_out.sendto([0], server_in_addr).is_ok());
         }
     }
 
     #[test]
     fn test_read_and_block() {
-        do run_uv_loop |l| {
-            let addr = next_test_ip4();
-            let (port, chan) = oneshot();
-            let port = Cell::new(port);
-            let chan = Cell::new(chan);
-
-            let handle = l.handle;
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                let (port2, chan2) = stream();
-                chan.take().send(port2);
-                let mut stream = acceptor.accept().unwrap();
-                let mut buf = [0, .. 2048];
-
-                let expected = 32;
-                let mut current = 0;
-                let mut reads = 0;
-
-                while current < expected {
-                    let nread = stream.read(buf).unwrap();
-                    for i in range(0u, nread) {
-                        let val = buf[i] as uint;
-                        assert_eq!(val, current % 8);
-                        current += 1;
-                    }
-                    reads += 1;
+        let addr = next_test_ip4();
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        do spawn {
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            let (port2, chan2) = stream();
+            chan.take().send(port2);
+            let mut stream = acceptor.accept().unwrap();
+            let mut buf = [0, .. 2048];
+
+            let expected = 32;
+            let mut current = 0;
+            let mut reads = 0;
 
-                    chan2.send(());
+            while current < expected {
+                let nread = stream.read(buf).unwrap();
+                for i in range(0u, nread) {
+                    let val = buf[i] as uint;
+                    assert_eq!(val, current % 8);
+                    current += 1;
                 }
+                reads += 1;
 
-                // Make sure we had multiple reads
-                assert!(reads > 1);
+                chan2.send(());
             }
 
-            do spawntask {
-                let l = &mut Loop::wrap(handle);
-                let port2 = port.take().recv();
-                let mut stream = TcpWatcher::connect(l, addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                port2.recv();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                port2.recv();
-            }
+            // Make sure we had multiple reads
+            assert!(reads > 1);
+        }
+
+        do spawn {
+            let port2 = port.take().recv();
+            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            port2.recv();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            port2.recv();
         }
     }
 
@@ -1113,27 +1071,23 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
         let addr = next_test_ip4();
 
         do task::spawn_sched(task::SingleThreaded) {
-            do run_uv_loop |l| {
-                let listener = TcpListener::bind(l, addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                let mut stream = acceptor.accept().unwrap();
-                let mut buf = [0, .. 2048];
-                let nread = stream.read(buf).unwrap();
-                assert_eq!(nread, 8);
-                for i in range(0u, nread) {
-                    assert_eq!(buf[i], i as u8);
-                }
+            let listener = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut acceptor = listener.listen().unwrap();
+            let mut stream = acceptor.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
             }
         }
 
         do task::spawn_sched(task::SingleThreaded) {
-            do run_uv_loop |l| {
-                let mut stream = TcpWatcher::connect(l, addr);
-                while stream.is_err() {
-                    stream = TcpWatcher::connect(l, addr);
-                }
-                stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+            let mut stream = TcpWatcher::connect(local_loop(), addr);
+            while stream.is_err() {
+                stream = TcpWatcher::connect(local_loop(), addr);
             }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
         }
     }
 
@@ -1149,17 +1103,13 @@ fn test_homing_closes_correctly() {
 
         do task::spawn_sched(task::SingleThreaded) {
             let chan = Cell::new(chan.take());
-            do run_uv_loop |l| {
-                let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
-                chan.take().send(listener);
-            }
+            let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
+            chan.take().send(listener);
         }
 
         do task::spawn_sched(task::SingleThreaded) {
             let port = Cell::new(port.take());
-            do run_uv_loop |_l| {
-                port.take().recv();
-            }
+            port.take().recv();
         }
     }
 
@@ -1261,4 +1211,69 @@ unsafe fn local_io() -> &'static mut IoFactory {
         }
     }
 
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure1() {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.take().send(());
+            w.accept();
+        }
+
+        port.recv();
+        fail!();
+    }
+
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure2() {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.take().send(());
+            let mut buf = [0];
+            w.accept().unwrap().read(buf);
+        }
+
+        port.recv();
+        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+
+        fail!();
+    }
+
+    #[should_fail]
+    #[test]
+    #[ignore(reason = "linked failure")]
+    fn linked_failure3() {
+        let (port, chan) = stream();
+        let chan = Cell::new(chan);
+        let addr = next_test_ip4();
+
+        do spawn {
+            let chan = chan.take();
+            let w = TcpListener::bind(local_loop(), addr).unwrap();
+            let mut w = w.listen().unwrap();
+            chan.send(());
+            let mut conn = w.accept().unwrap();
+            chan.send(());
+            let buf = [0, ..65536];
+            conn.write(buf);
+        }
+
+        port.recv();
+        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+        port.recv();
+        fail!();
+    }
 }
index 1f28e043dfb4988881cfa37a24df6802a9b8b704..89a86a2ff7dce106a0a4f6486cb1cd046c0b40ce 100644 (file)
@@ -9,7 +9,6 @@
 // except according to those terms.
 
 use std::c_str::CString;
-use std::cast;
 use std::libc;
 use std::rt::BlockedTask;
 use std::rt::io::IoError;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::tube::Tube;
+use std::task;
 
 use stream::StreamWatcher;
-use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error};
+use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
+            wait_until_woken_after};
 use uvio::HomingIO;
 use uvll;
 
@@ -32,7 +33,6 @@ pub struct PipeWatcher {
 pub struct PipeListener {
     home: SchedHandle,
     pipe: *uvll::uv_pipe_t,
-    priv closing_task: Option<BlockedTask>,
     priv outgoing: Tube<Result<~RtioPipe, IoError>>,
 }
 
@@ -74,36 +74,35 @@ pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
     pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
     {
         struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
-        let mut cx = Ctx { task: None, result: 0 };
-        let req = Request::new(uvll::UV_CONNECT);
-        let pipe = PipeWatcher::new(loop_, false);
-        unsafe {
-            uvll::set_data_for_req(req.handle, &cx as *Ctx);
-            uvll::uv_pipe_connect(req.handle,
-                                  pipe.handle(),
-                                  name.with_ref(|p| p),
-                                  connect_cb)
-        }
-        req.defuse();
+        return do task::unkillable {
+            let mut cx = Ctx { task: None, result: 0 };
+            let mut req = Request::new(uvll::UV_CONNECT);
+            let pipe = PipeWatcher::new(loop_, false);
+
+            do wait_until_woken_after(&mut cx.task) {
+                unsafe {
+                    uvll::uv_pipe_connect(req.handle,
+                                          pipe.handle(),
+                                          name.with_ref(|p| p),
+                                          connect_cb)
+                }
+                req.set_data(&cx);
+                req.defuse(); // uv callback now owns this request
+            }
+            match cx.result {
+                0 => Ok(pipe),
+                n => Err(UvError(n))
+            }
 
-        let sched: ~Scheduler = Local::take();
-        do sched.deschedule_running_task_and_then |_, task| {
-            cx.task = Some(task);
-        }
-        return match cx.result {
-            0 => Ok(pipe),
-            n => Err(UvError(n))
         };
 
-        extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
-            let _req = Request::wrap(req);
-            if status == uvll::ECANCELED { return }
-            unsafe {
-                let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
-                cx.result = status;
-                let sched: ~Scheduler = Local::take();
-                sched.resume_blocked_task_immediately(cx.task.take_unwrap());
-            }
+        extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
+            let req = Request::wrap(req);
+            assert!(status != uvll::ECANCELED);
+            let cx: &mut Ctx = unsafe { req.get_data() };
+            cx.result = status;
+            let sched: ~Scheduler = Local::take();
+            sched.resume_blocked_task_immediately(cx.task.take_unwrap());
         }
     }
 
@@ -133,11 +132,15 @@ impl HomingIO for PipeWatcher {
     fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
 }
 
+impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
+    fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
+}
+
 impl Drop for PipeWatcher {
     fn drop(&mut self) {
         if !self.defused {
             let _m = self.fire_homing_missile();
-            self.stream.close();
+            self.close();
         }
     }
 }
@@ -150,21 +153,24 @@ fn drop(&mut self) {
 
 impl PipeListener {
     pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
-        let pipe = PipeWatcher::new(loop_, false);
-        match unsafe { uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p)) } {
-            0 => {
-                // If successful, unwrap the PipeWatcher because we control how
-                // we close the pipe differently. We can't rely on
-                // StreamWatcher's default close method.
-                let p = ~PipeListener {
-                    home: get_handle_to_current_scheduler!(),
-                    pipe: pipe.unwrap(),
-                    closing_task: None,
-                    outgoing: Tube::new(),
-                };
-                Ok(p.install())
+        do task::unkillable {
+            let pipe = PipeWatcher::new(loop_, false);
+            match unsafe {
+                uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
+            } {
+                0 => {
+                    // If successful, unwrap the PipeWatcher because we control how
+                    // we close the pipe differently. We can't rely on
+                    // StreamWatcher's default close method.
+                    let p = ~PipeListener {
+                        home: get_handle_to_current_scheduler!(),
+                        pipe: pipe.unwrap(),
+                        outgoing: Tube::new(),
+                    };
+                    Ok(p.install())
+                }
+                n => Err(UvError(n))
             }
-            n => Err(UvError(n))
         }
     }
 }
@@ -196,6 +202,7 @@ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
 }
 
 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
+    assert!(status != uvll::ECANCELED);
     let msg = match status {
         0 => {
             let loop_ = Loop::wrap(unsafe {
@@ -205,7 +212,6 @@ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
             assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
             Ok(~client as ~RtioPipe)
         }
-        uvll::ECANCELED => return,
         n => Err(uv_error_to_io_error(UvError(n)))
     };
 
@@ -215,23 +221,11 @@ fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
 
 impl Drop for PipeListener {
     fn drop(&mut self) {
-        let (_m, sched) = self.fire_homing_missile_sched();
-
-        do sched.deschedule_running_task_and_then |_, task| {
-            self.closing_task = Some(task);
-            unsafe { uvll::uv_close(self.pipe, listener_close_cb) }
-        }
+        let _m = self.fire_homing_missile();
+        self.close();
     }
 }
 
-extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
-    let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) };
-    unsafe { uvll::free_handle(handle) }
-
-    let sched: ~Scheduler = Local::take();
-    sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap());
-}
-
 // PipeAcceptor implementation and traits
 
 impl RtioUnixAcceptor for PipeAcceptor {
index 15d5ae1c33ca5514918a4c83c62d6729bf7886e7..17a7510aa19b71e32befb5000b9abc6976691b6c 100644 (file)
@@ -19,7 +19,8 @@
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::vec;
 
-use super::{Loop, UvHandle, UvError, uv_error_to_io_error};
+use super::{Loop, UvHandle, UvError, uv_error_to_io_error,
+            wait_until_woken_after};
 use uvio::HomingIO;
 use uvll;
 use pipe::PipeWatcher;
@@ -222,11 +223,7 @@ fn wait(&mut self) -> int {
                 // If there's no exit code previously listed, then the
                 // process's exit callback has yet to be invoked. We just
                 // need to deschedule ourselves and wait to be reawoken.
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    assert!(self.to_wake.is_none());
-                    self.to_wake = Some(task);
-                }
+                wait_until_woken_after(&mut self.to_wake, || {});
                 assert!(self.exit_status.is_some());
             }
         }
index 745cb5a6fa090aa7231b1515737573a6fbda7e77..b9ccacf4df707ff51178358000a50f04a7828757 100644 (file)
@@ -9,12 +9,14 @@
 // except according to those terms.
 
 use std::cast;
-use std::libc::{c_int, size_t, ssize_t, c_void};
+use std::libc::{c_int, size_t, ssize_t};
+use std::ptr;
 use std::rt::BlockedTask;
 use std::rt::local::Local;
 use std::rt::sched::Scheduler;
 
-use super::{UvError, Buf, slice_to_uv_buf, Request};
+use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
+            ForbidUnwind};
 use uvll;
 
 // This is a helper structure which is intended to get embedded into other
@@ -63,6 +65,10 @@ pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
     }
 
     pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
+        // This read operation needs to get canceled on an unwind via libuv's
+        // uv_read_stop function
+        let _f = ForbidUnwind::new("stream read");
+
         // Send off the read request, but don't block until we're sure that the
         // read request is queued.
         match unsafe {
@@ -74,12 +80,10 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
                     result: 0,
                     task: None,
                 };
-                unsafe {
-                    uvll::set_data_for_uv_handle(self.handle, &rcx)
-                }
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_sched, task| {
-                    rcx.task = Some(task);
+                do wait_until_woken_after(&mut rcx.task) {
+                    unsafe {
+                        uvll::set_data_for_uv_handle(self.handle, &rcx)
+                    }
                 }
                 match rcx.result {
                     n if n < 0 => Err(UvError(n as c_int)),
@@ -91,12 +95,17 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
     }
 
     pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
+        // The ownership of the write request is dubious if this function
+        // unwinds. I believe that if the write_cb fails to re-schedule the task
+        // then the write request will be leaked.
+        let _f = ForbidUnwind::new("stream write");
+
         // Prepare the write request, either using a cached one or allocating a
         // new one
-        if self.last_write_req.is_none() {
-            self.last_write_req = Some(Request::new(uvll::UV_WRITE));
-        }
-        let req = self.last_write_req.get_ref();
+        let mut req = match self.last_write_req.take() {
+            Some(req) => req, None => Request::new(uvll::UV_WRITE),
+        };
+        req.set_data(ptr::null::<()>());
 
         // Send off the request, but be careful to not block until we're sure
         // that the write reqeust is queued. If the reqeust couldn't be queued,
@@ -107,11 +116,12 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
         } {
             0 => {
                 let mut wcx = WriteContext { result: 0, task: None, };
-                req.set_data(&wcx);
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_sched, task| {
-                    wcx.task = Some(task);
+                req.defuse(); // uv callback now owns this request
+
+                do wait_until_woken_after(&mut wcx.task) {
+                    req.set_data(&wcx);
                 }
+                self.last_write_req = Some(Request::wrap(req.handle));
                 match wcx.result {
                     0 => Ok(()),
                     n => Err(UvError(n)),
@@ -120,50 +130,24 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
             n => Err(UvError(n)),
         }
     }
-
-    // This will deallocate an internally used memory, along with closing the
-    // handle (and freeing it).
-    pub fn close(&mut self) {
-        let mut closing_task = None;
-        unsafe {
-            uvll::set_data_for_uv_handle(self.handle, &closing_task);
-        }
-
-        // Wait for this stream to close because it possibly represents a remote
-        // connection which may have consequences if we close asynchronously.
-        let sched: ~Scheduler = Local::take();
-        do sched.deschedule_running_task_and_then |_, task| {
-            closing_task = Some(task);
-            unsafe { uvll::uv_close(self.handle, close_cb) }
-        }
-
-        extern fn close_cb(handle: *uvll::uv_handle_t) {
-            let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) };
-            unsafe { uvll::free_handle(handle) }
-
-            let closing_task: &mut Option<BlockedTask> = unsafe {
-                cast::transmute(data)
-            };
-            let task = closing_task.take_unwrap();
-            let scheduler: ~Scheduler = Local::take();
-            scheduler.resume_blocked_task_immediately(task);
-        }
-    }
 }
 
 // This allocation callback expects to be invoked once and only once. It will
 // unwrap the buffer in the ReadContext stored in the stream and return it. This
 // will fail if it is called more than once.
 extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf {
+    uvdebug!("alloc_cb");
     let rcx: &mut ReadContext = unsafe {
         cast::transmute(uvll::get_data_for_uv_handle(stream))
     };
-    rcx.buf.take().expect("alloc_cb called more than once")
+    rcx.buf.take().expect("stream alloc_cb called more than once")
 }
 
 // When a stream has read some data, we will always forcibly stop reading and
 // return all the data read (even if it didn't fill the whole buffer).
 extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
+    uvdebug!("read_cb {}", nread);
+    assert!(nread != uvll::ECANCELED as ssize_t);
     let rcx: &mut ReadContext = unsafe {
         cast::transmute(uvll::get_data_for_uv_handle(handle))
     };
@@ -182,11 +166,11 @@ pub fn close(&mut self) {
 // reading, however, all this does is wake up the blocked task after squirreling
 // away the error code as a result.
 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
-    if status == uvll::ECANCELED { return }
+    let mut req = Request::wrap(req);
+    assert!(status != uvll::ECANCELED);
     // Remember to not free the request because it is re-used between writes on
     // the same stream.
-    let req = Request::wrap(req);
-    let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) };
+    let wcx: &mut WriteContext = unsafe { req.get_data() };
     wcx.result = status;
     req.defuse();
 
index df35a4892e97876cff1a3d0999bb32e311b0f1b4..96cf024639f81b87f36f2bfdefab126219d0830e 100644 (file)
@@ -16,7 +16,7 @@
 use std::rt::sched::{Scheduler, SchedHandle};
 
 use uvll;
-use super::{Loop, UvHandle};
+use super::{Loop, UvHandle, ForbidUnwind};
 use uvio::HomingIO;
 
 pub struct TimerWatcher {
@@ -67,6 +67,11 @@ fn uv_handle(&self) -> *uvll::uv_timer_t { self.handle }
 impl RtioTimer for TimerWatcher {
     fn sleep(&mut self, msecs: u64) {
         let (_m, sched) = self.fire_homing_missile_sched();
+
+        // If the descheduling operation unwinds after the timer has been
+        // started, then we need to call stop on the timer.
+        let _f = ForbidUnwind::new("timer");
+
         do sched.deschedule_running_task_and_then |_sched, task| {
             self.action = Some(WakeTask(task));
             self.start(msecs, 0);
@@ -124,51 +129,43 @@ fn drop(&mut self) {
 mod test {
     use super::*;
     use std::rt::rtio::RtioTimer;
-    use super::super::run_uv_loop;
+    use super::super::local_loop;
 
     #[test]
     fn oneshot() {
-        do run_uv_loop |l| {
-            let mut timer = TimerWatcher::new(l);
-            let port = timer.oneshot(1);
-            port.recv();
-            let port = timer.oneshot(1);
-            port.recv();
-        }
+        let mut timer = TimerWatcher::new(local_loop());
+        let port = timer.oneshot(1);
+        port.recv();
+        let port = timer.oneshot(1);
+        port.recv();
     }
 
     #[test]
     fn override() {
-        do run_uv_loop |l| {
-            let mut timer = TimerWatcher::new(l);
-            let oport = timer.oneshot(1);
-            let pport = timer.period(1);
-            timer.sleep(1);
-            assert_eq!(oport.try_recv(), None);
-            assert_eq!(pport.try_recv(), None);
-            timer.oneshot(1).recv();
-        }
+        let mut timer = TimerWatcher::new(local_loop());
+        let oport = timer.oneshot(1);
+        let pport = timer.period(1);
+        timer.sleep(1);
+        assert_eq!(oport.try_recv(), None);
+        assert_eq!(pport.try_recv(), None);
+        timer.oneshot(1).recv();
     }
 
     #[test]
     fn period() {
-        do run_uv_loop |l| {
-            let mut timer = TimerWatcher::new(l);
-            let port = timer.period(1);
-            port.recv();
-            port.recv();
-            let port = timer.period(1);
-            port.recv();
-            port.recv();
-        }
+        let mut timer = TimerWatcher::new(local_loop());
+        let port = timer.period(1);
+        port.recv();
+        port.recv();
+        let port = timer.period(1);
+        port.recv();
+        port.recv();
     }
 
     #[test]
     fn sleep() {
-        do run_uv_loop |l| {
-            let mut timer = TimerWatcher::new(l);
-            timer.sleep(1);
-            timer.sleep(1);
-        }
+        let mut timer = TimerWatcher::new(local_loop());
+        timer.sleep(1);
+        timer.sleep(1);
     }
 }
index c072ab5156121cc7cd9ff716641ec12deb7a02e1..04e406ce987e9f86d4b4a6dab04489bb7374c9b6 100644 (file)
@@ -103,6 +103,6 @@ fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
 impl Drop for TtyWatcher {
     fn drop(&mut self) {
         let _m = self.fire_homing_missile();
-        self.stream.close();
+        self.close();
     }
 }
index 6ae2c174e18b45abadf7fd044ce40ec4d4876021..75ec5f26b336ce8ffc972d12260bfae950c932cd 100644 (file)
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use std::c_str::CString;
-use std::comm::{SharedChan, GenericChan};
+use std::comm::SharedChan;
 use std::libc::c_int;
 use std::libc;
 use std::path::Path;
@@ -26,7 +26,7 @@
 use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
                   ReadWrite, FileStat};
 use std::rt::io::signal::Signum;
-use std::task;
+use std::util;
 use ai = std::rt::io::net::addrinfo;
 
 #[cfg(test)] use std::unstable::run_in_bare_thread;
@@ -44,6 +44,13 @@ pub trait HomingIO {
     fn go_to_IO_home(&mut self) -> uint {
         use std::rt::sched::RunOnce;
 
+        unsafe {
+            let task: *mut Task = Local::unsafe_borrow();
+            (*task).death.inhibit_kill((*task).unwinder.unwinding);
+        }
+
+        let _f = ForbidUnwind::new("going home");
+
         let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
             sched.sched_id()
         };
@@ -51,22 +58,17 @@ fn go_to_IO_home(&mut self) -> uint {
         // Only need to invoke a context switch if we're not on the right
         // scheduler.
         if current_sched_id != self.home().sched_id {
-            do task::unkillable { // FIXME(#8674)
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    /* FIXME(#8674) if the task was already killed then wake
-                     * will return None. In that case, the home pointer will
-                     * never be set.
-                     *
-                     * RESOLUTION IDEA: Since the task is dead, we should
-                     * just abort the IO action.
-                     */
-                    do task.wake().map |task| {
-                        self.home().send(RunOnce(task));
-                    };
-                }
+            let scheduler: ~Scheduler = Local::take();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                do task.wake().map |task| {
+                    self.home().send(RunOnce(task));
+                };
             }
         }
+        let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
+            sched.sched_id()
+        };
+        assert!(current_sched_id == self.home().sched_id);
 
         self.home().sched_id
     }
@@ -98,25 +100,38 @@ struct HomingMissile {
     priv io_home: uint,
 }
 
+impl HomingMissile {
+    pub fn check(&self, msg: &'static str) {
+        let local_id = Local::borrow(|sched: &mut Scheduler| sched.sched_id());
+        assert!(local_id == self.io_home, "{}", msg);
+    }
+}
+
 impl Drop for HomingMissile {
     fn drop(&mut self) {
+        let f = ForbidUnwind::new("leaving home");
+
         // It would truly be a sad day if we had moved off the home I/O
         // scheduler while we were doing I/O.
-        assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
-                   self.io_home);
+        self.check("task moved away from the home scheduler");
 
         // If we were a homed task, then we must send ourselves back to the
         // original scheduler. Otherwise, we can just return and keep running
         if !Task::on_appropriate_sched() {
-            do task::unkillable { // FIXME(#8674)
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    do task.wake().map |task| {
-                        Scheduler::run_task(task);
-                    };
-                }
+            let scheduler: ~Scheduler = Local::take();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                do task.wake().map |task| {
+                    Scheduler::run_task(task);
+                };
             }
         }
+
+        util::ignore(f);
+
+        unsafe {
+            let task: *mut Task = Local::unsafe_borrow();
+            (*task).death.allow_kill((*task).unwinder.unwinding);
+        }
     }
 }