]> git.lizzy.rs Git - rust.git/commitdiff
Update all uv tests to pass again
authorAlex Crichton <alex@alexcrichton.com>
Wed, 6 Nov 2013 19:03:11 +0000 (11:03 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:11 +0000 (01:37 -0800)
src/librustuv/addrinfo.rs
src/librustuv/async.rs
src/librustuv/file.rs
src/librustuv/lib.rs
src/librustuv/net.rs
src/librustuv/timer.rs
src/librustuv/uvio.rs

index 965e97893b6404303b4fbae7dc8340afef21fafb..d5bfd729eb56a57ca9e4e7750c3f9f2c09cdb147 100644 (file)
@@ -189,28 +189,27 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
 
 #[cfg(test)]
 mod test {
-    use Loop;
     use std::rt::io::net::ip::{SocketAddr, Ipv4Addr};
     use super::*;
+    use super::super::run_uv_loop;
 
     #[test]
     fn getaddrinfo_test() {
-        let mut loop_ = Loop::new();
-        let mut req = GetAddrInfoRequest::new();
-        do req.getaddrinfo(&loop_, Some("localhost"), None, None) |_, addrinfo, _| {
-            let sockaddrs = accum_addrinfo(addrinfo);
-            let mut found_local = false;
-            let local_addr = &SocketAddr {
-                ip: Ipv4Addr(127, 0, 0, 1),
-                port: 0
-            };
-            for addr in sockaddrs.iter() {
-                found_local = found_local || addr.address == *local_addr;
+        do run_uv_loop |l| {
+            match GetAddrInfoRequest::run(l, Some("localhost"), None, None) {
+                Ok(infos) => {
+                    let mut found_local = false;
+                    let local_addr = &SocketAddr {
+                        ip: Ipv4Addr(127, 0, 0, 1),
+                        port: 0
+                    };
+                    for addr in infos.iter() {
+                        found_local = found_local || addr.address == *local_addr;
+                    }
+                    assert!(found_local);
+                }
+                Err(e) => fail!("{:?}", e),
             }
-            assert!(found_local);
         }
-        loop_.run();
-        loop_.close();
-        req.delete();
     }
 }
index f4c7f633ee264503c672f6928ca948e614c2fd20..334e154a397f45fe3ce463f5e16cb6ceceb91d41 100644 (file)
@@ -126,62 +126,56 @@ fn drop(&mut self) {
 #[cfg(test)]
 mod test_remote {
     use std::cell::Cell;
-    use std::rt::test::*;
+    use std::rt::rtio::Callback;
     use std::rt::thread::Thread;
     use std::rt::tube::Tube;
-    use std::rt::rtio::EventLoop;
-    use std::rt::local::Local;
-    use std::rt::sched::Scheduler;
 
+    use super::*;
+    use super::super::run_uv_loop;
+
+    // Make sure that we can fire watchers in remote threads
     #[test]
     fn test_uv_remote() {
-        do run_in_mt_newsched_task {
-            let mut tube = Tube::new();
-            let tube_clone = tube.clone();
-            let remote_cell = Cell::new_empty();
-            do Local::borrow |sched: &mut Scheduler| {
-                let tube_clone = tube_clone.clone();
-                let tube_clone_cell = Cell::new(tube_clone);
-                let remote = do sched.event_loop.remote_callback {
-                    // This could be called multiple times
-                    if !tube_clone_cell.is_empty() {
-                        tube_clone_cell.take().send(1);
-                    }
-                };
-                remote_cell.put_back(remote);
+        struct MyCallback(Option<Tube<int>>);
+        impl Callback for MyCallback {
+            fn call(&mut self) {
+                // this can get called more than once, but we only want to send
+                // once
+                if self.is_some() {
+                    self.take_unwrap().send(1);
+                }
             }
+        }
+
+        do run_uv_loop |l| {
+            let mut tube = Tube::new();
+            let cb = ~MyCallback(Some(tube.clone()));
+            let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback));
+
             let thread = do Thread::start {
-                remote_cell.take().fire();
+                watcher.take().fire();
             };
 
-            assert!(tube.recv() == 1);
+            assert_eq!(tube.recv(), 1);
             thread.join();
         }
     }
-}
-
-#[cfg(test)]
-mod test {
-
-    use super::*;
-    use Loop;
-    use std::unstable::run_in_bare_thread;
-    use std::rt::thread::Thread;
-    use std::cell::Cell;
 
     #[test]
     fn smoke_test() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
-            let watcher_cell = Cell::new(watcher);
-            let thread = do Thread::start {
-                let mut watcher = watcher_cell.take();
-                watcher.send();
-            };
-            loop_.run();
-            loop_.close();
-            thread.join();
+        static mut hits: uint = 0;
+
+        struct MyCallback;
+        impl Callback for MyCallback {
+            fn call(&mut self) {
+                unsafe { hits += 1; }
+            }
+        }
+
+        do run_uv_loop |l| {
+            let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback);
+            watcher.fire();
         }
+        assert!(unsafe { hits > 0 });
     }
 }
index 45f4125d7920214fac85a28a476f9a66fa6b22cb..3b4760e0ff4e1f241a1a82aa1ecc0455daf20b5f 100644 (file)
@@ -455,297 +455,136 @@ fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    //use std::rt::test::*;
-    use std::libc::{STDOUT_FILENO, c_int};
-    use std::vec;
-    use std::str;
-    use std::unstable::run_in_bare_thread;
-    use super::super::{Loop, Buf, slice_to_uv_buf};
+    use std::libc::c_int;
     use std::libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR};
-
-    #[test]
-    fn file_test_full_simple() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let create_flags = O_RDWR | O_CREAT;
-            let read_flags = O_RDONLY;
-            // 0644 BZZT! WRONG! 0600! See below.
-            let mode = S_IWUSR |S_IRUSR;
-                // these aren't defined in std::libc :(
-                //map_mode(S_IRGRP) |
-                //map_mode(S_IROTH);
-            let path_str = "./tmp/file_full_simple.txt";
-            let write_val = "hello".as_bytes().to_owned();
-            let write_buf  = slice_to_uv_buf(write_val);
-            let write_buf_ptr: *Buf = &write_buf;
-            let read_buf_len = 1028;
-            let read_mem = vec::from_elem(read_buf_len, 0u8);
-            let read_buf = slice_to_uv_buf(read_mem);
-            let read_buf_ptr: *Buf = &read_buf;
-            let open_req = FsRequest::new();
-            do open_req.open(&loop_, &path_str.to_c_str(), create_flags as int,
-                             mode as int) |req, uverr| {
-                assert!(uverr.is_none());
-                let fd = req.get_result();
-                let buf = unsafe { *write_buf_ptr };
-                let write_req = FsRequest::new();
-                do write_req.write(&req.get_loop(), fd, buf, -1) |req, uverr| {
-                    let close_req = FsRequest::new();
-                    do close_req.close(&req.get_loop(), fd) |req, _| {
-                        assert!(uverr.is_none());
-                        let loop_ = req.get_loop();
-                        let open_req = FsRequest::new();
-                        do open_req.open(&loop_, &path_str.to_c_str(),
-                                         read_flags as int,0) |req, uverr| {
-                            assert!(uverr.is_none());
-                            let loop_ = req.get_loop();
-                            let fd = req.get_result();
-                            let read_buf = unsafe { *read_buf_ptr };
-                            let read_req = FsRequest::new();
-                            do read_req.read(&loop_, fd, read_buf, 0) |req, uverr| {
-                                assert!(uverr.is_none());
-                                let loop_ = req.get_loop();
-                                // we know nread >=0 because uverr is none..
-                                let nread = req.get_result() as uint;
-                                // nread == 0 would be EOF
-                                if nread > 0 {
-                                    let read_str = unsafe {
-                                        let read_buf = *read_buf_ptr;
-                                        str::from_utf8(
-                                            vec::from_buf(
-                                                read_buf.base, nread))
-                                    };
-                                    assert!(read_str == ~"hello");
-                                    let close_req = FsRequest::new();
-                                    do close_req.close(&loop_, fd) |req,uverr| {
-                                        assert!(uverr.is_none());
-                                        let loop_ = &req.get_loop();
-                                        let unlink_req = FsRequest::new();
-                                        do unlink_req.unlink(loop_,
-                                                             &path_str.to_c_str())
-                                        |_,uverr| {
-                                            assert!(uverr.is_none());
-                                        };
-                                    };
-                                };
-                            };
-                        };
-                    };
-                };
-            };
-            loop_.run();
-            loop_.close();
-        }
-    }
+    use std::rt::io;
+    use std::str;
+    use std::vec;
+    use super::*;
+    use super::super::{run_uv_loop};
 
     #[test]
     fn file_test_full_simple_sync() {
-        do run_in_bare_thread {
-            // setup
-            let mut loop_ = Loop::new();
-            let create_flags = O_RDWR |
-                O_CREAT;
+        do run_uv_loop |l| {
+            let create_flags = O_RDWR | O_CREAT;
             let read_flags = O_RDONLY;
-            // 0644
-            let mode = S_IWUSR |
-                S_IRUSR;
-                //S_IRGRP |
-                //S_IROTH;
+            let mode = S_IWUSR | S_IRUSR;
             let path_str = "./tmp/file_full_simple_sync.txt";
-            let write_val = "hello".as_bytes().to_owned();
-            let write_buf = slice_to_uv_buf(write_val);
-            // open/create
-            let open_req = FsRequest::new();
-            let result = open_req.open_sync(&loop_, &path_str.to_c_str(),
-                                            create_flags as int, mode as int);
-            assert!(result.is_ok());
-            let fd = result.unwrap();
-            // write
-            let write_req = FsRequest::new();
-            let result = write_req.write_sync(&loop_, fd, write_buf, -1);
-            assert!(result.is_ok());
-            // close
-            let close_req = FsRequest::new();
-            let result = close_req.close_sync(&loop_, fd);
-            assert!(result.is_ok());
-            // re-open
-            let open_req = FsRequest::new();
-            let result = open_req.open_sync(&loop_, &path_str.to_c_str(),
-                                                   read_flags as int,0);
-            assert!(result.is_ok());
-            let len = 1028;
-            let fd = result.unwrap();
-            // read
-            let read_mem: ~[u8] = vec::from_elem(len, 0u8);
-            let buf = slice_to_uv_buf(read_mem);
-            let read_req = FsRequest::new();
-            let result = read_req.read_sync(&loop_, fd, buf, 0);
-            assert!(result.is_ok());
-            let nread = result.unwrap();
-            // nread == 0 would be EOF.. we know it's >= zero because otherwise
-            // the above assert would fail
-            if nread > 0 {
-                let read_str = str::from_utf8(
-                    read_mem.slice(0, nread as uint));
-                assert!(read_str == ~"hello");
+
+            {
+                // open/create
+                let result = FsRequest::open(l, &path_str.to_c_str(),
+                                             create_flags as int, mode as int);
+                assert!(result.is_ok());
+                let result = result.unwrap();
+                let fd = result.fd;
+
+                // write
+                let result = FsRequest::write(l, fd, "hello".as_bytes(), -1);
+                assert!(result.is_ok());
+
                 // close
-                let close_req = FsRequest::new();
-                let result = close_req.close_sync(&loop_, fd);
+                let result = FsRequest::close(l, fd, true);
                 assert!(result.is_ok());
+            }
+
+            {
+                // re-open
+                let result = FsRequest::open(l, &path_str.to_c_str(),
+                                             read_flags as int, 0);
+                assert!(result.is_ok());
+                let result = result.unwrap();
+                let fd = result.fd;
+
+                // read
+                let mut read_mem = vec::from_elem(1000, 0u8);
+                let result = FsRequest::read(l, fd, read_mem, 0);
+                assert!(result.is_ok());
+
+                let nread = result.unwrap();
+                assert!(nread > 0);
+                let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
+                assert_eq!(read_str, ~"hello");
+
+                // close
+                let result = FsRequest::close(l, fd, true);
+                assert!(result.is_ok());
+
                 // unlink
-                let unlink_req = FsRequest::new();
-                let result = unlink_req.unlink_sync(&loop_, &path_str.to_c_str());
+                let result = FsRequest::unlink(l, &path_str.to_c_str());
                 assert!(result.is_ok());
-            } else { fail!("nread was 0.. wudn't expectin' that."); }
-            loop_.close();
+            }
         }
     }
 
-    fn naive_print(loop_: &Loop, input: &str) {
-        let write_val = input.as_bytes();
-        let write_buf = slice_to_uv_buf(write_val);
-        let write_req = FsRequest::new();
-        write_req.write_sync(loop_, STDOUT_FILENO, write_buf, -1);
-    }
-
     #[test]
-    fn file_test_write_to_stdout() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            naive_print(&loop_, "zanzibar!\n");
-            loop_.run();
-            loop_.close();
-        };
-    }
-    #[test]
-    fn file_test_stat_simple() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let path = "./tmp/file_test_stat_simple.txt";
-            let create_flags = O_RDWR |
-                O_CREAT;
-            let mode = S_IWUSR |
-                S_IRUSR;
-            let write_val = "hello".as_bytes().to_owned();
-            let write_buf  = slice_to_uv_buf(write_val);
-            let write_buf_ptr: *Buf = &write_buf;
-            let open_req = FsRequest::new();
-            do open_req.open(&loop_, &path.to_c_str(), create_flags as int,
-                             mode as int) |req, uverr| {
-                assert!(uverr.is_none());
-                let fd = req.get_result();
-                let buf = unsafe { *write_buf_ptr };
-                let write_req = FsRequest::new();
-                do write_req.write(&req.get_loop(), fd, buf, 0) |req, uverr| {
-                    assert!(uverr.is_none());
-                    let loop_ = req.get_loop();
-                    let stat_req = FsRequest::new();
-                    do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| {
-                        assert!(uverr.is_none());
-                        let loop_ = req.get_loop();
-                        let stat = req.get_stat();
-                        let sz: uint = stat.st_size as uint;
-                        assert!(sz > 0);
-                        let close_req = FsRequest::new();
-                        do close_req.close(&loop_, fd) |req, uverr| {
-                            assert!(uverr.is_none());
-                            let loop_ = req.get_loop();
-                            let unlink_req = FsRequest::new();
-                            do unlink_req.unlink(&loop_,
-                                                 &path.to_c_str()) |req,uverr| {
-                                assert!(uverr.is_none());
-                                let loop_ = req.get_loop();
-                                let stat_req = FsRequest::new();
-                                do stat_req.stat(&loop_,
-                                                 &path.to_c_str()) |_, uverr| {
-                                    // should cause an error because the
-                                    // file doesn't exist anymore
-                                    assert!(uverr.is_some());
-                                };
-                            };
-                        };
-                    };
-                };
-            };
-            loop_.run();
-            loop_.close();
+    fn file_test_stat() {
+        do run_uv_loop |l| {
+            let path = &"./tmp/file_test_stat_simple".to_c_str();
+            let create_flags = (O_RDWR | O_CREAT) as int;
+            let mode = (S_IWUSR | S_IRUSR) as int;
+
+            let result = FsRequest::open(l, path, create_flags, mode);
+            assert!(result.is_ok());
+            let file = result.unwrap();
+
+            let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0);
+            assert!(result.is_ok());
+
+            let result = FsRequest::stat(l, path);
+            assert!(result.is_ok());
+            assert_eq!(result.unwrap().size, 5);
+
+            fn free<T>(_: T) {}
+            free(file);
+
+            let result = FsRequest::unlink(l, path);
+            assert!(result.is_ok());
         }
     }
 
     #[test]
     fn file_test_mk_rm_dir() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let path = "./tmp/mk_rm_dir";
