use std::cast;
use std::io::IoError;
use std::io::net::ip;
-use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
-use std::libc;
+use libc::{size_t, ssize_t, c_int, c_void, c_uint};
+use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
////////////////////////////////////////////////////////////////////////////////
/// Generic functions related to dealing with sockaddr things
////////////////////////////////////////////////////////////////////////////////
-pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
-pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
+pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
+pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
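+// Network byte order is big-endian, so these are thin shims over the mem
+// byte-order helpers: on a little-endian host htons(0x1234) == 0x3412,
+// while on a big-endian host both functions are the identity.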
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> ip::SocketAddr {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
home: HomeHandle,
- priv refcount: Refcount,
+ refcount: Refcount,
// libuv can't support concurrent reads and concurrent writes of the same
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously; see
// the sketch after this struct.
- priv read_access: Access,
- priv write_access: Access,
+ read_access: Access,
+ write_access: Access,
}
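+ // A sketch of the guard discipline described above (the grant() call
+ // shape is an assumption about rustuv's Access type, not its exact API):
+ // each I/O path takes the matching guard first, so a second concurrent
+ // read blocks while a concurrent write proceeds untouched:
+ //
+ //     let m = self.fire_homing_missile();
+ //     let _guard = self.read_access.grant(m); // hypothetical signature
+ //     // ... start the uv read and block until its callback fires ...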
pub struct TcpListener {
home: HomeHandle,
handle: *uvll::uv_pipe_t,
- priv closing_task: Option<BlockedTask>,
- priv outgoing: Sender<Result<~rtio::RtioTcpStream, IoError>>,
- priv incoming: Receiver<Result<~rtio::RtioTcpStream, IoError>>,
+ closing_task: Option<BlockedTask>,
+ outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
+ incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
}
pub struct TcpAcceptor {
})
}
- fn clone(&self) -> ~rtio::RtioTcpStream {
+ fn clone(&self) -> ~rtio::RtioTcpStream:Send {
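+ // The clone shares the same underlying uv handle; the shared Refcount
+ // ensures only the last clone to be dropped actually closes it.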
~TcpWatcher {
handle: self.handle,
stream: StreamWatcher::new(self.handle),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
- } as ~rtio::RtioTcpStream
+ } as ~rtio::RtioTcpStream:Send
+ }
+
+ fn close_write(&mut self) -> Result<(), IoError> {
+ struct Ctx {
+ slot: Option<BlockedTask>,
+ status: c_int,
+ }
+ let _m = self.fire_homing_missile();
+ let mut req = Request::new(uvll::UV_SHUTDOWN);
+
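+ // uv_shutdown waits for any pending writes on the stream to drain, then
+ // closes the write half and invokes shutdown_cb on the event loop.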
+ return match unsafe {
+ uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
+ } {
+ 0 => {
+ req.defuse(); // uv callback now owns this request
+ let mut cx = Ctx { slot: None, status: 0 };
+
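+ // Deschedule this task until shutdown_cb wakes it; the closure runs
+ // before we block, so the callback is guaranteed to see our Ctx.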
+ wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
+ req.set_data(&cx);
+ });
+
+ status_to_io_result(cx.status)
+ }
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
+
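+ // Invoked on the event loop once the shutdown completes: reclaim the
+ // request, record the status, and wake the task blocked above.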
+ extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
+ let req = Request::wrap(req);
+ assert!(status != uvll::ECANCELED);
+ let cx: &mut Ctx = unsafe { req.get_data() };
+ cx.status = status;
+ wakeup(&mut cx.slot);
+ }
}
}
}
impl rtio::RtioTcpListener for TcpListener {
- fn listen(~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
+ fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor { listener: self };
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
- 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
+ 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
n => Err(uv_error_to_io_error(UvError(n))),
}
}
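+ // A caller sketch (hedged: assumes a `~TcpListener` from this module's
+ // bind path and the rtio traits of this era):
+ //
+ //     let mut acceptor = listener.listen().unwrap();
+ //     loop {
+ //         match acceptor.accept() {
+ //             Ok(stream) => { /* spawn a task to drive the stream */ }
+ //             Err(e) => fail!("accept failed: {:?}", e),
+ //         }
+ //     }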
});
let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
- Ok(~client as ~rtio::RtioTcpStream)
+ Ok(~client as ~rtio::RtioTcpStream:Send)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
- fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
+ fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
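+ // listen_cb accepts connections on the event loop and sends each result
+ // through `outgoing`, so accepting here is just a blocking channel recv.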
self.listener.incoming.recv()
}
home: HomeHandle,
// See above for what these fields are
- priv refcount: Refcount,
- priv read_access: Access,
- priv write_access: Access,
+ refcount: Refcount,
+ read_access: Access,
+ write_access: Access,
}
impl UdpWatcher {
})
}
- fn clone(&self) -> ~rtio::RtioUdpSocket {
+ fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
~UdpWatcher {
handle: self.handle,
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
- } as ~rtio::RtioUdpSocket
+ } as ~rtio::RtioUdpSocket:Send
}
}
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4()) {
Ok(..) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+ Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6()) {
Ok(..) => fail!(),
- Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
+ Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
Ok(10) => {} e => fail!("{:?}", e),
}
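+ // The loop counter is u8, but vector indexing takes uint, hence the cast.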
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
Ok(10) => {} e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
- assert_eq!(buf[i], i + 1);
+ assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
}
reads += 1;
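+ // send_opt returns a Result instead of failing when the receiver has
+ // hung up, which keeps this test from failing during teardown.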
- tx2.try_send(());
+ let _ = tx2.send_opt(());
}
// Make sure we had multiple reads