use std::libc::{c_int, c_char, c_void, c_uint};
use std::libc;
use std::rt::BlockedTask;
-use std::rt::io;
use std::rt::io::{FileStat, IoError};
-use std::rt::rtio;
+use std::rt::io;
use std::rt::local::Local;
+use std::rt::rtio;
use std::rt::sched::{Scheduler, SchedHandle};
use std::vec;
-use super::{Loop, UvError, uv_error_to_io_error};
+use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
use uvio::HomingIO;
use uvll;
0 => {
req.fired = true;
let mut slot = None;
- unsafe { uvll::set_data_for_req(req.req, &slot) }
- let sched: ~Scheduler = Local::take();
- do sched.deschedule_running_task_and_then |_, task| {
- slot = Some(task);
+ do wait_until_woken_after(&mut slot) {
+ unsafe { uvll::set_data_for_req(req.req, &slot) }
}
match req.get_result() {
n if n < 0 => Err(UvError(n)),
use std::str;
use std::vec;
use super::*;
- use super::super::{run_uv_loop};
+ use l = super::super::local_loop;
#[test]
fn file_test_full_simple_sync() {
- do run_uv_loop |l| {
- let create_flags = O_RDWR | O_CREAT;
- let read_flags = O_RDONLY;
- let mode = S_IWUSR | S_IRUSR;
- let path_str = "./tmp/file_full_simple_sync.txt";
-
- {
- // open/create
- let result = FsRequest::open(l, &path_str.to_c_str(),
- create_flags as int, mode as int);
- assert!(result.is_ok());
- let result = result.unwrap();
- let fd = result.fd;
-
- // write
- let result = FsRequest::write(l, fd, "hello".as_bytes(), -1);
- assert!(result.is_ok());
- }
+ let create_flags = O_RDWR | O_CREAT;
+ let read_flags = O_RDONLY;
+ let mode = S_IWUSR | S_IRUSR;
+ let path_str = "./tmp/file_full_simple_sync.txt";
+
+ {
+ // open/create
+ let result = FsRequest::open(l(), &path_str.to_c_str(),
+ create_flags as int, mode as int);
+ assert!(result.is_ok());
+ let result = result.unwrap();
+ let fd = result.fd;
- {
- // re-open
- let result = FsRequest::open(l, &path_str.to_c_str(),
- read_flags as int, 0);
- assert!(result.is_ok());
- let result = result.unwrap();
- let fd = result.fd;
-
- // read
- let mut read_mem = vec::from_elem(1000, 0u8);
- let result = FsRequest::read(l, fd, read_mem, 0);
- assert!(result.is_ok());
-
- let nread = result.unwrap();
- assert!(nread > 0);
- let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
- assert_eq!(read_str, ~"hello");
- }
- // unlink
- let result = FsRequest::unlink(l, &path_str.to_c_str());
+ // write
+ let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1);
assert!(result.is_ok());
}
+
+ {
+ // re-open
+ let result = FsRequest::open(l(), &path_str.to_c_str(),
+ read_flags as int, 0);
+ assert!(result.is_ok());
+ let result = result.unwrap();
+ let fd = result.fd;
+
+ // read
+ let mut read_mem = vec::from_elem(1000, 0u8);
+ let result = FsRequest::read(l(), fd, read_mem, 0);
+ assert!(result.is_ok());
+
+ let nread = result.unwrap();
+ assert!(nread > 0);
+ let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
+ assert_eq!(read_str, ~"hello");
+ }
+ // unlink
+ let result = FsRequest::unlink(l(), &path_str.to_c_str());
+ assert!(result.is_ok());
}
#[test]
fn file_test_stat() {
- do run_uv_loop |l| {
- let path = &"./tmp/file_test_stat_simple".to_c_str();
- let create_flags = (O_RDWR | O_CREAT) as int;
- let mode = (S_IWUSR | S_IRUSR) as int;
+ let path = &"./tmp/file_test_stat_simple".to_c_str();
+ let create_flags = (O_RDWR | O_CREAT) as int;
+ let mode = (S_IWUSR | S_IRUSR) as int;
- let result = FsRequest::open(l, path, create_flags, mode);
- assert!(result.is_ok());
- let file = result.unwrap();
+ let result = FsRequest::open(l(), path, create_flags, mode);
+ assert!(result.is_ok());
+ let file = result.unwrap();
- let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0);
- assert!(result.is_ok());
+ let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0);
+ assert!(result.is_ok());
- let result = FsRequest::stat(l, path);
- assert!(result.is_ok());
- assert_eq!(result.unwrap().size, 5);
+ let result = FsRequest::stat(l(), path);
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap().size, 5);
- fn free<T>(_: T) {}
- free(file);
+ fn free<T>(_: T) {}
+ free(file);
- let result = FsRequest::unlink(l, path);
- assert!(result.is_ok());
- }
+ let result = FsRequest::unlink(l(), path);
+ assert!(result.is_ok());
}
#[test]
fn file_test_mk_rm_dir() {
- do run_uv_loop |l| {
- let path = &"./tmp/mk_rm_dir".to_c_str();
- let mode = S_IWUSR | S_IRUSR;
+ let path = &"./tmp/mk_rm_dir".to_c_str();
+ let mode = S_IWUSR | S_IRUSR;
- let result = FsRequest::mkdir(l, path, mode);
- assert!(result.is_ok());
+ let result = FsRequest::mkdir(l(), path, mode);
+ assert!(result.is_ok());
- let result = FsRequest::stat(l, path);
- assert!(result.is_ok());
- assert!(result.unwrap().kind == io::TypeDirectory);
+ let result = FsRequest::stat(l(), path);
+ assert!(result.is_ok());
+ assert!(result.unwrap().kind == io::TypeDirectory);
- let result = FsRequest::rmdir(l, path);
- assert!(result.is_ok());
+ let result = FsRequest::rmdir(l(), path);
+ assert!(result.is_ok());
- let result = FsRequest::stat(l, path);
- assert!(result.is_err());
- }
+ let result = FsRequest::stat(l(), path);
+ assert!(result.is_err());
}
#[test]
fn file_test_mkdir_chokes_on_double_create() {
- do run_uv_loop |l| {
- let path = &"./tmp/double_create_dir".to_c_str();
- let mode = S_IWUSR | S_IRUSR;
-
- let result = FsRequest::stat(l, path);
- assert!(result.is_err(), "{:?}", result);
- let result = FsRequest::mkdir(l, path, mode as c_int);
- assert!(result.is_ok(), "{:?}", result);
- let result = FsRequest::mkdir(l, path, mode as c_int);
- assert!(result.is_err(), "{:?}", result);
- let result = FsRequest::rmdir(l, path);
- assert!(result.is_ok(), "{:?}", result);
- }
+ let path = &"./tmp/double_create_dir".to_c_str();
+ let mode = S_IWUSR | S_IRUSR;
+
+ let result = FsRequest::stat(l(), path);
+ assert!(result.is_err(), "{:?}", result);
+ let result = FsRequest::mkdir(l(), path, mode as c_int);
+ assert!(result.is_ok(), "{:?}", result);
+ let result = FsRequest::mkdir(l(), path, mode as c_int);
+ assert!(result.is_err(), "{:?}", result);
+ let result = FsRequest::rmdir(l(), path);
+ assert!(result.is_ok(), "{:?}", result);
}
#[test]
fn file_test_rmdir_chokes_on_nonexistant_path() {
- do run_uv_loop |l| {
- let path = &"./tmp/never_existed_dir".to_c_str();
- let result = FsRequest::rmdir(l, path);
- assert!(result.is_err());
- }
+ let path = &"./tmp/never_existed_dir".to_c_str();
+ let result = FsRequest::rmdir(l(), path);
+ assert!(result.is_err());
}
}
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::str;
+use std::task;
use std::vec;
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
- uv_error_to_io_error, UvHandle, slice_to_uv_buf};
+ uv_error_to_io_error, UvHandle, slice_to_uv_buf,
+ wait_until_woken_after};
use uvio::HomingIO;
use uvll;
{
struct Ctx { status: c_int, task: Option<BlockedTask> }
- let tcp = TcpWatcher::new(loop_);
- let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
- let req = Request::new(uvll::UV_CONNECT);
- let result = match addr {
- UvIpv4SocketAddr(addr) => unsafe {
- uvll::tcp_connect(req.handle, tcp.handle, addr,
- connect_cb)
- },
- UvIpv6SocketAddr(addr) => unsafe {
- uvll::tcp_connect6(req.handle, tcp.handle, addr,
- connect_cb)
- },
- };
- match result {
- 0 => {
- let mut cx = Ctx { status: 0, task: None };
- req.set_data(&cx);
- req.defuse();
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- cx.task = Some(task);
- }
- match cx.status {
- 0 => Ok(()),
- n => Err(UvError(n)),
+ return do task::unkillable {
+ let tcp = TcpWatcher::new(loop_);
+ let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
+ let mut req = Request::new(uvll::UV_CONNECT);
+ let result = match addr {
+ UvIpv4SocketAddr(addr) => unsafe {
+ uvll::tcp_connect(req.handle, tcp.handle, addr,
+ connect_cb)
+ },
+ UvIpv6SocketAddr(addr) => unsafe {
+ uvll::tcp_connect6(req.handle, tcp.handle, addr,
+ connect_cb)
+ },
+ };
+ match result {
+ 0 => {
+ req.defuse(); // uv callback now owns this request
+ let mut cx = Ctx { status: 0, task: None };
+ do wait_until_woken_after(&mut cx.task) {
+ req.set_data(&cx);
+ }
+ match cx.status {
+ 0 => Ok(()),
+ n => Err(UvError(n)),
+ }
}
+ n => Err(UvError(n))
}
- n => Err(UvError(n))
- }
- };
+ };
- return match ret {
- Ok(()) => Ok(tcp),
- Err(e) => Err(e),
+ match ret {
+ Ok(()) => Ok(tcp),
+ Err(e) => Err(e),
+ }
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
let req = Request::wrap(req);
- if status == uvll::ECANCELED { return }
- let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+ assert!(status != uvll::ECANCELED);
+ let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
+impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
+ fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
+}
+
impl Drop for TcpWatcher {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
- self.stream.close();
+ self.close();
}
}
pub fn bind(loop_: &mut Loop, address: SocketAddr)
-> Result<~TcpListener, UvError>
{
- let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
- assert_eq!(unsafe {
- uvll::uv_tcp_init(loop_.handle, handle)
- }, 0);
- let l = ~TcpListener {
- home: get_handle_to_current_scheduler!(),
- handle: handle,
- closing_task: None,
- outgoing: Tube::new(),
- };
- let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
- match addr {
- UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
- UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
+ do task::unkillable {
+ let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+ assert_eq!(unsafe {
+ uvll::uv_tcp_init(loop_.handle, handle)
+ }, 0);
+ let l = ~TcpListener {
+ home: get_handle_to_current_scheduler!(),
+ handle: handle,
+ closing_task: None,
+ outgoing: Tube::new(),
+ };
+ let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+ match addr {
+ UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
+ UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
+ }
+ });
+ match res {
+ 0 => Ok(l.install()),
+ n => Err(UvError(n))
}
- });
- match res {
- 0 => Ok(l.install()),
- n => Err(UvError(n))
}
}
}
}
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
+ assert!(status != uvll::ECANCELED);
let msg = match status {
0 => {
let loop_ = Loop::wrap(unsafe {
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
Ok(~client as ~rtio::RtioTcpStream)
}
- uvll::ECANCELED => return,
n => Err(uv_error_to_io_error(UvError(n)))
};
impl Drop for TcpListener {
fn drop(&mut self) {
- let (_m, sched) = self.fire_homing_missile_sched();
-
- do sched.deschedule_running_task_and_then |_, task| {
- self.closing_task = Some(task);
- unsafe { uvll::uv_close(self.handle, listener_close_cb) }
- }
+ let _m = self.fire_homing_missile();
+ self.close();
}
}
pub fn bind(loop_: &Loop, address: SocketAddr)
-> Result<UdpWatcher, UvError>
{
- let udp = UdpWatcher {
- handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
- home: get_handle_to_current_scheduler!(),
- };
- assert_eq!(unsafe {
- uvll::uv_udp_init(loop_.handle, udp.handle)
- }, 0);
- let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
- match addr {
- UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
- UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
+ do task::unkillable {
+ let udp = UdpWatcher {
+ handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+ home: get_handle_to_current_scheduler!(),
+ };
+ assert_eq!(unsafe {
+ uvll::uv_udp_init(loop_.handle, udp.handle)
+ }, 0);
+ let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+ match addr {
+ UvIpv4SocketAddr(addr) =>
+ uvll::udp_bind(udp.handle, addr, 0u32),
+ UvIpv6SocketAddr(addr) =>
+ uvll::udp_bind6(udp.handle, addr, 0u32),
+ }
+ });
+ match result {
+ 0 => Ok(udp),
+ n => Err(UvError(n)),
}
- });
- match result {
- 0 => Ok(udp),
- n => Err(UvError(n)),
}
}
}
+impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
+ fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
+}
+
impl HomingIO for UdpWatcher {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
}
let _m = self.fire_homing_missile();
- return match unsafe {
+ let a = match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
buf: Some(slice_to_uv_buf(buf)),
result: None,
};
- unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- cx.task = Some(task);
+ do wait_until_woken_after(&mut cx.task) {
+ unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
}
match cx.result.take_unwrap() {
(n, _) if n < 0 =>
}
n => Err(uv_error_to_io_error(UvError(n)))
};
+ return a;
extern fn alloc_cb(handle: *uvll::uv_udp_t,
_suggested_size: size_t) -> Buf {
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
- cx.buf.take().expect("alloc_cb called more than once")
+ cx.buf.take().expect("recv alloc_cb called more than once")
}
- extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
+ extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
addr: *uvll::sockaddr, _flags: c_uint) {
+ assert!(nread != uvll::ECANCELED as ssize_t);
+ let cx: &mut Ctx = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(handle))
+ };
// When there's no data to read the recv callback can be a no-op.
// This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
// this we just drop back to kqueue and wait for the next callback.
- if nread == 0 { return }
- if nread == uvll::ECANCELED as ssize_t { return }
+ if nread == 0 {
+ cx.buf = Some(buf);
+ return
+ }
unsafe {
assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
let _m = self.fire_homing_missile();
- let req = Request::new(uvll::UV_UDP_SEND);
+ let mut req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
match dst {
return match result {
0 => {
+ req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
- req.set_data(&cx);
- req.defuse();
-
- let sched: ~Scheduler = Local::take();
- do sched.deschedule_running_task_and_then |_, task| {
- cx.task = Some(task);
+ do wait_until_woken_after(&mut cx.task) {
+ req.set_data(&cx);
}
-
match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let req = Request::wrap(req);
- let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+ assert!(status != uvll::ECANCELED);
+ let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
let sched: ~Scheduler = Local::take();
impl Drop for UdpWatcher {
fn drop(&mut self) {
// Send ourselves home to close this handle (blocking while doing so).
- let (_m, sched) = self.fire_homing_missile_sched();
- let mut slot = None;
- unsafe {
- uvll::set_data_for_uv_handle(self.handle, &slot);
- uvll::uv_close(self.handle, close_cb);
- }
- do sched.deschedule_running_task_and_then |_, task| {
- slot = Some(task);
- }
-
- extern fn close_cb(handle: *uvll::uv_handle_t) {
- let slot: &mut Option<BlockedTask> = unsafe {
- cast::transmute(uvll::get_data_for_uv_handle(handle))
- };
- unsafe { uvll::free_handle(handle) }
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(slot.take_unwrap());
- }
+ let _m = self.fire_homing_missile();
+ self.close();
}
}
use std::task;
use super::*;
- use super::super::{Loop, run_uv_loop};
+ use super::super::local_loop;
#[test]
fn connect_close_ip4() {
- do run_uv_loop |l| {
- match TcpWatcher::connect(l, next_test_ip4()) {
- Ok(*) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
- }
+ match TcpWatcher::connect(local_loop(), next_test_ip4()) {
+ Ok(*) => fail!(),
+ Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
}
}
#[test]
fn connect_close_ip6() {
- do run_uv_loop |l| {
- match TcpWatcher::connect(l, next_test_ip6()) {
- Ok(*) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
- }
+ match TcpWatcher::connect(local_loop(), next_test_ip6()) {
+ Ok(*) => fail!(),
+ Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
}
}
#[test]
fn udp_bind_close_ip4() {
- do run_uv_loop |l| {
- match UdpWatcher::bind(l, next_test_ip4()) {
- Ok(*) => {}
- Err(*) => fail!()
- }
+ match UdpWatcher::bind(local_loop(), next_test_ip4()) {
+ Ok(*) => {}
+ Err(*) => fail!()
}
}
#[test]
fn udp_bind_close_ip6() {
- do run_uv_loop |l| {
- match UdpWatcher::bind(l, next_test_ip6()) {
- Ok(*) => {}
- Err(*) => fail!()
- }
+ match UdpWatcher::bind(local_loop(), next_test_ip6()) {
+ Ok(*) => {}
+ Err(*) => fail!()
}
}
#[test]
fn listen_ip4() {
- do run_uv_loop |l| {
- let (port, chan) = oneshot();
- let chan = Cell::new(chan);
- let addr = next_test_ip4();
-
- let handle = l.handle;
- do spawn {
- let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
- Ok(w) => w, Err(e) => fail!("{:?}", e)
- };
- let mut w = match w.listen() {
- Ok(w) => w, Err(e) => fail!("{:?}", e),
- };
- chan.take().send(());
- match w.accept() {
- Ok(mut stream) => {
- let mut buf = [0u8, ..10];
- match stream.read(buf) {
- Ok(10) => {} e => fail!("{:?}", e),
- }
- for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
- }
- }
- Err(e) => fail!("{:?}", e)
- }
- }
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let addr = next_test_ip4();
- port.recv();
- let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+ do spawn {
+ let w = match TcpListener::bind(local_loop(), addr) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
- match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
- Ok(()) => {}, Err(e) => fail!("{:?}", e)
+ let mut w = match w.listen() {
+ Ok(w) => w, Err(e) => fail!("{:?}", e),
+ };
+ chan.take().send(());
+ match w.accept() {
+ Ok(mut stream) => {
+ let mut buf = [0u8, ..10];
+ match stream.read(buf) {
+ Ok(10) => {} e => fail!("{:?}", e),
+ }
+ for i in range(0, 10u8) {
+ assert_eq!(buf[i], i + 1);
+ }
+ }
+ Err(e) => fail!("{:?}", e)
}
}
+
+ port.recv();
+ let mut w = match TcpWatcher::connect(local_loop(), addr) {
+ Ok(w) => w, Err(e) => fail!("{:?}", e)
+ };
+ match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+ Ok(()) => {}, Err(e) => fail!("{:?}", e)
+ }
}
#[test]
fn listen_ip6() {
- do run_uv_loop |l| {
- let (port, chan) = oneshot();
- let chan = Cell::new(chan);
- let addr = next_test_ip6();
-
- let handle = l.handle;
- do spawn {
- let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
- Ok(w) => w, Err(e) => fail!("{:?}", e)
- };
- let mut w = match w.listen() {
- Ok(w) => w, Err(e) => fail!("{:?}", e),
- };
- chan.take().send(());
- match w.accept() {
- Ok(mut stream) => {
- let mut buf = [0u8, ..10];
- match stream.read(buf) {
- Ok(10) => {} e => fail!("{:?}", e),
- }
- for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
- }
- }
- Err(e) => fail!("{:?}", e)
- }
- }
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let addr = next_test_ip6();
- port.recv();
- let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
+ do spawn {
+ let w = match TcpListener::bind(local_loop(), addr) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
- match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
- Ok(()) => {}, Err(e) => fail!("{:?}", e)
+ let mut w = match w.listen() {
+ Ok(w) => w, Err(e) => fail!("{:?}", e),
+ };
+ chan.take().send(());
+ match w.accept() {
+ Ok(mut stream) => {
+ let mut buf = [0u8, ..10];
+ match stream.read(buf) {
+ Ok(10) => {} e => fail!("{:?}", e),
+ }
+ for i in range(0, 10u8) {
+ assert_eq!(buf[i], i + 1);
+ }
+ }
+ Err(e) => fail!("{:?}", e)
}
}
+
+ port.recv();
+ let mut w = match TcpWatcher::connect(local_loop(), addr) {
+ Ok(w) => w, Err(e) => fail!("{:?}", e)
+ };
+ match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
+ Ok(()) => {}, Err(e) => fail!("{:?}", e)
+ }
}
#[test]
fn udp_recv_ip4() {
- do run_uv_loop |l| {
- let (port, chan) = oneshot();
- let chan = Cell::new(chan);
- let client = next_test_ip4();
- let server = next_test_ip4();
-
- let handle = l.handle;
- do spawn {
- match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
- Ok(mut w) => {
- chan.take().send(());
- let mut buf = [0u8, ..10];
- match w.recvfrom(buf) {
- Ok((10, addr)) => assert_eq!(addr, client),
- e => fail!("{:?}", e),
- }
- for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
- }
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let client = next_test_ip4();
+ let server = next_test_ip4();
+
+ do spawn {
+ match UdpWatcher::bind(local_loop(), server) {
+ Ok(mut w) => {
+ chan.take().send(());
+ let mut buf = [0u8, ..10];
+ match w.recvfrom(buf) {
+ Ok((10, addr)) => assert_eq!(addr, client),
+ e => fail!("{:?}", e),
+ }
+ for i in range(0, 10u8) {
+ assert_eq!(buf[i], i + 1);
}
- Err(e) => fail!("{:?}", e)
}
+ Err(e) => fail!("{:?}", e)
}
+ }
- port.recv();
- let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
- Ok(w) => w, Err(e) => fail!("{:?}", e)
- };
- match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
- Ok(()) => {}, Err(e) => fail!("{:?}", e)
- }
+ port.recv();
+ let mut w = match UdpWatcher::bind(local_loop(), client) {
+ Ok(w) => w, Err(e) => fail!("{:?}", e)
+ };
+ match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+ Ok(()) => {}, Err(e) => fail!("{:?}", e)
}
}
#[test]
fn udp_recv_ip6() {
- do run_uv_loop |l| {
- let (port, chan) = oneshot();
- let chan = Cell::new(chan);
- let client = next_test_ip6();
- let server = next_test_ip6();
-
- let handle = l.handle;
- do spawn {
- match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
- Ok(mut w) => {
- chan.take().send(());
- let mut buf = [0u8, ..10];
- match w.recvfrom(buf) {
- Ok((10, addr)) => assert_eq!(addr, client),
- e => fail!("{:?}", e),
- }
- for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
- }
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let client = next_test_ip6();
+ let server = next_test_ip6();
+
+ do spawn {
+ match UdpWatcher::bind(local_loop(), server) {
+ Ok(mut w) => {
+ chan.take().send(());
+ let mut buf = [0u8, ..10];
+ match w.recvfrom(buf) {
+ Ok((10, addr)) => assert_eq!(addr, client),
+ e => fail!("{:?}", e),
+ }
+ for i in range(0, 10u8) {
+ assert_eq!(buf[i], i + 1);
}
- Err(e) => fail!("{:?}", e)
}
+ Err(e) => fail!("{:?}", e)
}
+ }
- port.recv();
- let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
- Ok(w) => w, Err(e) => fail!("{:?}", e)
- };
- match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
- Ok(()) => {}, Err(e) => fail!("{:?}", e)
- }
+ port.recv();
+ let mut w = match UdpWatcher::bind(local_loop(), client) {
+ Ok(w) => w, Err(e) => fail!("{:?}", e)
+ };
+ match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
+ Ok(()) => {}, Err(e) => fail!("{:?}", e)
}
}
#[test]
fn test_read_read_read() {
- do run_uv_loop |l| {
- let addr = next_test_ip4();
- static MAX: uint = 500000;
- let (port, chan) = oneshot();
- let port = Cell::new(port);
- let chan = Cell::new(chan);
-
- let handle = l.handle;
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let listener = TcpListener::bind(l, addr).unwrap();
- let mut acceptor = listener.listen().unwrap();
- chan.take().send(());
- let mut stream = acceptor.accept().unwrap();
- let buf = [1, .. 2048];
- let mut total_bytes_written = 0;
- while total_bytes_written < MAX {
- stream.write(buf);
- total_bytes_written += buf.len();
- }
+ use std::rt::rtio::*;
+ let addr = next_test_ip4();
+ static MAX: uint = 5000;
+ let (port, chan) = oneshot();
+ let port = Cell::new(port);
+ let chan = Cell::new(chan);
+
+ do spawn {
+ let listener = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut acceptor = listener.listen().unwrap();
+ chan.take().send(());
+ let mut stream = acceptor.accept().unwrap();
+ let buf = [1, .. 2048];
+ let mut total_bytes_written = 0;
+ while total_bytes_written < MAX {
+ assert!(stream.write(buf).is_ok());
+ uvdebug!("wrote bytes");
+ total_bytes_written += buf.len();
}
+ }
- do spawntask {
- let l = &mut Loop::wrap(handle);
- port.take().recv();
- let mut stream = TcpWatcher::connect(l, addr).unwrap();
- let mut buf = [0, .. 2048];
- let mut total_bytes_read = 0;
- while total_bytes_read < MAX {
- let nread = stream.read(buf).unwrap();
- uvdebug!("read {} bytes", nread);
- total_bytes_read += nread;
- for i in range(0u, nread) {
- assert_eq!(buf[i], 1);
- }
+ do spawn {
+ port.take().recv();
+ let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+ let mut buf = [0, .. 2048];
+ let mut total_bytes_read = 0;
+ while total_bytes_read < MAX {
+ let nread = stream.read(buf).unwrap();
+ total_bytes_read += nread;
+ for i in range(0u, nread) {
+ assert_eq!(buf[i], 1);
}
- uvdebug!("read {} bytes total", total_bytes_read);
}
+ uvdebug!("read {} bytes total", total_bytes_read);
}
}
#[test]
- #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
fn test_udp_twice() {
- do run_uv_loop |l| {
- let server_addr = next_test_ip4();
- let client_addr = next_test_ip4();
- let (port, chan) = oneshot();
- let port = Cell::new(port);
- let chan = Cell::new(chan);
-
- let handle = l.handle;
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let mut client = UdpWatcher::bind(l, client_addr).unwrap();
- port.take().recv();
- assert!(client.sendto([1], server_addr).is_ok());
- assert!(client.sendto([2], server_addr).is_ok());
- }
+ let server_addr = next_test_ip4();
+ let client_addr = next_test_ip4();
+ let (port, chan) = oneshot();
+ let port = Cell::new(port);
+ let chan = Cell::new(chan);
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let mut server = UdpWatcher::bind(l, server_addr).unwrap();
- chan.take().send(());
- let mut buf1 = [0];
- let mut buf2 = [0];
- let (nread1, src1) = server.recvfrom(buf1).unwrap();
- let (nread2, src2) = server.recvfrom(buf2).unwrap();
- assert_eq!(nread1, 1);
- assert_eq!(nread2, 1);
- assert_eq!(src1, client_addr);
- assert_eq!(src2, client_addr);
- assert_eq!(buf1[0], 1);
- assert_eq!(buf2[0], 2);
- }
+ do spawn {
+ let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
+ port.take().recv();
+ assert!(client.sendto([1], server_addr).is_ok());
+ assert!(client.sendto([2], server_addr).is_ok());
}
+
+ let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
+ chan.take().send(());
+ let mut buf1 = [0];
+ let mut buf2 = [0];
+ let (nread1, src1) = server.recvfrom(buf1).unwrap();
+ let (nread2, src2) = server.recvfrom(buf2).unwrap();
+ assert_eq!(nread1, 1);
+ assert_eq!(nread2, 1);
+ assert_eq!(src1, client_addr);
+ assert_eq!(src2, client_addr);
+ assert_eq!(buf1[0], 1);
+ assert_eq!(buf2[0], 2);
}
#[test]
fn test_udp_many_read() {
- do run_uv_loop |l| {
- let server_out_addr = next_test_ip4();
- let server_in_addr = next_test_ip4();
- let client_out_addr = next_test_ip4();
- let client_in_addr = next_test_ip4();
- static MAX: uint = 500_000;
-
- let (p1, c1) = oneshot();
- let (p2, c2) = oneshot();
-
- let first = Cell::new((p1, c2));
- let second = Cell::new((p2, c1));
-
- let handle = l.handle;
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
- let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
- let (port, chan) = first.take();
- chan.send(());
- port.recv();
- let msg = [1, .. 2048];
- let mut total_bytes_sent = 0;
- let mut buf = [1];
- while buf[0] == 1 {
- // send more data
- assert!(server_out.sendto(msg, client_in_addr).is_ok());
- total_bytes_sent += msg.len();
- // check if the client has received enough
- let res = server_in.recvfrom(buf);
- assert!(res.is_ok());
- let (nread, src) = res.unwrap();
- assert_eq!(nread, 1);
- assert_eq!(src, client_out_addr);
- }
- assert!(total_bytes_sent >= MAX);
+ let server_out_addr = next_test_ip4();
+ let server_in_addr = next_test_ip4();
+ let client_out_addr = next_test_ip4();
+ let client_in_addr = next_test_ip4();
+ static MAX: uint = 500_000;
+
+ let (p1, c1) = oneshot();
+ let (p2, c2) = oneshot();
+
+ let first = Cell::new((p1, c2));
+ let second = Cell::new((p2, c1));
+
+ do spawn {
+ let l = local_loop();
+ let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
+ let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
+ let (port, chan) = first.take();
+ chan.send(());
+ port.recv();
+ let msg = [1, .. 2048];
+ let mut total_bytes_sent = 0;
+ let mut buf = [1];
+ while buf[0] == 1 {
+ // send more data
+ assert!(server_out.sendto(msg, client_in_addr).is_ok());
+ total_bytes_sent += msg.len();
+ // check if the client has received enough
+ let res = server_in.recvfrom(buf);
+ assert!(res.is_ok());
+ let (nread, src) = res.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(src, client_out_addr);
}
+ assert!(total_bytes_sent >= MAX);
+ }
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
- let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
- let (port, chan) = second.take();
- port.recv();
- chan.send(());
- let mut total_bytes_recv = 0;
- let mut buf = [0, .. 2048];
- while total_bytes_recv < MAX {
- // ask for more
- assert!(client_out.sendto([1], server_in_addr).is_ok());
- // wait for data
- let res = client_in.recvfrom(buf);
- assert!(res.is_ok());
- let (nread, src) = res.unwrap();
- assert_eq!(src, server_out_addr);
- total_bytes_recv += nread;
- for i in range(0u, nread) {
- assert_eq!(buf[i], 1);
- }
+ do spawn {
+ let l = local_loop();
+ let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
+ let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
+ let (port, chan) = second.take();
+ port.recv();
+ chan.send(());
+ let mut total_bytes_recv = 0;
+ let mut buf = [0, .. 2048];
+ while total_bytes_recv < MAX {
+ // ask for more
+ assert!(client_out.sendto([1], server_in_addr).is_ok());
+ // wait for data
+ let res = client_in.recvfrom(buf);
+ assert!(res.is_ok());
+ let (nread, src) = res.unwrap();
+ assert_eq!(src, server_out_addr);
+ total_bytes_recv += nread;
+ for i in range(0u, nread) {
+ assert_eq!(buf[i], 1);
}
- // tell the server we're done
- assert!(client_out.sendto([0], server_in_addr).is_ok());
}
+ // tell the server we're done
+ assert!(client_out.sendto([0], server_in_addr).is_ok());
}
}
#[test]
fn test_read_and_block() {
- do run_uv_loop |l| {
- let addr = next_test_ip4();
- let (port, chan) = oneshot();
- let port = Cell::new(port);
- let chan = Cell::new(chan);
-
- let handle = l.handle;
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let listener = TcpListener::bind(l, addr).unwrap();
- let mut acceptor = listener.listen().unwrap();
- let (port2, chan2) = stream();
- chan.take().send(port2);
- let mut stream = acceptor.accept().unwrap();
- let mut buf = [0, .. 2048];
-
- let expected = 32;
- let mut current = 0;
- let mut reads = 0;
-
- while current < expected {
- let nread = stream.read(buf).unwrap();
- for i in range(0u, nread) {
- let val = buf[i] as uint;
- assert_eq!(val, current % 8);
- current += 1;
- }
- reads += 1;
+ let addr = next_test_ip4();
+ let (port, chan) = oneshot();
+ let port = Cell::new(port);
+ let chan = Cell::new(chan);
+
+ do spawn {
+ let listener = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut acceptor = listener.listen().unwrap();
+ let (port2, chan2) = stream();
+ chan.take().send(port2);
+ let mut stream = acceptor.accept().unwrap();
+ let mut buf = [0, .. 2048];
+
+ let expected = 32;
+ let mut current = 0;
+ let mut reads = 0;
- chan2.send(());
+ while current < expected {
+ let nread = stream.read(buf).unwrap();
+ for i in range(0u, nread) {
+ let val = buf[i] as uint;
+ assert_eq!(val, current % 8);
+ current += 1;
}
+ reads += 1;
- // Make sure we had multiple reads
- assert!(reads > 1);
+ chan2.send(());
}
- do spawntask {
- let l = &mut Loop::wrap(handle);
- let port2 = port.take().recv();
- let mut stream = TcpWatcher::connect(l, addr).unwrap();
- stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
- stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
- port2.recv();
- stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
- stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
- port2.recv();
- }
+ // Make sure we had multiple reads
+ assert!(reads > 1);
+ }
+
+ do spawn {
+ let port2 = port.take().recv();
+ let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ port2.recv();
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+ port2.recv();
}
}
let addr = next_test_ip4();
do task::spawn_sched(task::SingleThreaded) {
- do run_uv_loop |l| {
- let listener = TcpListener::bind(l, addr).unwrap();
- let mut acceptor = listener.listen().unwrap();
- let mut stream = acceptor.accept().unwrap();
- let mut buf = [0, .. 2048];
- let nread = stream.read(buf).unwrap();
- assert_eq!(nread, 8);
- for i in range(0u, nread) {
- assert_eq!(buf[i], i as u8);
- }
+ let listener = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut acceptor = listener.listen().unwrap();
+ let mut stream = acceptor.accept().unwrap();
+ let mut buf = [0, .. 2048];
+ let nread = stream.read(buf).unwrap();
+ assert_eq!(nread, 8);
+ for i in range(0u, nread) {
+ assert_eq!(buf[i], i as u8);
}
}
do task::spawn_sched(task::SingleThreaded) {
- do run_uv_loop |l| {
- let mut stream = TcpWatcher::connect(l, addr);
- while stream.is_err() {
- stream = TcpWatcher::connect(l, addr);
- }
- stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+ let mut stream = TcpWatcher::connect(local_loop(), addr);
+ while stream.is_err() {
+ stream = TcpWatcher::connect(local_loop(), addr);
}
+ stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
do task::spawn_sched(task::SingleThreaded) {
let chan = Cell::new(chan.take());
- do run_uv_loop |l| {
- let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
- chan.take().send(listener);
- }
+ let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
+ chan.take().send(listener);
}
do task::spawn_sched(task::SingleThreaded) {
let port = Cell::new(port.take());
- do run_uv_loop |_l| {
- port.take().recv();
- }
+ port.take().recv();
}
}
}
}
+ #[should_fail]
+ #[test]
+ #[ignore(reason = "linked failure")]
+ fn linked_failure1() {
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let addr = next_test_ip4();
+
+ do spawn {
+ let w = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut w = w.listen().unwrap();
+ chan.take().send(());
+ w.accept();
+ }
+
+ port.recv();
+ fail!();
+ }
+
+ #[should_fail]
+ #[test]
+ #[ignore(reason = "linked failure")]
+ fn linked_failure2() {
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
+ let addr = next_test_ip4();
+
+ do spawn {
+ let w = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut w = w.listen().unwrap();
+ chan.take().send(());
+ let mut buf = [0];
+ w.accept().unwrap().read(buf);
+ }
+
+ port.recv();
+ let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+
+ fail!();
+ }
+
+ #[should_fail]
+ #[test]
+ #[ignore(reason = "linked failure")]
+ fn linked_failure3() {
+ let (port, chan) = stream();
+ let chan = Cell::new(chan);
+ let addr = next_test_ip4();
+
+ do spawn {
+ let chan = chan.take();
+ let w = TcpListener::bind(local_loop(), addr).unwrap();
+ let mut w = w.listen().unwrap();
+ chan.send(());
+ let mut conn = w.accept().unwrap();
+ chan.send(());
+ let buf = [0, ..65536];
+ conn.write(buf);
+ }
+
+ port.recv();
+ let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
+ port.recv();
+ fail!();
+ }
}