-            let mode = S_IWUSR |
-                S_IRUSR;
-            let mkdir_req = FsRequest::new();
-            do mkdir_req.mkdir(&loop_, &path.to_c_str(),
-                               mode as c_int) |req,uverr| {
-                assert!(uverr.is_none());
-                let loop_ = req.get_loop();
-                let stat_req = FsRequest::new();
-                do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| {
-                    assert!(uverr.is_none());
-                    let loop_ = req.get_loop();
-                    let stat = req.get_stat();
-                    naive_print(&loop_, format!("{:?}", stat));
-                    assert!(stat.is_dir());
-                    let rmdir_req = FsRequest::new();
-                    do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| {
-                        assert!(uverr.is_none());
-                        let loop_ = req.get_loop();
-                        let stat_req = FsRequest::new();
-                        do stat_req.stat(&loop_, &path.to_c_str()) |_req, uverr| {
-                            assert!(uverr.is_some());
-                        }
-                    }
-                }
-            }
-            loop_.run();
-            loop_.close();
+        do run_uv_loop |l| {
+            let path = &"./tmp/mk_rm_dir".to_c_str();
+            let mode = S_IWUSR | S_IRUSR;
+
+            let result = FsRequest::mkdir(l, path, mode);
+            assert!(result.is_ok());
+
+            let result = FsRequest::stat(l, path);
+            assert!(result.is_ok());
+            assert!(result.unwrap().kind == io::TypeDirectory);
+
+            let result = FsRequest::rmdir(l, path);
+            assert!(result.is_ok());
+
+            let result = FsRequest::stat(l, path);
+            assert!(result.is_err());
         }
     }
+
     #[test]
     fn file_test_mkdir_chokes_on_double_create() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let path = "./tmp/double_create_dir";
