use std::cast;
use std::io::IoError;
use std::io::net::ip;
-use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
-use std::libc;
+use libc::{size_t, ssize_t, c_int, c_void, c_uint};
+use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
/// Generic functions related to dealing with sockaddr things
////////////////////////////////////////////////////////////////////////////////
// Byte-order helpers: convert a u16 port between host order and network
// (big-endian) order. The diff migrates from the old API that required an
// i16 round-trip cast to the u16-native mem::to_be16/from_be16.
// NOTE(review): assumes mem::to_be16/from_be16 now take and return u16
// directly — confirm against the std::mem API of the target toolchain.
-pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
-pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
+pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
+pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> ip::SocketAddr {
// Fields of the TCP stream watcher (the struct header lies outside this hunk).
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
home: HomeHandle,
// Diff removes the obsolete `priv` keyword — struct fields are private by
// default in the newer language revision, so the qualifier is dropped.
- priv refcount: Refcount,
+ refcount: Refcount,
// libuv can't support concurrent reads and concurrent writes of the same
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
- priv read_access: Access,
- priv write_access: Access,
+ read_access: Access,
+ write_access: Access,
}
// Listener state: accepted connections produced by the libuv listen callback
// are pushed on `outgoing` and consumed by the acceptor via `incoming`.
pub struct TcpListener {
home: HomeHandle,
// NOTE(review): typed as uv_pipe_t in this hunk even though this is a TCP
// listener — confirm against the full file.
handle: *uvll::uv_pipe_t,
// Diff drops `priv` and migrates the old Chan/Port channel halves to the
// Sender/Receiver pair, adding the explicit `:Send` bound on the boxed
// stream trait object carried through the channel.
- priv closing_task: Option<BlockedTask>,
- priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
- priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
+ closing_task: Option<BlockedTask>,
+ outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+ incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
}
pub struct TcpAcceptor {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &io.loop_, || {
req.set_data(&cx);
});
match cx.status {
})
}
// Clones the TCP stream trait object: the clone shares the same uv handle,
// bumps the shared refcount, and shares the read/write access guards. The
// diff only adds the explicit `:Send` bound on the returned trait object.
- fn clone(&self) -> ~rtio::RtioTcpStream {
+ fn clone(&self) -> ~rtio::RtioTcpStream:Send {
~TcpWatcher {
handle: self.handle,
stream: StreamWatcher::new(self.handle),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
- } as ~rtio::RtioTcpStream
+ } as ~rtio::RtioTcpStream:Send
+ }
+
// New method added by this diff: shuts down the write half of the stream via
// uv_shutdown, parking the current task until the libuv callback reports a
// status, which is then converted to an io::Result.
+ fn close_write(&mut self) -> Result<(), IoError> {
// Per-request context shared with the C callback: the blocked-task slot to
// wake and the status code reported by libuv.
+ struct Ctx {
+ slot: Option<BlockedTask>,
+ status: c_int,
+ }
+ let mut req = Request::new(uvll::UV_SHUTDOWN);
+
+ return match unsafe {
+ uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
+ } {
+ 0 => {
+ req.defuse(); // uv callback now owns this request
+ let mut cx = Ctx { slot: None, status: 0 };
+
// Park until shutdown_cb wakes us; set_data runs inside the closure so the
// Ctx pointer is installed before the task blocks.
+ wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
+ req.set_data(&cx);
+ });
+
+ status_to_io_result(cx.status)
+ }
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
+
// C callback invoked by libuv when the shutdown completes: records the
// status into the Ctx and wakes the task parked in the slot.
+ extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
+ let req = Request::wrap(req);
+ assert!(status != uvll::ECANCELED);
+ let cx: &mut Ctx = unsafe { req.get_data() };
+ cx.status = status;
+ wakeup(&mut cx.slot);
+ }
}
}
assert_eq!(unsafe {
uvll::uv_tcp_init(io.uv_loop(), handle)
}, 0);
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let l = ~TcpListener {
home: io.make_handle(),
handle: handle,
closing_task: None,
- outgoing: chan,
- incoming: port,
+ outgoing: tx,
+ incoming: rx,
};
let (addr, _len) = addr_to_sockaddr(address);
let res = unsafe {
}
impl rtio::RtioTcpListener for TcpListener {
// Consumes the listener, starts libuv listening with a fixed backlog of 128,
// and returns the acceptor as a boxed trait object. The diff adds the
// explicit `:Send` bound to both the return type and the cast.
- fn listen(~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
+ fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor { listener: self };
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
- 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
+ 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
n => Err(uv_error_to_io_error(UvError(n))),
}
}
});
let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
- Ok(~client as ~rtio::RtioTcpStream)
+ Ok(~client as ~rtio::RtioTcpStream:Send)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
// Blocks on the listener's incoming channel for the next connection result
// produced by the libuv listen callback (diff adds `:Send` to the boxed
// stream in the return type).
- fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
+ fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
self.listener.incoming.recv()
}
// Fields of the UDP watcher (struct header lies outside this hunk).
home: HomeHandle,
// See above for what these fields are
// Diff drops the obsolete `priv` qualifier — fields are private by default.
- priv refcount: Refcount,
- priv read_access: Access,
- priv write_access: Access,
+ refcount: Refcount,
+ read_access: Access,
+ write_access: Access,
}
impl UdpWatcher {
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
+ let loop_ = self.uv_loop();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
result: None,
};
let handle = self.handle;
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &loop_, || {
unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
});
match cx.result.take_unwrap() {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let m = self.fire_homing_missile();
+ let loop_ = self.uv_loop();
let _g = self.write_access.grant(m);
let mut req = Request::new(uvll::UV_UDP_SEND);
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &loop_, || {
req.set_data(&cx);
});
match cx.result {
})
}
// Clones the UDP socket trait object: shares the uv handle and home, bumps
// the refcount, and shares the access guards. Diff adds the `:Send` bound.
- fn clone(&self) -> ~rtio::RtioUdpSocket {
+ fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
~UdpWatcher {
handle: self.handle,
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
- } as ~rtio::RtioUdpSocket
+ } as ~rtio::RtioUdpSocket:Send
}
}
// Test: connecting to a fresh test port with no listener must fail with
// ECONNREFUSED. The diff migrates the old owned-string literal `~"..."`
// syntax to `"...".to_owned()`.
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4()) {
Ok(..) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+ Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
// Same check as above over IPv6.
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6()) {
Ok(..) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+ Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
#[test]
fn listen_ip4() {
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let addr = next_test_ip4();
spawn(proc() {
let mut w = match w.listen() {
Ok(w) => w, Err(e) => fail!("{:?}", e),
};
- chan.send(());
+ tx.send(());
match w.accept() {
Ok(mut stream) => {
let mut buf = [0u8, ..10];
Ok(10) => {} e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
}
});
- port.recv();
+ rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
#[test]
fn listen_ip6() {
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let addr = next_test_ip6();
spawn(proc() {
let mut w = match w.listen() {
Ok(w) => w, Err(e) => fail!("{:?}", e),
};
- chan.send(());
+ tx.send(());
match w.accept() {
Ok(mut stream) => {
let mut buf = [0u8, ..10];
Ok(10) => {} e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
}
});
- port.recv();
+ rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
#[test]
fn udp_recv_ip4() {
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let client = next_test_ip4();
let server = next_test_ip4();
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
Ok(mut w) => {
- chan.send(());
+ tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
}
});
- port.recv();
+ rx.recv();
let mut w = match UdpWatcher::bind(local_loop(), client) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
#[test]
fn udp_recv_ip6() {
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let client = next_test_ip6();
let server = next_test_ip6();
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
Ok(mut w) => {
- chan.send(());
+ tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
}
});
- port.recv();
+ rx.recv();
let mut w = match UdpWatcher::bind(local_loop(), client) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
fn test_read_read_read() {
let addr = next_test_ip4();
static MAX: uint = 5000;
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
spawn(proc() {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
- chan.send(());
+ tx.send(());
let mut stream = acceptor.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
}
});
- port.recv();
+ rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
spawn(proc() {
let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
- port.recv();
+ rx.recv();
assert!(client.sendto([1], server_addr).is_ok());
assert!(client.sendto([2], server_addr).is_ok());
});
let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
- chan.send(());
+ tx.send(());
let mut buf1 = [0];
let mut buf2 = [0];
let (nread1, src1) = server.recvfrom(buf1).unwrap();
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
- let (p1, c1) = Chan::new();
- let (p2, c2) = Chan::new();
+ let (tx1, rx1) = channel::<()>();
+ let (tx2, rx2) = channel::<()>();
spawn(proc() {
let l = local_loop();
let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
- let (port, chan) = (p1, c2);
- chan.send(());
- port.recv();
+ let (tx, rx) = (tx2, rx1);
+ tx.send(());
+ rx.recv();
let msg = [1, .. 2048];
let mut total_bytes_sent = 0;
let mut buf = [1];
let l = local_loop();
let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
- let (port, chan) = (p2, c1);
- port.recv();
- chan.send(());
+ let (tx, rx) = (tx1, rx2);
+ rx.recv();
+ tx.send(());
let mut total_bytes_recv = 0;
let mut buf = [0, .. 2048];
while total_bytes_recv < MAX {
#[test]
fn test_read_and_block() {
let addr = next_test_ip4();
- let (port, chan) = Chan::<Port<()>>::new();
+ let (tx, rx) = channel::<Receiver<()>>();
spawn(proc() {
- let port2 = port.recv();
+ let rx = rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
- port2.recv();
+ rx.recv();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
- port2.recv();
+ rx.recv();
});
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
- let (port2, chan2) = Chan::new();
- chan.send(port2);
+ let (tx2, rx2) = channel();
+ tx.send(rx2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
}
reads += 1;
- chan2.try_send(());
+ let _ = tx2.send_opt(());
}
// Make sure we had multiple reads
#[should_fail] #[test]
// Test: deliberately fail!() while a connected TCP stream is alive; the
// #[should_fail] harness verifies the failure path still cleans up the
// stream. Diff migrates the Chan/Port pair to channel()'s Sender/Receiver.
fn tcp_stream_fail_cleanup() {
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
let addr = next_test_ip4();
spawn(proc() {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
// Signal the parent that the listener is ready before blocking in accept.
- chan.send(());
+ tx.send(());
drop(w.accept().unwrap());
});
- port.recv();
+ rx.recv();
let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
fail!();
}
#[should_fail] #[test]
// Test: a UDP watcher bound on one task is moved across the channel and then
// dropped during a failure in this task, exercising the homing teardown path.
fn udp_fail_other_task() {
let addr = next_test_ip4();
- let (port, chan) = Chan::new();
+ let (tx, rx) = channel();
// force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this
// scheduler.
spawn(proc() {
let w = UdpWatcher::bind(local_loop(), addr).unwrap();
- chan.send(w);
+ tx.send(w);
});
- let _w = port.recv();
+ let _w = rx.recv();
fail!();
}
}