-            let mode = S_IWUSR |
-                S_IRUSR;
-            let mkdir_req = FsRequest::new();
-            do mkdir_req.mkdir(&loop_, &path.to_c_str(), mode as c_int) |req,uverr| {
-                assert!(uverr.is_none());
-                let loop_ = req.get_loop();
-                let mkdir_req = FsRequest::new();
-                do mkdir_req.mkdir(&loop_, &path.to_c_str(),
-                                   mode as c_int) |req,uverr| {
-                    assert!(uverr.is_some());
-                    let loop_ = req.get_loop();
-                    let _stat = req.get_stat();
-                    let rmdir_req = FsRequest::new();
-                    do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| {
-                        assert!(uverr.is_none());
-                        let _loop = req.get_loop();
-                    }
-                }
-            }
-            loop_.run();
-            loop_.close();
+        do run_uv_loop |l| {
+            let path = &"./tmp/double_create_dir".to_c_str();
+            let mode = S_IWUSR | S_IRUSR;
+
+            let result = FsRequest::mkdir(l, path, mode as c_int);
+            assert!(result.is_ok());
+            let result = FsRequest::mkdir(l, path, mode as c_int);
+            assert!(result.is_err());
+            let result = FsRequest::rmdir(l, path);
+            assert!(result.is_ok());
         }
     }
+
     #[test]
     fn file_test_rmdir_chokes_on_nonexistant_path() {
-        do run_in_bare_thread {
-            let mut loop_ = Loop::new();
-            let path = "./tmp/never_existed_dir";
-            let rmdir_req = FsRequest::new();
-            do rmdir_req.rmdir(&loop_, &path.to_c_str()) |_req, uverr| {
-                assert!(uverr.is_some());
-            }
-            loop_.run();
-            loop_.close();
+        do run_uv_loop |l| {
+            let path = &"./tmp/never_existed_dir".to_c_str();
+            let result = FsRequest::rmdir(l, path);
+            assert!(result.is_err());
         }
     }
 }
index 1afc9b1d0ea6775b9be4fe9fc10746a338919a9b..5bedba08fb0eeab494ad3f7211c45735787ba8f4 100644 (file)
 use std::vec;
 use std::ptr;
 use std::str;
-use std::libc::{c_void, c_int, size_t, malloc, free};
+use std::libc::{c_void, c_int, malloc, free};
 use std::cast::transmute;
 use std::ptr::null;
 use std::unstable::finally::Finally;
 
 use std::rt::io::IoError;
 
-//#[cfg(test)] use unstable::run_in_bare_thread;
-
 pub use self::async::AsyncWatcher;
 pub use self::file::{FsRequest, FileWatcher};
 pub use self::idle::IdleWatcher;
@@ -302,62 +300,58 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
     uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
 }
 
-// XXX: Do these conversions without copying
-
-/// Transmute an owned vector to a Buf
-pub fn vec_to_uv_buf(v: ~[u8]) -> Buf {
-    #[fixed_stack_segment]; #[inline(never)];
-
-    unsafe {
-        let data = malloc(v.len() as size_t) as *u8;
-        assert!(data.is_not_null());
-        do v.as_imm_buf |b, l| {
-            let data = data as *mut u8;
-            ptr::copy_memory(data, b, l)
+fn run_uv_loop(f: proc(&mut Loop)) {
+    use std::rt::local::Local;
+    use std::rt::test::run_in_uv_task;
+    use std::rt::sched::Scheduler;
+    use std::cell::Cell;
+
+    let f = Cell::new(f);
+    do run_in_uv_task {
+        let mut io = None;
+        do Local::borrow |sched: &mut Scheduler| {
+            sched.event_loop.io(|i| unsafe {
+                let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) =
+                    cast::transmute(i);
+                io = Some(uvio);
+            });
         }
-        uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
+        f.take()(io.unwrap().uv_loop());
     }
 }
 
-/// Transmute a Buf that was once a ~[u8] back to ~[u8]
-pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
-    #[fixed_stack_segment]; #[inline(never)];
+#[cfg(test)]
+mod test {
+    use std::cast::transmute;
+    use std::ptr;
+    use std::unstable::run_in_bare_thread;
 
-    if !(buf.len == 0 && buf.base.is_null()) {
-        let v = unsafe { vec::from_buf(buf.base, buf.len as uint) };
-        unsafe { free(buf.base as *c_void) };
-        return Some(v);
-    } else {
-        // No buffer
-        uvdebug!("No buffer!");
-        return None;
-    }
-}
-/*
-#[test]
-fn test_slice_to_uv_buf() {
-    let slice = [0, .. 20];
-    let buf = slice_to_uv_buf(slice);
+    use super::{slice_to_uv_buf, Loop};
 
-    assert!(buf.len == 20);
+    #[test]
+    fn test_slice_to_uv_buf() {
+        let slice = [0, .. 20];
+        let buf = slice_to_uv_buf(slice);
 
-    unsafe {
-        let base = transmute::<*u8, *mut u8>(buf.base);
-        (*base) = 1;
-        (*ptr::mut_offset(base, 1)) = 2;
-    }
+        assert_eq!(buf.len, 20);
 
-    assert!(slice[0] == 1);
-    assert!(slice[1] == 2);
-}
+        unsafe {
+            let base = transmute::<*u8, *mut u8>(buf.base);
+            (*base) = 1;
+            (*ptr::mut_offset(base, 1)) = 2;
+        }
 
+        assert!(slice[0] == 1);
+        assert!(slice[1] == 2);
+    }
 
-#[test]
-fn loop_smoke_test() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        loop_.run();
-        loop_.close();
+
+    #[test]
+    fn loop_smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            loop_.run();
+            loop_.close();
+        }
     }
 }
-*/
index 28c2c4df12a0ef9260f37fea1b3fe62ff27e6649..9fd771b973950573957964af38dde1ce49294887 100644 (file)
@@ -705,350 +705,559 @@ fn drop(&mut self) {
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use std::util::ignore;
     use std::cell::Cell;
-    use std::vec;
-    use std::unstable::run_in_bare_thread;
-    use std::rt::thread::Thread;
+    use std::comm::oneshot;
     use std::rt::test::*;
-    use super::super::{Loop, AllocCallback};
-    use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
+    use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
+                        RtioUdpSocket};
+    use std::task;
+
+    use super::*;
+    use super::super::{Loop, run_uv_loop};
 
     #[test]
     fn connect_close_ip4() {
-        do run_in_bare_thread() {
-            let mut loop_ = Loop::new();
-            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-            // Connect to a port where nobody is listening
-            let addr = next_test_ip4();
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                uvdebug!("tcp_watcher.connect!");
-                assert!(status.is_some());
-                assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
-                stream_watcher.close(||());
+        do run_uv_loop |l| {
+            match TcpWatcher::connect(l, next_test_ip4()) {
+                Ok(*) => fail!(),
+                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
             }
-            loop_.run();
-            loop_.close();
         }
     }
 
     #[test]
     fn connect_close_ip6() {
-        do run_in_bare_thread() {
-            let mut loop_ = Loop::new();
-            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-            // Connect to a port where nobody is listening
-            let addr = next_test_ip6();
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                uvdebug!("tcp_watcher.connect!");
-                assert!(status.is_some());
-                assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
-                stream_watcher.close(||());
+        do run_uv_loop |l| {
+            match TcpWatcher::connect(l, next_test_ip6()) {
+                Ok(*) => fail!(),
+                Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
             }
-            loop_.run();
-            loop_.close();
         }
     }
 
     #[test]
     fn udp_bind_close_ip4() {
-        do run_in_bare_thread() {
-            let mut loop_ = Loop::new();
-            let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
-            let addr = next_test_ip4();
-            udp_watcher.bind(addr);
-            udp_watcher.close(||());
-            loop_.run();
-            loop_.close();
+        do run_uv_loop |l| {
+            match UdpWatcher::bind(l, next_test_ip4()) {
+                Ok(*) => {}
+                Err(*) => fail!()
+            }
         }
     }
 
     #[test]
     fn udp_bind_close_ip6() {
-        do run_in_bare_thread() {
-            let mut loop_ = Loop::new();
-            let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
-            let addr = next_test_ip6();
-            udp_watcher.bind(addr);
-            udp_watcher.close(||());
-            loop_.run();
-            loop_.close();
+        do run_uv_loop |l| {
+            match UdpWatcher::bind(l, next_test_ip6()) {
+                Ok(*) => {}
+                Err(*) => fail!()
+            }
         }
     }
 
     #[test]
     fn listen_ip4() {
-        do run_in_bare_thread() {
-            static MAX: int = 10;
-            let mut loop_ = Loop::new();
-            let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+        do run_uv_loop |l| {
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
             let addr = next_test_ip4();
-            server_tcp_watcher.bind(addr);
-            let loop_ = loop_;
-            uvdebug!("listening");
-            let mut stream = server_tcp_watcher.as_stream();
-            let res = do stream.listen |mut server_stream_watcher, status| {
-                uvdebug!("listened!");
-                assert!(status.is_none());
-                let mut loop_ = loop_;
-                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-                server_stream_watcher.accept(client_tcp_watcher);
-                let count_cell = Cell::new(0);
-                let server_stream_watcher = server_stream_watcher;
-                uvdebug!("starting read");
-                let alloc: AllocCallback = |size| {
-                    vec_to_uv_buf(vec::from_elem(size, 0u8))
+
+            let handle = l.handle;
+            do spawn {
+                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
+                    Ok(w) => w, Err(e) => fail!("{:?}", e)
+                };
+                let mut w = match w.listen() {
+                    Ok(w) => w, Err(e) => fail!("{:?}", e),
                 };
-                do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
-
-                    uvdebug!("i'm reading!");
-                    let buf = vec_from_uv_buf(buf);
-                    let mut count = count_cell.take();
-                    if status.is_none() {
-                        uvdebug!("got {} bytes", nread);
-                        let buf = buf.unwrap();
-                        for byte in buf.slice(0, nread as uint).iter() {
-                            assert!(*byte == count as u8);
-                            uvdebug!("{}", *byte as uint);
-                            count += 1;
+                chan.take().send(());
+                match w.accept() {
+                    Ok(mut stream) => {
+                        let mut buf = [0u8, ..10];
+                        match stream.read(buf) {
+                            Ok(10) => {} e => fail!("{:?}", e),
                         }
-                    } else {
-                        assert_eq!(count, MAX);
-                        do stream_watcher.close {
-                            server_stream_watcher.close(||());
+                        for i in range(0, 10u8) {
+                            assert_eq!(buf[i], i + 1);
                         }
                     }
-                    count_cell.put_back(count);
+                    Err(e) => fail!("{:?}", e)
                 }
-            };
+            }
 
-            assert!(res.is_ok());
-
-            let client_thread = do Thread::start {
-                uvdebug!("starting client thread");
-                let mut loop_ = Loop::new();
-                let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-                do tcp_watcher.connect(addr) |mut stream_watcher, status| {
-                    uvdebug!("connecting");
-                    assert!(status.is_none());
-                    let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
-                    let buf = slice_to_uv_buf(msg);
-                    let msg_cell = Cell::new(msg);
-                    do stream_watcher.write(buf) |stream_watcher, status| {
-                        uvdebug!("writing");
-                        assert!(status.is_none());
-                        let msg_cell = Cell::new(msg_cell.take());
-                        stream_watcher.close(||ignore(msg_cell.take()));
-                    }
-                }
-                loop_.run();
-                loop_.close();
+            port.recv();
+            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+                Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-
-            let mut loop_ = loop_;
-            loop_.run();
-            loop_.close();
-            client_thread.join();
-        };
+            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            }
+        }
     }
 
     #[test]
     fn listen_ip6() {
-        do run_in_bare_thread() {
-            static MAX: int = 10;
-            let mut loop_ = Loop::new();
-            let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+        do run_uv_loop |l| {
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
             let addr = next_test_ip6();
-            server_tcp_watcher.bind(addr);
-            let loop_ = loop_;
-            uvdebug!("listening");
-            let mut stream = server_tcp_watcher.as_stream();
-            let res = do stream.listen |mut server_stream_watcher, status| {
-                uvdebug!("listened!");
-                assert!(status.is_none());
-                let mut loop_ = loop_;
-                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-                server_stream_watcher.accept(client_tcp_watcher);
-                let count_cell = Cell::new(0);
-                let server_stream_watcher = server_stream_watcher;
-                uvdebug!("starting read");
-                let alloc: AllocCallback = |size| {
-                    vec_to_uv_buf(vec::from_elem(size, 0u8))
+
+            let handle = l.handle;
+            do spawn {
+                let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
+                    Ok(w) => w, Err(e) => fail!("{:?}", e)
+                };
+                let mut w = match w.listen() {
+                    Ok(w) => w, Err(e) => fail!("{:?}", e),
                 };
-                do client_tcp_watcher.read_start(alloc)
-                    |stream_watcher, nread, buf, status| {
-
-                    uvdebug!("i'm reading!");
-                    let buf = vec_from_uv_buf(buf);
-                    let mut count = count_cell.take();
-                    if status.is_none() {
-                        uvdebug!("got {} bytes", nread);
-                        let buf = buf.unwrap();
-                        let r = buf.slice(0, nread as uint);
-                        for byte in r.iter() {
-                            assert!(*byte == count as u8);
-                            uvdebug!("{}", *byte as uint);
-                            count += 1;
+                chan.take().send(());
+                match w.accept() {
+                    Ok(mut stream) => {
+                        let mut buf = [0u8, ..10];
+                        match stream.read(buf) {
+                            Ok(10) => {} e => fail!("{:?}", e),
                         }
-                    } else {
-                        assert_eq!(count, MAX);
-                        do stream_watcher.close {
-                            server_stream_watcher.close(||());
+                        for i in range(0, 10u8) {
+                            assert_eq!(buf[i], i + 1);
                         }
                     }
-                    count_cell.put_back(count);
+                    Err(e) => fail!("{:?}", e)
                 }
+            }
+
+            port.recv();
+            let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+                Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
-            assert!(res.is_ok());
-
-            let client_thread = do Thread::start {
-                uvdebug!("starting client thread");
-                let mut loop_ = Loop::new();
-                let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-                do tcp_watcher.connect(addr) |mut stream_watcher, status| {
-                    uvdebug!("connecting");
-                    assert!(status.is_none());
-                    let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
-                    let buf = slice_to_uv_buf(msg);
-                    let msg_cell = Cell::new(msg);
-                    do stream_watcher.write(buf) |stream_watcher, status| {
-                        uvdebug!("writing");
-                        assert!(status.is_none());
-                        let msg_cell = Cell::new(msg_cell.take());
-                        stream_watcher.close(||ignore(msg_cell.take()));
+            match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            }
+        }
+    }
+
+    #[test]
+    fn udp_recv_ip4() {
+        do run_uv_loop |l| {
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+            let client = next_test_ip4();
+            let server = next_test_ip4();
+
+            let handle = l.handle;
+            do spawn {
+                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
+                    Ok(mut w) => {
+                        chan.take().send(());
+                        let mut buf = [0u8, ..10];
+                        match w.recvfrom(buf) {
+                            Ok((10, addr)) => assert_eq!(addr, client),
+                            e => fail!("{:?}", e),
+                        }
+                        for i in range(0, 10u8) {
+                            assert_eq!(buf[i], i + 1);
+                        }
                     }
+                    Err(e) => fail!("{:?}", e)
                 }
-                loop_.run();
-                loop_.close();
+            }
+
+            port.recv();
+            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
+                Ok(w) => w, Err(e) => fail!("{:?}", e)
             };
+            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            }
+        }
+    }
+
+    #[test]
+    fn udp_recv_ip6() {
+        do run_uv_loop |l| {
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+            let client = next_test_ip6();
+            let server = next_test_ip6();
+
+            let handle = l.handle;
+            do spawn {
+                match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
+                    Ok(mut w) => {
+                        chan.take().send(());
+                        let mut buf = [0u8, ..10];
+                        match w.recvfrom(buf) {
+                            Ok((10, addr)) => assert_eq!(addr, client),
+                            e => fail!("{:?}", e),
+                        }
+                        for i in range(0, 10u8) {
+                            assert_eq!(buf[i], i + 1);
+                        }
+                    }
+                    Err(e) => fail!("{:?}", e)
+                }
+            }
 
-            let mut loop_ = loop_;
-            loop_.run();
-            loop_.close();
-            client_thread.join();
+            port.recv();
+            let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
+                Ok(w) => w, Err(e) => fail!("{:?}", e)
+            };
+            match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+                Ok(()) => {}, Err(e) => fail!("{:?}", e)
+            }
         }
     }
 
     #[test]
-    fn udp_recv_ip4() {
-        do run_in_bare_thread() {
-            static MAX: int = 10;
-            let mut loop_ = Loop::new();
+    fn test_read_read_read() {
+        do run_uv_loop |l| {
+            let addr = next_test_ip4();
+            static MAX: uint = 500000;
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            let handle = l.handle;
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let listener = TcpListener::bind(l, addr).unwrap();
+                let mut acceptor = listener.listen().unwrap();
+                chan.take().send(());
+                let mut stream = acceptor.accept().unwrap();
+                let buf = [1, .. 2048];
+                let mut total_bytes_written = 0;
+                while total_bytes_written < MAX {
+                    stream.write(buf);
+                    total_bytes_written += buf.len();
+                }
+            }
+
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                port.take().recv();
+                let mut stream = TcpWatcher::connect(l, addr).unwrap();
+                let mut buf = [0, .. 2048];
+                let mut total_bytes_read = 0;
+                while total_bytes_read < MAX {
+                    let nread = stream.read(buf).unwrap();
+                    uvdebug!("read {} bytes", nread);
+                    total_bytes_read += nread;
+                    for i in range(0u, nread) {
+                        assert_eq!(buf[i], 1);
+                    }
+                }
+                uvdebug!("read {} bytes total", total_bytes_read);
+            }
+        }
+    }
+
+    #[test]
+    #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
+    fn test_udp_twice() {
+        do run_uv_loop |l| {
             let server_addr = next_test_ip4();
             let client_addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            let handle = l.handle;
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let mut client = UdpWatcher::bind(l, client_addr).unwrap();
+                port.take().recv();
+                assert!(client.sendto([1], server_addr).is_ok());
+                assert!(client.sendto([2], server_addr).is_ok());
+            }
 
-            let mut server = UdpWatcher::new(&loop_);
-            assert!(server.bind(server_addr).is_ok());
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let mut server = UdpWatcher::bind(l, server_addr).unwrap();
+                chan.take().send(());
+                let mut buf1 = [0];
+                let mut buf2 = [0];
+                let (nread1, src1) = server.recvfrom(buf1).unwrap();
+                let (nread2, src2) = server.recvfrom(buf2).unwrap();
+                assert_eq!(nread1, 1);
+                assert_eq!(nread2, 1);
+                assert_eq!(src1, client_addr);
+                assert_eq!(src2, client_addr);
+                assert_eq!(buf1[0], 1);
+                assert_eq!(buf2[0], 2);
+            }
+        }
+    }
 
-            uvdebug!("starting read");
-            let alloc: AllocCallback = |size| {
-                vec_to_uv_buf(vec::from_elem(size, 0u8))
-            };
+    #[test]
+    fn test_udp_many_read() {
+        do run_uv_loop |l| {
+            let server_out_addr = next_test_ip4();
+            let server_in_addr = next_test_ip4();
+            let client_out_addr = next_test_ip4();
+            let client_in_addr = next_test_ip4();
+            static MAX: uint = 500_000;
+
+            let (p1, c1) = oneshot();
+            let (p2, c2) = oneshot();
+
+            let first = Cell::new((p1, c2));
+            let second = Cell::new((p2, c1));
+
+            let handle = l.handle;
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
+                let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
+                let (port, chan) = first.take();
+                chan.send(());
+                port.recv();
+                let msg = [1, .. 2048];
+                let mut total_bytes_sent = 0;
+                let mut buf = [1];
+                while buf[0] == 1 {
+                    // send more data
+                    assert!(server_out.sendto(msg, client_in_addr).is_ok());
+                    total_bytes_sent += msg.len();
+                    // check if the client has received enough
+                    let res = server_in.recvfrom(buf);
+                    assert!(res.is_ok());
+                    let (nread, src) = res.unwrap();
+                    assert_eq!(nread, 1);
+                    assert_eq!(src, client_out_addr);
+                }
+                assert!(total_bytes_sent >= MAX);
+            }
 
-            do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
-                server.recv_stop();
-                uvdebug!("i'm reading!");
-                assert!(status.is_none());
-                assert_eq!(flags, 0);
-                assert_eq!(src, client_addr);
-
-                let buf = vec_from_uv_buf(buf);
-                let mut count = 0;
-                uvdebug!("got {} bytes", nread);
-
-                let buf = buf.unwrap();
-                for &byte in buf.slice(0, nread as uint).iter() {
-                    assert!(byte == count as u8);
-                    uvdebug!("{}", byte as uint);
-                    count += 1;
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
+                let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
+                let (port, chan) = second.take();
+                port.recv();
+                chan.send(());
+                let mut total_bytes_recv = 0;
+                let mut buf = [0, .. 2048];
+                while total_bytes_recv < MAX {
+                    // ask for more
+                    assert!(client_out.sendto([1], server_in_addr).is_ok());
+                    // wait for data
+                    let res = client_in.recvfrom(buf);
+                    assert!(res.is_ok());
+                    let (nread, src) = res.unwrap();
+                    assert_eq!(src, server_out_addr);
+                    total_bytes_recv += nread;
+                    for i in range(0u, nread) {
+                        assert_eq!(buf[i], 1);
+                    }
                 }
-                assert_eq!(count, MAX);
+                // tell the server we're done
+                assert!(client_out.sendto([0], server_in_addr).is_ok());
+            }
+        }
+    }
+
+    #[test]
+    fn test_read_and_block() {
+        do run_uv_loop |l| {
+            let addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            let handle = l.handle;
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let listener = TcpListener::bind(l, addr).unwrap();
+                let mut acceptor = listener.listen().unwrap();
+                let (port2, chan2) = stream();
+                chan.take().send(port2);
+                let mut stream = acceptor.accept().unwrap();
+                let mut buf = [0, .. 2048];
+
+                let expected = 32;
+                let mut current = 0;
+                let mut reads = 0;
+
+                while current < expected {
+                    let nread = stream.read(buf).unwrap();
+                    for i in range(0u, nread) {
+                        let val = buf[i] as uint;
+                        assert_eq!(val, current % 8);
+                        current += 1;
+                    }
+                    reads += 1;
+
+                    chan2.send(());
+                }
+
+                // Make sure we had multiple reads
+                assert!(reads > 1);
+            }
+
+            do spawntask {
+                let l = &mut Loop::wrap(handle);
+                let port2 = port.take().recv();
+                let mut stream = TcpWatcher::connect(l, addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                port2.recv();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                port2.recv();
+            }
+        }
+    }
 
-                server.close(||{});
+    #[test]
+    fn test_simple_tcp_server_and_client_on_diff_threads() {
+        let addr = next_test_ip4();
+
+        do task::spawn_sched(task::SingleThreaded) {
+            do run_uv_loop |l| {
+                let listener = TcpListener::bind(l, addr).unwrap();
+                let mut acceptor = listener.listen().unwrap();
+                let mut stream = acceptor.accept().unwrap();
+                let mut buf = [0, .. 2048];
+                let nread = stream.read(buf).unwrap();
+                assert_eq!(nread, 8);
+                for i in range(0u, nread) {
+                    assert_eq!(buf[i], i as u8);
+                }
             }
+        }
 
-            let thread = do Thread::start {
-                let mut loop_ = Loop::new();
-                let mut client = UdpWatcher::new(&loop_);
-                assert!(client.bind(client_addr).is_ok());
-                let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
-                let buf = slice_to_uv_buf(msg);
-                do client.send(buf, server_addr) |client, status| {
-                    uvdebug!("writing");
-                    assert!(status.is_none());
-                    client.close(||{});
+        do task::spawn_sched(task::SingleThreaded) {
+            do run_uv_loop |l| {
+                let mut stream = TcpWatcher::connect(l, addr);
+                while stream.is_err() {
+                    stream = TcpWatcher::connect(l, addr);
                 }
+                stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+            }
+        }
+    }
 
-                loop_.run();
-                loop_.close();
-            };
+    // On one thread, create a udp socket. Then send that socket to another
+    // thread and destroy the socket on the remote thread. This should make sure
+    // that homing kicks in for the socket to go back home to the original
+    // thread, close itself, and then come back to the last thread.
+    #[test]
+    fn test_homing_closes_correctly() {
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        do task::spawn_sched(task::SingleThreaded) {
+            let chan = Cell::new(chan.take());
+            do run_uv_loop |l| {
+                let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
+                chan.take().send(listener);
+            }
+        }
 
-            loop_.run();
-            loop_.close();
-            thread.join();
+        do task::spawn_sched(task::SingleThreaded) {
+            let port = Cell::new(port.take());
+            do run_uv_loop |_l| {
+                port.take().recv();
+            }
         }
     }
 
+    // This is a bit of a crufty old test, but it has its uses.
     #[test]
-    fn udp_recv_ip6() {
-        do run_in_bare_thread() {
-            static MAX: int = 10;
-            let mut loop_ = Loop::new();
-            let server_addr = next_test_ip6();
-            let client_addr = next_test_ip6();
-
-            let mut server = UdpWatcher::new(&loop_);
-            assert!(server.bind(server_addr).is_ok());
-
-            uvdebug!("starting read");
-            let alloc: AllocCallback = |size| {
-                vec_to_uv_buf(vec::from_elem(size, 0u8))
+    fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+        use std::cast;
+        use std::rt::local::Local;
+        use std::rt::rtio::{EventLoop, IoFactory};
+        use std::rt::sched::Scheduler;
+        use std::rt::sched::{Shutdown, TaskFromFriend};
+        use std::rt::sleeper_list::SleeperList;
+        use std::rt::task::Task;
+        use std::rt::task::UnwindResult;
+        use std::rt::thread::Thread;
+        use std::rt::work_queue::WorkQueue;
+        use std::unstable::run_in_bare_thread;
+        use uvio::UvEventLoop;
+
+        do run_in_bare_thread {
+            let sleepers = SleeperList::new();
+            let work_queue1 = WorkQueue::new();
+            let work_queue2 = WorkQueue::new();
+            let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+            let loop1 = ~UvEventLoop::new() as ~EventLoop;
+            let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
+                                             sleepers.clone());
+            let loop2 = ~UvEventLoop::new() as ~EventLoop;
+            let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
+                                             sleepers.clone());
+
+            let handle1 = Cell::new(sched1.make_handle());
+            let handle2 = Cell::new(sched2.make_handle());
+            let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+            let on_exit: ~fn(UnwindResult) = |exit_status| {
+                handle1.take().send(Shutdown);
+                handle2.take().send(Shutdown);
+                assert!(exit_status.is_success());
             };
 
-            do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
-                server.recv_stop();
-                uvdebug!("i'm reading!");
-                assert!(status.is_none());
-                assert_eq!(flags, 0);
-                assert_eq!(src, client_addr);
-
-                let buf = vec_from_uv_buf(buf);
-                let mut count = 0;
-                uvdebug!("got {} bytes", nread);
-
-                let buf = buf.unwrap();
-                for &byte in buf.slice(0, nread as uint).iter() {
-                    assert!(byte == count as u8);
-                    uvdebug!("{}", byte as uint);
-                    count += 1;
+            unsafe fn local_io() -> &'static mut IoFactory {
+                do Local::borrow |sched: &mut Scheduler| {
+                    let mut io = None;
+                    sched.event_loop.io(|i| io = Some(i));
+                    cast::transmute(io.unwrap())
                 }
-                assert_eq!(count, MAX);
-
-                server.close(||{});
             }
 
-            let thread = do Thread::start {
-                let mut loop_ = Loop::new();
-                let mut client = UdpWatcher::new(&loop_);
-                assert!(client.bind(client_addr).is_ok());
-                let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
-                let buf = slice_to_uv_buf(msg);
-                do client.send(buf, server_addr) |client, status| {
-                    uvdebug!("writing");
-                    assert!(status.is_none());
-                    client.close(||{});
+            let test_function: ~fn() = || {
+                let io = unsafe { local_io() };
+                let addr = next_test_ip4();
+                let maybe_socket = io.udp_bind(addr);
+                // this socket is bound to this event loop
+                assert!(maybe_socket.is_ok());
+
+                // block self on sched1
+                do task::unkillable { // FIXME(#8674)
+                    let scheduler: ~Scheduler = Local::take();
+                    do scheduler.deschedule_running_task_and_then |_, task| {
+                        // unblock task
+                        do task.wake().map |task| {
+                            // send self to sched2
+                            tasksFriendHandle.take().send(TaskFromFriend(task));
+                        };
+                        // sched1 should now sleep since it has nothing else to do
+                    }
                 }
+                // sched2 will wake up and get the task as we do nothing else,
+                // the function ends and the socket goes out of scope sched2
+                // will start to run the destructor the destructor will first
+                // block the task, set it's home as sched1, then enqueue it
+                // sched2 will dequeue the task, see that it has a home, and
+                // send it to sched1 sched1 will wake up, exec the close
+                // function on the correct loop, and then we're done
+            };
+
+            let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
+                                                test_function);
+            main_task.death.on_exit = Some(on_exit);
+            let main_task = Cell::new(main_task);
+
+            let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool,
+                                                         None) || {});
 
-                loop_.run();
-                loop_.close();
+            let sched1 = Cell::new(sched1);
+            let sched2 = Cell::new(sched2);
+
+            let thread1 = do Thread::start {
+                sched1.take().bootstrap(main_task.take());
+            };
+            let thread2 = do Thread::start {
+                sched2.take().bootstrap(null_task.take());
             };
 
-            loop_.run();
-            loop_.close();
-            thread.join();
+            thread1.join();
+            thread2.join();
         }
     }
+
 }
index 18b05073e8306e48c023a8769315eb8b763745fb..bf24ec405c2f907e79ab3f7b280ed1440e38ddaa 100644 (file)
@@ -123,92 +123,52 @@ fn drop(&mut self) {
 #[cfg(test)]
 mod test {
     use super::*;
-    use Loop;
-    use std::unstable::run_in_bare_thread;
+    use std::rt::rtio::RtioTimer;
+    use super::super::run_uv_loop;
 
     #[test]
-    fn smoke_test() {
-        do run_in_bare_thread {
-            let mut count = 0;
-            let count_ptr: *mut int = &mut count;
-            let mut loop_ = Loop::new();
-            let mut timer = TimerWatcher::new(&mut loop_);
-            do timer.start(10, 0) |timer, status| {
-                assert!(status.is_none());
-                unsafe { *count_ptr += 1 };
-                timer.close(||());
-            }
-            loop_.run();
-            loop_.close();
-            assert!(count == 1);
+    fn oneshot() {
+        do run_uv_loop |l| {
+            let mut timer = TimerWatcher::new(l);
+            let port = timer.oneshot(1);
+            port.recv();
+            let port = timer.oneshot(1);
+            port.recv();
         }
     }
 
     #[test]
-    fn start_twice() {
-        do run_in_bare_thread {
-            let mut count = 0;
-            let count_ptr: *mut int = &mut count;
-            let mut loop_ = Loop::new();
-            let mut timer = TimerWatcher::new(&mut loop_);
-            do timer.start(10, 0) |timer, status| {
-                let mut timer = timer;
-                assert!(status.is_none());
-                unsafe { *count_ptr += 1 };
-                do timer.start(10, 0) |timer, status| {
-                    assert!(status.is_none());
-                    unsafe { *count_ptr += 1 };
-                    timer.close(||());
-                }
-            }
-            loop_.run();
-            loop_.close();
-            assert!(count == 2);
+    fn override() {
+        do run_uv_loop |l| {
+            let mut timer = TimerWatcher::new(l);
+            let oport = timer.oneshot(1);
+            let pport = timer.period(1);
+            timer.sleep(1);
+            assert_eq!(oport.try_recv(), None);
+            assert_eq!(pport.try_recv(), None);
+            timer.oneshot(1).recv();
         }
     }
 
     #[test]
-    fn repeat_stop() {
-        do run_in_bare_thread {
-            let mut count = 0;
-            let count_ptr: *mut int = &mut count;
-            let mut loop_ = Loop::new();
-            let mut timer = TimerWatcher::new(&mut loop_);
-            do timer.start(1, 2) |timer, status| {
-                assert!(status.is_none());
-                unsafe {
-                    *count_ptr += 1;
-
-                    if *count_ptr == 10 {
-
-                        // Stop the timer and do something else
-                        let mut timer = timer;
-                        timer.stop();
-                        // Freeze timer so it can be captured
-                        let timer = timer;
-
-                        let mut loop_ = timer.event_loop();
-                        let mut timer2 = TimerWatcher::new(&mut loop_);
-                        do timer2.start(10, 0) |timer2, _| {
-
-                            *count_ptr += 1;
-
-                            timer2.close(||());
-
-                            // Restart the original timer
-                            let mut timer = timer;
-                            do timer.start(1, 0) |timer, _| {
-                                *count_ptr += 1;
-                                timer.close(||());
-                            }
-                        }
-                    }
-                };
-            }
-            loop_.run();
-            loop_.close();
-            assert!(count == 12);
+    fn period() {
+        do run_uv_loop |l| {
+            let mut timer = TimerWatcher::new(l);
+            let port = timer.period(1);
+            port.recv();
+            port.recv();
+            let port = timer.period(1);
+            port.recv();
+            port.recv();
         }
     }
 
+    #[test]
+    fn sleep() {
+        do run_uv_loop |l| {
+            let mut timer = TimerWatcher::new(l);
+            timer.sleep(1);
+            timer.sleep(1);
+        }
+    }
 }
index 2aac43072dd01548f2e4d102c9357f3a53f1dd03..e9d8aab2e8b661eabae40eaf0802b3100eecaf5d 100644 (file)
@@ -9,8 +9,6 @@
 // except according to those terms.
 
 use std::c_str::CString;
-use std::cast::transmute;
-use std::cast;
 use std::comm::{SharedChan, GenericChan};
 use std::libc::c_int;
 use std::libc;
@@ -23,7 +21,6 @@
 use std::rt::rtio::*;
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::task::Task;
-use std::str;
 use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
                 S_IRUSR, S_IWUSR};
 use std::rt::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
 use ai = std::rt::io::net::addrinfo;
 
 #[cfg(test)] use std::unstable::run_in_bare_thread;
-#[cfg(test)] use std::rt::test::{spawntask,
-                                 next_test_ip4,
-                                 run_in_mt_newsched_task};
-#[cfg(test)] use std::rt::comm::oneshot;
 
 use super::*;
 use addrinfo::GetAddrInfoRequest;
@@ -370,626 +363,3 @@ fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
         }
     }
 }
-
-// this function is full of lies
-unsafe fn local_io() -> &'static mut IoFactory {
-    do Local::borrow |sched: &mut Scheduler| {
-        let mut io = None;
-        sched.event_loop.io(|i| io = Some(i));
-        cast::transmute(io.unwrap())
-    }
-}
-
-#[test]
-fn test_simple_io_no_connect() {
-    do run_in_mt_newsched_task {
-        unsafe {
-            let io = local_io();
-            let addr = next_test_ip4();
-            let maybe_chan = io.tcp_connect(addr);
-            assert!(maybe_chan.is_err());
-        }
-    }
-}
-
-#[test]
-fn test_simple_udp_io_bind_only() {
-    do run_in_mt_newsched_task {
-        unsafe {
-            let io = local_io();
-            let addr = next_test_ip4();
-            let maybe_socket = io.udp_bind(addr);
-            assert!(maybe_socket.is_ok());
-        }
-    }
-}
-
-#[test]
-fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
-    use std::rt::sleeper_list::SleeperList;
-    use std::rt::work_queue::WorkQueue;
-    use std::rt::thread::Thread;
-    use std::rt::task::Task;
-    use std::rt::sched::{Shutdown, TaskFromFriend};
-    use std::rt::task::UnwindResult;
-    do run_in_bare_thread {
-        let sleepers = SleeperList::new();
-        let work_queue1 = WorkQueue::new();
-        let work_queue2 = WorkQueue::new();
-        let queues = ~[work_queue1.clone(), work_queue2.clone()];
-
-        let loop1 = ~UvEventLoop::new() as ~EventLoop;
-        let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
-                                         sleepers.clone());
-        let loop2 = ~UvEventLoop::new() as ~EventLoop;
-        let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
-                                         sleepers.clone());
-
-        let handle1 = Cell::new(sched1.make_handle());
-        let handle2 = Cell::new(sched2.make_handle());
-        let tasksFriendHandle = Cell::new(sched2.make_handle());
-
-        let on_exit: ~fn(UnwindResult) = |exit_status| {
-            handle1.take().send(Shutdown);
-            handle2.take().send(Shutdown);
-            assert!(exit_status.is_success());
-        };
-
-        let test_function: ~fn() = || {
-            let io = unsafe { local_io() };
-            let addr = next_test_ip4();
-            let maybe_socket = io.udp_bind(addr);
-            // this socket is bound to this event loop
-            assert!(maybe_socket.is_ok());
-
-            // block self on sched1
-            do task::unkillable { // FIXME(#8674)
-                let scheduler: ~Scheduler = Local::take();
-                do scheduler.deschedule_running_task_and_then |_, task| {
-                    // unblock task
-                    do task.wake().map |task| {
-                      // send self to sched2
-                      tasksFriendHandle.take().send(TaskFromFriend(task));
-                    };
-                    // sched1 should now sleep since it has nothing else to do
-                }
-            }
-            // sched2 will wake up and get the task
-            // as we do nothing else, the function ends and the socket goes out of scope
-            // sched2 will start to run the destructor
-            // the destructor will first block the task, set it's home as sched1, then enqueue it
-            // sched2 will dequeue the task, see that it has a home, and send it to sched1
-            // sched1 will wake up, exec the close function on the correct loop, and then we're done
-        };
-
-        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
-        main_task.death.on_exit = Some(on_exit);
-        let main_task = Cell::new(main_task);
-
-        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
-
-        let sched1 = Cell::new(sched1);
-        let sched2 = Cell::new(sched2);
-
-        let thread1 = do Thread::start {
-            sched1.take().bootstrap(main_task.take());
-        };
-        let thread2 = do Thread::start {
-            sched2.take().bootstrap(null_task.take());
-        };
-
-        thread1.join();
-        thread2.join();
-    }
-}
-
-#[test]
-fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
-    use std::rt::sleeper_list::SleeperList;
-    use std::rt::work_queue::WorkQueue;
-    use std::rt::thread::Thread;
-    use std::rt::task::Task;
-    use std::rt::comm::oneshot;
-    use std::rt::sched::Shutdown;
-    use std::rt::task::UnwindResult;
-    do run_in_bare_thread {
-        let sleepers = SleeperList::new();
-        let work_queue1 = WorkQueue::new();
-        let work_queue2 = WorkQueue::new();
-        let queues = ~[work_queue1.clone(), work_queue2.clone()];
-
-        let loop1 = ~UvEventLoop::new() as ~EventLoop;
-        let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
-                                         sleepers.clone());
-        let loop2 = ~UvEventLoop::new() as ~EventLoop;
-        let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
-                                         sleepers.clone());
-
-        let handle1 = Cell::new(sched1.make_handle());
-        let handle2 = Cell::new(sched2.make_handle());
-
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        let body1: ~fn() = || {
-            let io = unsafe { local_io() };
-            let addr = next_test_ip4();
-            let socket = io.udp_bind(addr);
-            assert!(socket.is_ok());
-            chan.take().send(socket);
-        };
-
-        let body2: ~fn() = || {
-            let socket = port.take().recv();
-            assert!(socket.is_ok());
-            /* The socket goes out of scope and the destructor is called.
-             * The destructor:
-             *  - sends itself back to sched1
-             *  - frees the socket
-             *  - resets the home of the task to whatever it was previously
-             */
-        };
-
-        let on_exit: ~fn(UnwindResult) = |exit| {
-            handle1.take().send(Shutdown);
-            handle2.take().send(Shutdown);
-            assert!(exit.is_success());
-        };
-
-        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
-
-        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
-        task2.death.on_exit = Some(on_exit);
-        let task2 = Cell::new(task2);
-
-        let sched1 = Cell::new(sched1);
-        let sched2 = Cell::new(sched2);
-
-        let thread1 = do Thread::start {
-            sched1.take().bootstrap(task1.take());
-        };
-        let thread2 = do Thread::start {
-            sched2.take().bootstrap(task2.take());
-        };
-
-        thread1.join();
-        thread2.join();
-    }
-}
-
-#[test]
-fn test_simple_tcp_server_and_client() {
-    do run_in_mt_newsched_task {
-        let addr = next_test_ip4();
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        // Start the server first so it's listening when we connect
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let listener = io.tcp_bind(addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                chan.take().send(());
-                let mut stream = acceptor.accept().unwrap();
-                let mut buf = [0, .. 2048];
-                let nread = stream.read(buf).unwrap();
-                assert_eq!(nread, 8);
-                for i in range(0u, nread) {
-                    uvdebug!("{}", buf[i]);
-                    assert_eq!(buf[i], i as u8);
-                }
-            }
-        }
-
-        do spawntask {
-            unsafe {
-                port.take().recv();
-                let io = local_io();
-                let mut stream = io.tcp_connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            }
-        }
-    }
-}
-
-#[test]
-fn test_simple_tcp_server_and_client_on_diff_threads() {
-    use std::rt::sleeper_list::SleeperList;
-    use std::rt::work_queue::WorkQueue;
-    use std::rt::thread::Thread;
-    use std::rt::task::Task;
-    use std::rt::sched::{Shutdown};
-    use std::rt::task::UnwindResult;
-    do run_in_bare_thread {
-        let sleepers = SleeperList::new();
-
-        let server_addr = next_test_ip4();
-        let client_addr = server_addr.clone();
-
-        let server_work_queue = WorkQueue::new();
-        let client_work_queue = WorkQueue::new();
-        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
-
-        let sloop = ~UvEventLoop::new() as ~EventLoop;
-        let mut server_sched = ~Scheduler::new(sloop, server_work_queue,
-                                               queues.clone(), sleepers.clone());
-        let cloop = ~UvEventLoop::new() as ~EventLoop;
-        let mut client_sched = ~Scheduler::new(cloop, client_work_queue,
-                                               queues.clone(), sleepers.clone());
-
-        let server_handle = Cell::new(server_sched.make_handle());
-        let client_handle = Cell::new(client_sched.make_handle());
-
-        let server_on_exit: ~fn(UnwindResult) = |exit_status| {
-            server_handle.take().send(Shutdown);
-            assert!(exit_status.is_success());
-        };
-
-        let client_on_exit: ~fn(UnwindResult) = |exit_status| {
-            client_handle.take().send(Shutdown);
-            assert!(exit_status.is_success());
-        };
-
-        let server_fn: ~fn() = || {
-            let io = unsafe { local_io() };
-            let listener = io.tcp_bind(server_addr).unwrap();
-            let mut acceptor = listener.listen().unwrap();
-            let mut stream = acceptor.accept().unwrap();
-            let mut buf = [0, .. 2048];
-            let nread = stream.read(buf).unwrap();
-            assert_eq!(nread, 8);
-            for i in range(0u, nread) {
-                assert_eq!(buf[i], i as u8);
-            }
-        };
-
-        let client_fn: ~fn() = || {
-            let io = unsafe { local_io() };
-            let mut stream = io.tcp_connect(client_addr);
-            while stream.is_err() {
-                stream = io.tcp_connect(client_addr);
-            }
-            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
-        };
-
-        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
-        server_task.death.on_exit = Some(server_on_exit);
-        let server_task = Cell::new(server_task);
-
-        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
-        client_task.death.on_exit = Some(client_on_exit);
-        let client_task = Cell::new(client_task);
-
-        let server_sched = Cell::new(server_sched);
-        let client_sched = Cell::new(client_sched);
-
-        let server_thread = do Thread::start {
-            server_sched.take().bootstrap(server_task.take());
-        };
-        let client_thread = do Thread::start {
-            client_sched.take().bootstrap(client_task.take());
-        };
-
-        server_thread.join();
-        client_thread.join();
-    }
-}
-
-#[test]
-fn test_simple_udp_server_and_client() {
-    do run_in_mt_newsched_task {
-        let server_addr = next_test_ip4();
-        let client_addr = next_test_ip4();
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut server_socket = io.udp_bind(server_addr).unwrap();
-                chan.take().send(());
-                let mut buf = [0, .. 2048];
-                let (nread,src) = server_socket.recvfrom(buf).unwrap();
-                assert_eq!(nread, 8);
-                for i in range(0u, nread) {
-                    uvdebug!("{}", buf[i]);
-                    assert_eq!(buf[i], i as u8);
-                }
-                assert_eq!(src, client_addr);
-            }
-        }
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut client_socket = io.udp_bind(client_addr).unwrap();
-                port.take().recv();
-                client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
-            }
-        }
-    }
-}
-
-#[test] #[ignore(reason = "busted")]
-fn test_read_and_block() {
-    do run_in_mt_newsched_task {
-        let addr = next_test_ip4();
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        do spawntask {
-            let io = unsafe { local_io() };
-            let listener = io.tcp_bind(addr).unwrap();
-            let mut acceptor = listener.listen().unwrap();
-            chan.take().send(());
-            let mut stream = acceptor.accept().unwrap();
-            let mut buf = [0, .. 2048];
-
-            let expected = 32;
-            let mut current = 0;
-            let mut reads = 0;
-
-            while current < expected {
-                let nread = stream.read(buf).unwrap();
-                for i in range(0u, nread) {
-                    let val = buf[i] as uint;
-                    assert_eq!(val, current % 8);
-                    current += 1;
-                }
-                reads += 1;
-
-                do task::unkillable { // FIXME(#8674)
-                    let scheduler: ~Scheduler = Local::take();
-                    // Yield to the other task in hopes that it
-                    // will trigger a read callback while we are
-                    // not ready for it
-                    do scheduler.deschedule_running_task_and_then |sched, task| {
-                        let task = Cell::new(task);
-                        sched.enqueue_blocked_task(task.take());
-                    }
-                }
-            }
-
-            // Make sure we had multiple reads
-            assert!(reads > 1);
-        }
-
-        do spawntask {
-            unsafe {
-                port.take().recv();
-                let io = local_io();
-                let mut stream = io.tcp_connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            }
-        }
-
-    }
-}
-
-#[test]
-fn test_read_read_read() {
-    do run_in_mt_newsched_task {
-        let addr = next_test_ip4();
-        static MAX: uint = 500000;
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let listener = io.tcp_bind(addr).unwrap();
-                let mut acceptor = listener.listen().unwrap();
-                chan.take().send(());
-                let mut stream = acceptor.accept().unwrap();
-                let buf = [1, .. 2048];
-                let mut total_bytes_written = 0;
-                while total_bytes_written < MAX {
-                    stream.write(buf);
-                    total_bytes_written += buf.len();
-                }
-            }
-        }
-
-        do spawntask {
-            unsafe {
-                port.take().recv();
-                let io = local_io();
-                let mut stream = io.tcp_connect(addr).unwrap();
-                let mut buf = [0, .. 2048];
-                let mut total_bytes_read = 0;
-                while total_bytes_read < MAX {
-                    let nread = stream.read(buf).unwrap();
-                    uvdebug!("read {} bytes", nread);
-                    total_bytes_read += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
-                }
-                uvdebug!("read {} bytes total", total_bytes_read);
-            }
-        }
-    }
-}
-
-#[test]
-#[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
-fn test_udp_twice() {
-    do run_in_mt_newsched_task {
-        let server_addr = next_test_ip4();
-        let client_addr = next_test_ip4();
-        let (port, chan) = oneshot();
-        let port = Cell::new(port);
-        let chan = Cell::new(chan);
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut client = io.udp_bind(client_addr).unwrap();
-                port.take().recv();
-                assert!(client.sendto([1], server_addr).is_ok());
-                assert!(client.sendto([2], server_addr).is_ok());
-            }
-        }
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut server = io.udp_bind(server_addr).unwrap();
-                chan.take().send(());
-                let mut buf1 = [0];
-                let mut buf2 = [0];
-                let (nread1, src1) = server.recvfrom(buf1).unwrap();
-                let (nread2, src2) = server.recvfrom(buf2).unwrap();
-                assert_eq!(nread1, 1);
-                assert_eq!(nread2, 1);
-                assert_eq!(src1, client_addr);
-                assert_eq!(src2, client_addr);
-                assert_eq!(buf1[0], 1);
-                assert_eq!(buf2[0], 2);
-            }
-        }
-    }
-}
-
-#[test]
-fn test_udp_many_read() {
-    do run_in_mt_newsched_task {
-        let server_out_addr = next_test_ip4();
-        let server_in_addr = next_test_ip4();
-        let client_out_addr = next_test_ip4();
-        let client_in_addr = next_test_ip4();
-        static MAX: uint = 500_000;
-
-        let (p1, c1) = oneshot();
-        let (p2, c2) = oneshot();
-
-        let first = Cell::new((p1, c2));
-        let second = Cell::new((p2, c1));
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut server_out = io.udp_bind(server_out_addr).unwrap();
-                let mut server_in = io.udp_bind(server_in_addr).unwrap();
-                let (port, chan) = first.take();
-                chan.send(());
-                port.recv();
-                let msg = [1, .. 2048];
-                let mut total_bytes_sent = 0;
-                let mut buf = [1];
-                while buf[0] == 1 {
-                    // send more data
-                    assert!(server_out.sendto(msg, client_in_addr).is_ok());
-                    total_bytes_sent += msg.len();
-                    // check if the client has received enough
-                    let res = server_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(nread, 1);
-                    assert_eq!(src, client_out_addr);
-                }
-                assert!(total_bytes_sent >= MAX);
-            }
-        }
-
-        do spawntask {
-            unsafe {
-                let io = local_io();
-                let mut client_out = io.udp_bind(client_out_addr).unwrap();
-                let mut client_in = io.udp_bind(client_in_addr).unwrap();
-                let (port, chan) = second.take();
-                port.recv();
-                chan.send(());
-                let mut total_bytes_recv = 0;
-                let mut buf = [0, .. 2048];
-                while total_bytes_recv < MAX {
-                    // ask for more
-                    assert!(client_out.sendto([1], server_in_addr).is_ok());
-                    // wait for data
-                    let res = client_in.recvfrom(buf);
-                    assert!(res.is_ok());
-                    let (nread, src) = res.unwrap();
-                    assert_eq!(src, server_out_addr);
-                    total_bytes_recv += nread;
-                    for i in range(0u, nread) {
-                        assert_eq!(buf[i], 1);
-                    }
-                }
-                // tell the server we're done
-                assert!(client_out.sendto([0], server_in_addr).is_ok());
-            }
-        }
-    }
-}
-
-#[test]
-fn test_timer_sleep_simple() {
-    do run_in_mt_newsched_task {
-        unsafe {
-            let io = local_io();
-            let timer = io.timer_init();
-            do timer.map |mut t| { t.sleep(1) };
-        }
-    }
-}
-
-fn file_test_uvio_full_simple_impl() {
-    use std::rt::io::{Open, ReadWrite, Read};
-    unsafe {
-        let io = local_io();
-        let write_val = "hello uvio!";
-        let path = "./tmp/file_test_uvio_full.txt";
-        {
-            let create_fm = Open;
-            let create_fa = ReadWrite;
-            let mut fd = io.fs_open(&path.to_c_str(), create_fm, create_fa).unwrap();
-            let write_buf = write_val.as_bytes();
-            fd.write(write_buf);
-        }
-        {
-            let ro_fm = Open;
-            let ro_fa = Read;
-            let mut fd = io.fs_open(&path.to_c_str(), ro_fm, ro_fa).unwrap();
-            let mut read_vec = [0, .. 1028];
-            let nread = fd.read(read_vec).unwrap();
-            let read_val = str::from_utf8(read_vec.slice(0, nread as uint));
-            assert!(read_val == write_val.to_owned());
-        }
-        io.fs_unlink(&path.to_c_str());
-    }
-}
-
-#[test]
-fn file_test_uvio_full_simple() {
-    do run_in_mt_newsched_task {
-        file_test_uvio_full_simple_impl();
-    }
-}
-
-fn uvio_naive_print(input: &str) {
-    unsafe {
-        use std::libc::{STDOUT_FILENO};
-        let io = local_io();
-        {
-            let mut fd = io.fs_from_raw_fd(STDOUT_FILENO, DontClose);
-            let write_buf = input.as_bytes();
-            fd.write(write_buf);
-        }
-    }
-}
-
-#[test]
-fn file_test_uvio_write_to_stdout() {
-    do run_in_mt_newsched_task {
-        uvio_naive_print("jubilation\n");
-    }
-}