use std::rt::sched::Scheduler;
use net;
-use super::{Loop, UvError, NativeHandle};
-use uvll::UV_GETADDRINFO;
+use super::{Loop, UvError, NativeHandle, Request};
use uvll;
-struct GetAddrInfoRequest {
- handle: *uvll::uv_getaddrinfo_t,
-}
-
struct Addrinfo {
handle: *uvll::addrinfo,
}
addrinfo: Option<Addrinfo>,
}
-impl GetAddrInfoRequest {
- pub fn new() -> GetAddrInfoRequest {
- GetAddrInfoRequest {
- handle: unsafe { uvll::malloc_req(uvll::UV_GETADDRINFO) },
- }
- }
+pub struct GetAddrInfoRequest;
+impl GetAddrInfoRequest {
pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
hints: Option<ai::Hint>) -> Result<~[ai::Info], UvError> {
assert!(node.is_some() || service.is_some());
}
});
let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
- let req = GetAddrInfoRequest::new();
+ let req = Request::new(uvll::UV_GETADDRINFO);
return match unsafe {
uvll::uv_getaddrinfo(loop_.native_handle(), req.handle,
} {
0 => {
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
- unsafe { uvll::set_data_for_req(req.handle, &cx) }
+ req.set_data(&cx);
+ req.defuse();
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
cx.slot = Some(task);
extern fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t,
status: c_int,
res: *uvll::addrinfo) {
- let cx: &mut Ctx = unsafe {
- cast::transmute(uvll::get_data_for_req(req))
- };
+ let req = Request::wrap(req);
+ if status == uvll::ECANCELED { return }
+ let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
cx.status = status;
cx.addrinfo = Some(Addrinfo { handle: res });
}
}
-impl Drop for GetAddrInfoRequest {
- fn drop(&mut self) {
- unsafe { uvll::free_req(self.handle) }
- }
-}
-
impl Drop for Addrinfo {
fn drop(&mut self) {
unsafe { uvll::uv_freeaddrinfo(self.handle) }
use std::cast::transmute;
use std::ptr::null;
use std::unstable::finally::Finally;
-use std::rt::io::net::ip::SocketAddr;
use std::rt::io::IoError;
//#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::{FsRequest, FileWatcher};
-pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
+pub use self::net::{TcpWatcher, TcpListener, TcpAcceptor, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
pub use self::process::Process;
-pub use self::pipe::PipeWatcher;
+pub use self::pipe::{PipeWatcher, PipeListener, PipeAcceptor};
pub use self::signal::SignalWatcher;
pub use self::tty::TtyWatcher;
priv handle: *uvll::uv_loop_t
}
-pub struct Handle(*uvll::uv_handle_t);
-
-impl Watcher for Handle {}
-impl NativeHandle<*uvll::uv_handle_t> for Handle {
- fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) }
- fn native_handle(&self) -> *uvll::uv_handle_t { **self }
-}
-
-/// The trait implemented by uv 'watchers' (handles). Watchers are
-/// non-owning wrappers around the uv handles and are not completely
-/// safe - there may be multiple instances for a single underlying
-/// handle. Watchers are generally created, then `start`ed, `stop`ed
-/// and `close`ed, but due to their complex life cycle may not be
-/// entirely memory safe if used in unanticipated patterns.
-pub trait Watcher { }
-
-pub trait Request { }
-
/// A type that wraps a native handle
pub trait NativeHandle<T> {
fn from_native_handle(T) -> Self;
}
}
-pub trait UvRequest<T> {
- fn uv_request(&self) -> *T;
+pub struct Request {
+ handle: *uvll::uv_req_t,
+}
- // FIXME(#8888) dummy self
- fn alloc(_: Option<Self>, ty: uvll::uv_req_type) -> *T {
- unsafe {
- let handle = uvll::malloc_req(ty);
- assert!(!handle.is_null());
- handle as *T
- }
+impl Request {
+ pub fn new(ty: uvll::uv_req_type) -> Request {
+ Request::wrap(unsafe { uvll::malloc_req(ty) })
}
- unsafe fn from_uv_request<'a>(h: &'a *T) -> &'a mut Self {
- cast::transmute(uvll::get_data_for_req(*h))
+ pub fn wrap(handle: *uvll::uv_req_t) -> Request {
+ Request { handle: handle }
}
- fn install(~self) -> ~Self {
- unsafe {
- let myptr = cast::transmute::<&~Self, &*u8>(&self);
- uvll::set_data_for_req(self.uv_request(), *myptr);
- }
- self
+ pub fn set_data<T>(&self, t: *T) {
+ unsafe { uvll::set_data_for_req(self.handle, t) }
}
- fn delete(&mut self) {
- unsafe { uvll::free_req(self.uv_request() as *c_void) }
+ pub fn get_data(&self) -> *c_void {
+ unsafe { uvll::get_data_for_req(self.handle) }
+ }
+
+ // This function should be used when the request handle has been given to an
+ // underlying uv function, and the uv function has succeeded. This means
+ // that uv will at some point invoke the callback, and in the meantime we
+ // can't deallocate the handle because libuv could be using it.
+ //
+ // This is still a problem in blocking situations due to linked failure. In
+ // the connection callback the handle should be re-wrapped with the `wrap`
+ // function to ensure its destruction.
+ pub fn defuse(mut self) {
+ self.handle = ptr::null();
+ }
+}
+
+impl Drop for Request {
+ fn drop(&mut self) {
+ unsafe {
+ if self.handle != ptr::null() {
+ uvll::free_req(self.handle)
+ }
+ }
}
}
}
}
-// XXX: The uv alloc callback also has a *uv_handle_t arg
-pub type AllocCallback = ~fn(uint) -> Buf;
-pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
-pub type NullCallback = ~fn();
-pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
-pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
-pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
-
-
-/// Callbacks used by StreamWatchers, set as custom data on the foreign handle.
-/// XXX: Would be better not to have all watchers allocate room for all callback types.
-struct WatcherData {
- read_cb: Option<ReadCallback>,
- write_cb: Option<ConnectionCallback>,
- connect_cb: Option<ConnectionCallback>,
- close_cb: Option<NullCallback>,
- alloc_cb: Option<AllocCallback>,
- udp_recv_cb: Option<UdpReceiveCallback>,
- udp_send_cb: Option<UdpSendCallback>,
-}
-
-pub trait WatcherInterop {
- fn event_loop(&self) -> Loop;
- fn install_watcher_data(&mut self);
- fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
- fn drop_watcher_data(&mut self);
- fn close(self, cb: NullCallback);
- fn close_async(self);
-}
-
-impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
- /// Get the uv event loop from a Watcher
- fn event_loop(&self) -> Loop {
- unsafe {
- let handle = self.native_handle();
- let loop_ = uvll::get_loop_for_uv_handle(handle);
- NativeHandle::from_native_handle(loop_)
- }
- }
-
- fn install_watcher_data(&mut self) {
- unsafe {
- let data = ~WatcherData {
- read_cb: None,
- write_cb: None,
- connect_cb: None,
- close_cb: None,
- alloc_cb: None,
- udp_recv_cb: None,
- udp_send_cb: None,
- };
- let data = transmute::<~WatcherData, *c_void>(data);
- uvll::set_data_for_uv_handle(self.native_handle(), data);
- }
- }
-
- fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
- unsafe {
- let data = uvll::get_data_for_uv_handle(self.native_handle());
- let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
- return &mut **data;
- }
- }
-
- fn drop_watcher_data(&mut self) {
- unsafe {
- let data = uvll::get_data_for_uv_handle(self.native_handle());
- let _data = transmute::<*c_void, ~WatcherData>(data);
- uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
- }
- }
-
- fn close(mut self, cb: NullCallback) {
- {
- let data = self.get_watcher_data();
- assert!(data.close_cb.is_none());
- data.close_cb = Some(cb);
- }
-
- unsafe {
- uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
- }
-
- extern fn close_cb(handle: *uvll::uv_handle_t) {
- let mut h: Handle = NativeHandle::from_native_handle(handle);
- h.get_watcher_data().close_cb.take_unwrap()();
- h.drop_watcher_data();
- unsafe { uvll::free_handle(handle as *c_void) }
- }
- }
-
- fn close_async(self) {
- unsafe {
- uvll::uv_close(self.native_handle() as *uvll::uv_handle_t, close_cb);
- }
-
- extern fn close_cb(handle: *uvll::uv_handle_t) {
- let mut h: Handle = NativeHandle::from_native_handle(handle);
- h.drop_watcher_data();
- unsafe { uvll::free_handle(handle as *c_void) }
- }
- }
-}
-
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use std::cast;
use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
-use std::vec;
+use std::ptr;
+use std::rt::BlockedTask;
+use std::rt::io::IoError;
+use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr};
+use std::rt::local::Local;
+use std::rt::io::net::ip::{SocketAddr, IpAddr};
+use std::rt::rtio;
+use std::rt::sched::{Scheduler, SchedHandle};
+use std::rt::tube::Tube;
use std::str;
-use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use std::vec;
use uvll;
use uvll::*;
-use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
- UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
- status_to_maybe_uv_error, empty_buf};
+use super::{
+ Loop, Request, UvError, Buf, NativeHandle,
+ status_to_io_result,
+ uv_error_to_io_error, UvHandle, slice_to_uv_buf};
+use uvio::HomingIO;
+use stream::StreamWatcher;
-pub struct UvAddrInfo(*uvll::addrinfo);
+////////////////////////////////////////////////////////////////////////////////
+/// Generic functions related to dealing with sockaddr things
+////////////////////////////////////////////////////////////////////////////////
pub enum UvSocketAddr {
UvIpv4SocketAddr(*sockaddr_in),
assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
}
-// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
-// and uv_file_t
-pub struct StreamWatcher(*uvll::uv_stream_t);
-impl Watcher for StreamWatcher { }
+enum SocketNameKind {
+ TcpPeer,
+ Tcp,
+ Udp
+}
-impl StreamWatcher {
- pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
- unsafe {
- match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) {
- 0 => {
- let data = self.get_watcher_data();
- data.alloc_cb = Some(alloc);
- data.read_cb = Some(cb);
- }
- n => {
- cb(*self, 0, empty_buf(), Some(UvError(n)))
- }
- }
- }
+fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
+ let getsockname = match sk {
+ TcpPeer => uvll::tcp_getpeername,
+ Tcp => uvll::tcp_getsockname,
+ Udp => uvll::udp_getsockname,
+ };
- extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
- let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
- let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
- return (*alloc_cb)(suggested_size as uint);
- }
+ // Allocate a sockaddr_storage
+ // since we don't know if it's ipv4 or ipv6
+ let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
- extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
- uvdebug!("buf addr: {}", buf.base);
- uvdebug!("buf len: {}", buf.len);
- let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
- let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
- let status = status_to_maybe_uv_error(nread as c_int);
- (*cb)(stream_watcher, nread as int, buf, status);
- }
- }
+ let r = unsafe {
+ getsockname(handle, r_addr as *uvll::sockaddr_storage)
+ };
- pub fn read_stop(&mut self) {
- // It would be nice to drop the alloc and read callbacks here,
- // but read_stop may be called from inside one of them and we
- // would end up freeing the in-use environment
- let handle = self.native_handle();
- unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+ if r != 0 {
+ return Err(uv_error_to_io_error(UvError(r)));
}
- pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
- let req = WriteRequest::new();
- return unsafe {
- match uvll::uv_write(req.native_handle(), self.native_handle(),
- [buf], write_cb) {
- 0 => {
- let data = self.get_watcher_data();
- assert!(data.write_cb.is_none());
- data.write_cb = Some(cb);
- }
- n => {
- req.delete();
- cb(*self, Some(UvError(n)))
- }
- }
- };
-
- extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
- let write_request: WriteRequest = NativeHandle::from_native_handle(req);
- let mut stream_watcher = write_request.stream();
- write_request.delete();
- let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
- let status = status_to_maybe_uv_error(status);
- cb(stream_watcher, status);
+ let addr = unsafe {
+ if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
+ uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
+ } else {
+ uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
}
- }
+ };
+ unsafe { uvll::free_sockaddr_storage(r_addr); }
- pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
- {
- let data = self.get_watcher_data();
- assert!(data.connect_cb.is_none());
- data.connect_cb = Some(cb);
- }
+ Ok(addr)
- return unsafe {
- static BACKLOG: c_int = 128; // XXX should be configurable
- match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) {
- 0 => Ok(()),
- n => Err(UvError(n))
- }
- };
+}
- extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
- uvdebug!("connection_cb");
- let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
- let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
- let status = status_to_maybe_uv_error(status);
- (*cb)(stream_watcher, status);
- }
- }
+////////////////////////////////////////////////////////////////////////////////
+/// TCP implementation
+////////////////////////////////////////////////////////////////////////////////
- pub fn accept(&mut self, stream: StreamWatcher) {
- let self_handle = self.native_handle() as *c_void;
- let stream_handle = stream.native_handle() as *c_void;
- assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } );
- }
+pub struct TcpWatcher {
+ handle: *uvll::uv_tcp_t,
+ stream: StreamWatcher,
+ home: SchedHandle,
}
-impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
- fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
- StreamWatcher(handle)
- }
- fn native_handle(&self) -> *uvll::uv_stream_t {
- match self { &StreamWatcher(ptr) => ptr }
- }
+pub struct TcpListener {
+ home: SchedHandle,
+ handle: *uvll::uv_pipe_t,
+ priv closing_task: Option<BlockedTask>,
+ priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
+}
+
+pub struct TcpAcceptor {
+ listener: ~TcpListener,
+ priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
}
-pub struct TcpWatcher(*uvll::uv_tcp_t);
-impl Watcher for TcpWatcher { }
+// TCP watchers (clients/streams)
impl TcpWatcher {
pub fn new(loop_: &Loop) -> TcpWatcher {
- unsafe {
- let handle = malloc_handle(UV_TCP);
- assert!(handle.is_not_null());
- assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle));
- let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
- watcher.install_watcher_data();
- return watcher;
+ let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+ assert_eq!(unsafe {
+ uvll::uv_tcp_init(loop_.native_handle(), handle)
+ }, 0);
+ TcpWatcher {
+ home: get_handle_to_current_scheduler!(),
+ handle: handle,
+ stream: StreamWatcher::new(handle),
}
}
- pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
- do socket_addr_as_uv_socket_addr(address) |addr| {
- let result = unsafe {
- match addr {
- UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
- UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
- }
+ pub fn connect(loop_: &mut Loop, address: SocketAddr)
+ -> Result<TcpWatcher, UvError>
+ {
+ struct Ctx { status: c_int, task: Option<BlockedTask> }
+
+ let tcp = TcpWatcher::new(loop_);
+ let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
+ let req = Request::new(uvll::UV_CONNECT);
+ let result = match addr {
+ UvIpv4SocketAddr(addr) => unsafe {
+ uvll::tcp_connect(req.handle, tcp.handle, addr,
+ connect_cb)
+ },
+ UvIpv6SocketAddr(addr) => unsafe {
+ uvll::tcp_connect6(req.handle, tcp.handle, addr,
+ connect_cb)
+ },
};
match result {
- 0 => Ok(()),
- _ => Err(UvError(result)),
+ 0 => {
+ req.defuse();
+ let mut cx = Ctx { status: 0, task: None };
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ cx.task = Some(task);
+ }
+ match cx.status {
+ 0 => Ok(()),
+ n => Err(UvError(n)),
+ }
+ }
+ n => Err(UvError(n))
}
- }
- }
+ };
- pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
- unsafe {
- assert!(self.get_watcher_data().connect_cb.is_none());
- self.get_watcher_data().connect_cb = Some(cb);
-
- let connect_handle = ConnectRequest::new().native_handle();
- uvdebug!("connect_t: {}", connect_handle);
- do socket_addr_as_uv_socket_addr(address) |addr| {
- let result = match addr {
- UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
- self.native_handle(), addr, connect_cb),
- UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
- self.native_handle(), addr, connect_cb),
- };
- assert_eq!(0, result);
- }
+ return match ret {
+ Ok(()) => Ok(tcp),
+ Err(e) => Err(e),
+ };
- extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
- uvdebug!("connect_t: {}", req);
- let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
- let mut stream_watcher = connect_request.stream();
- connect_request.delete();
- let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
- let status = status_to_maybe_uv_error(status);
- cb(stream_watcher, status);
- }
+ extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
+ let _req = Request::wrap(req);
+ if status == uvll::ECANCELED { return }
+ let cx: &mut Ctx = unsafe {
+ cast::transmute(uvll::get_data_for_req(req))
+ };
+ cx.status = status;
+ let scheduler: ~Scheduler = Local::take();
+ scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
+}
- pub fn as_stream(&self) -> StreamWatcher {
- NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
+impl HomingIO for TcpWatcher {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl rtio::RtioSocket for TcpWatcher {
+ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+ let _m = self.fire_missiles();
+ socket_name(Tcp, self.handle)
}
}
-impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
- fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
- TcpWatcher(handle)
+impl rtio::RtioTcpStream for TcpWatcher {
+ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+ let _m = self.fire_missiles();
+ self.stream.read(buf).map_err(uv_error_to_io_error)
}
- fn native_handle(&self) -> *uvll::uv_tcp_t {
- match self { &TcpWatcher(ptr) => ptr }
+
+ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ self.stream.write(buf).map_err(uv_error_to_io_error)
}
-}
-pub struct UdpWatcher(*uvll::uv_udp_t);
-impl Watcher for UdpWatcher { }
+ fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
+ let _m = self.fire_missiles();
+ socket_name(TcpPeer, self.handle)
+ }
-impl UdpWatcher {
- pub fn new(loop_: &Loop) -> UdpWatcher {
- unsafe {
- let handle = malloc_handle(UV_UDP);
- assert!(handle.is_not_null());
- assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle));
- let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
- watcher.install_watcher_data();
- return watcher;
- }
+ fn control_congestion(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
+ })
}
- pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
- do socket_addr_as_uv_socket_addr(address) |addr| {
- let result = unsafe {
- match addr {
- UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
- UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
- }
- };
- match result {
- 0 => Ok(()),
- _ => Err(UvError(result)),
- }
- }
+ fn nodelay(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
+ })
}
- pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
- {
- let data = self.get_watcher_data();
- data.alloc_cb = Some(alloc);
- data.udp_recv_cb = Some(cb);
- }
+ fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
+ delay_in_seconds as c_uint)
+ })
+ }
- unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
+ fn letdie(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
+ })
+ }
+}
- extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
- let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
- let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
- return (*alloc_cb)(suggested_size as uint);
- }
+impl Drop for TcpWatcher {
+ fn drop(&mut self) {
+ let _m = self.fire_missiles();
+ self.stream.close(true);
+ }
+}
- extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
- addr: *uvll::sockaddr, flags: c_uint) {
- // When there's no data to read the recv callback can be a no-op.
- // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
- // this we just drop back to kqueue and wait for the next callback.
- if nread == 0 {
- return;
+// TCP listeners (unbound servers)
+
+impl TcpListener {
+ pub fn bind(loop_: &mut Loop, address: SocketAddr)
+ -> Result<~TcpListener, UvError>
+ {
+ let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+ assert_eq!(unsafe {
+ uvll::uv_tcp_init(loop_.native_handle(), handle)
+ }, 0);
+ let l = ~TcpListener {
+ home: get_handle_to_current_scheduler!(),
+ handle: handle,
+ closing_task: None,
+ outgoing: Tube::new(),
+ };
+ let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+ match addr {
+ UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
+ UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
}
-
- uvdebug!("buf addr: {}", buf.base);
- uvdebug!("buf len: {}", buf.len);
- let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
- let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
- let status = status_to_maybe_uv_error(nread as c_int);
- let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
- (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
+ });
+ match res {
+ 0 => Ok(l.install()),
+ n => Err(UvError(n))
}
}
+}
+
+impl HomingIO for TcpListener {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl UvHandle<uvll::uv_tcp_t> for TcpListener {
+ fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
+}
- pub fn recv_stop(&mut self) {
- unsafe { uvll::uv_udp_recv_stop(self.native_handle()); }
+impl rtio::RtioSocket for TcpListener {
+ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+ let _m = self.fire_missiles();
+ socket_name(Tcp, self.handle)
}
+}
+
+impl rtio::RtioTcpListener for TcpListener {
+ fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
+ // create the acceptor object from ourselves
+ let incoming = self.outgoing.clone();
+ let mut acceptor = ~TcpAcceptor {
+ listener: self,
+ incoming: incoming,
+ };
- pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
- {
- let data = self.get_watcher_data();
- assert!(data.udp_send_cb.is_none());
- data.udp_send_cb = Some(cb);
+ let _m = acceptor.fire_missiles();
+ // XXX: the 128 backlog should be configurable
+ match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
+ 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
+ n => Err(uv_error_to_io_error(UvError(n))),
}
+ }
+}
- let req = UdpSendRequest::new();
- do socket_addr_as_uv_socket_addr(address) |addr| {
- let result = unsafe {
- match addr {
- UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
- self.native_handle(), [buf], addr, send_cb),
- UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
- self.native_handle(), [buf], addr, send_cb),
- }
- };
- assert_eq!(0, result);
+extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
+ let msg = match status {
+ 0 => {
+ let loop_ = NativeHandle::from_native_handle(unsafe {
+ uvll::get_loop_for_uv_handle(server)
+ });
+ let client = TcpWatcher::new(&loop_);
+ assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
+ Ok(~client as ~rtio::RtioTcpStream)
}
+ uvll::ECANCELED => return,
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
- extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
- let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
- let mut udp_watcher = send_request.handle();
- send_request.delete();
- let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
- let status = status_to_maybe_uv_error(status);
- cb(udp_watcher, status);
+ let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
+ tcp.outgoing.send(msg);
+}
+
+impl Drop for TcpListener {
+ fn drop(&mut self) {
+ let (_m, sched) = self.fire_missiles_sched();
+
+ do sched.deschedule_running_task_and_then |_, task| {
+ self.closing_task = Some(task);
+ unsafe { uvll::uv_close(self.handle, listener_close_cb) }
}
}
}
-impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
- fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
- UdpWatcher(handle)
- }
- fn native_handle(&self) -> *uvll::uv_udp_t {
- match self { &UdpWatcher(ptr) => ptr }
- }
+extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
+ let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
+ unsafe { uvll::free_handle(handle) }
+
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
}
-// uv_connect_t is a subclass of uv_req_t
-pub struct ConnectRequest(*uvll::uv_connect_t);
-impl Request for ConnectRequest { }
+// TCP acceptors (bound servers)
+
+impl HomingIO for TcpAcceptor {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
+}
-impl ConnectRequest {
+impl rtio::RtioSocket for TcpAcceptor {
+ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+ let _m = self.fire_missiles();
+ socket_name(Tcp, self.listener.handle)
+ }
+}
- pub fn new() -> ConnectRequest {
- let connect_handle = unsafe { malloc_req(UV_CONNECT) };
- assert!(connect_handle.is_not_null());
- ConnectRequest(connect_handle as *uvll::uv_connect_t)
+impl rtio::RtioTcpAcceptor for TcpAcceptor {
+ fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
+ let _m = self.fire_missiles();
+ self.incoming.recv()
}
- fn stream(&self) -> StreamWatcher {
- unsafe {
- let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
- NativeHandle::from_native_handle(stream_handle)
- }
+ fn accept_simultaneously(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
+ })
}
- fn delete(self) {
- unsafe { free_req(self.native_handle() as *c_void) }
+ fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
+ })
}
}
-impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
- fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
- ConnectRequest(handle)
+////////////////////////////////////////////////////////////////////////////////
+/// UDP implementation
+////////////////////////////////////////////////////////////////////////////////
+
+pub struct UdpWatcher {
+ handle: *uvll::uv_udp_t,
+ home: SchedHandle,
+}
+
+impl UdpWatcher {
+ pub fn bind(loop_: &Loop, address: SocketAddr)
+ -> Result<UdpWatcher, UvError>
+ {
+ let udp = UdpWatcher {
+ handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+ home: get_handle_to_current_scheduler!(),
+ };
+ assert_eq!(unsafe {
+ uvll::uv_udp_init(loop_.native_handle(), udp.handle)
+ }, 0);
+ let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
+ match addr {
+ UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
+ UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
+ }
+ });
+ match result {
+ 0 => Ok(udp),
+ n => Err(UvError(n)),
+ }
}
- fn native_handle(&self) -> *uvll::uv_connect_t {
- match self { &ConnectRequest(ptr) => ptr }
+}
+
+impl HomingIO for UdpWatcher {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl rtio::RtioSocket for UdpWatcher {
+ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+ let _m = self.fire_missiles();
+ socket_name(Udp, self.handle)
}
}
-pub struct WriteRequest(*uvll::uv_write_t);
+impl rtio::RtioUdpSocket for UdpWatcher {
+ fn recvfrom(&mut self, buf: &mut [u8])
+ -> Result<(uint, SocketAddr), IoError>
+ {
+ struct Ctx {
+ task: Option<BlockedTask>,
+ buf: Option<Buf>,
+ result: Option<(ssize_t, SocketAddr)>,
+ }
+ let _m = self.fire_missiles();
+
+ return match unsafe {
+ uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
+ } {
+ 0 => {
+ let mut cx = Ctx {
+ task: None,
+ buf: Some(slice_to_uv_buf(buf)),
+ result: None,
+ };
+ unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ cx.task = Some(task);
+ }
+ match cx.result.take_unwrap() {
+ (n, _) if n < 0 =>
+ Err(uv_error_to_io_error(UvError(n as c_int))),
+ (n, addr) => Ok((n as uint, addr))
+ }
+ }
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
+
+ extern fn alloc_cb(handle: *uvll::uv_udp_t,
+ _suggested_size: size_t) -> Buf {
+ let cx: &mut Ctx = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(handle))
+ };
+ cx.buf.take().expect("alloc_cb called more than once")
+ }
+
+ extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
+ addr: *uvll::sockaddr, _flags: c_uint) {
-impl Request for WriteRequest { }
+ // When there's no data to read the recv callback can be a no-op.
+ // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
+ // this we just drop back to kqueue and wait for the next callback.
+ if nread == 0 { return }
+ if nread == uvll::ECANCELED as ssize_t { return }
-impl WriteRequest {
- pub fn new() -> WriteRequest {
- let write_handle = unsafe { malloc_req(UV_WRITE) };
- assert!(write_handle.is_not_null());
- WriteRequest(write_handle as *uvll::uv_write_t)
+ unsafe {
+ assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
+ }
+
+ let cx: &mut Ctx = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(handle))
+ };
+ let addr = sockaddr_to_UvSocketAddr(addr);
+ let addr = uv_socket_addr_to_socket_addr(addr);
+ cx.result = Some((nread, addr));
+
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(cx.task.take_unwrap());
+ }
}
- pub fn stream(&self) -> StreamWatcher {
- unsafe {
- let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
- NativeHandle::from_native_handle(stream_handle)
+ fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
+ struct Ctx { task: Option<BlockedTask>, result: c_int }
+
+ let _m = self.fire_missiles();
+
+ let req = Request::new(uvll::UV_UDP_SEND);
+ let buf = slice_to_uv_buf(buf);
+ let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
+ match dst {
+ UvIpv4SocketAddr(dst) =>
+ uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
+ UvIpv6SocketAddr(dst) =>
+ uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
+ }
+ });
+
+ return match result {
+ 0 => {
+ let mut cx = Ctx { task: None, result: 0 };
+ req.set_data(&cx);
+ req.defuse();
+
+ let sched: ~Scheduler = Local::take();
+ do sched.deschedule_running_task_and_then |_, task| {
+ cx.task = Some(task);
+ }
+
+ match cx.result {
+ 0 => Ok(()),
+ n => Err(uv_error_to_io_error(UvError(n)))
+ }
+ }
+ n => Err(uv_error_to_io_error(UvError(n)))
+ };
+
+ extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
+ let req = Request::wrap(req);
+ let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
+ cx.result = status;
+
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(cx.task.take_unwrap());
}
}
- pub fn delete(self) {
- unsafe { free_req(self.native_handle() as *c_void) }
+ fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ do multi.to_str().with_c_str |m_addr| {
+ uvll::uv_udp_set_membership(self.handle,
+ m_addr, ptr::null(),
+ uvll::UV_JOIN_GROUP)
+ }
+ })
}
-}
-impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
- fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
- WriteRequest(handle)
+ fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ do multi.to_str().with_c_str |m_addr| {
+ uvll::uv_udp_set_membership(self.handle,
+ m_addr, ptr::null(),
+ uvll::UV_LEAVE_GROUP)
+ }
+ })
}
- fn native_handle(&self) -> *uvll::uv_write_t {
- match self { &WriteRequest(ptr) => ptr }
+
+ fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_loop(self.handle,
+ 1 as c_int)
+ })
}
-}
-pub struct UdpSendRequest(*uvll::uv_udp_send_t);
-impl Request for UdpSendRequest { }
+ fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_loop(self.handle,
+ 0 as c_int)
+ })
+ }
-impl UdpSendRequest {
- pub fn new() -> UdpSendRequest {
- let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
- assert!(send_handle.is_not_null());
- UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
+ fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_multicast_ttl(self.handle,
+ ttl as c_int)
+ })
}
- pub fn handle(&self) -> UdpWatcher {
- let send_request_handle = unsafe {
- uvll::get_udp_handle_from_send_req(self.native_handle())
- };
- NativeHandle::from_native_handle(send_request_handle)
+ fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
+ })
}
- pub fn delete(self) {
- unsafe { free_req(self.native_handle() as *c_void) }
+ fn hear_broadcasts(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_broadcast(self.handle,
+ 1 as c_int)
+ })
}
-}
-impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
- fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
- UdpSendRequest(handle)
+ fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
+ let _m = self.fire_missiles();
+ status_to_io_result(unsafe {
+ uvll::uv_udp_set_broadcast(self.handle,
+ 0 as c_int)
+ })
}
- fn native_handle(&self) -> *uvll::uv_udp_send_t {
- match self { &UdpSendRequest(ptr) => ptr }
+}
+
+impl Drop for UdpWatcher {
+ fn drop(&mut self) {
+ // Send ourselves home to close this handle (blocking while doing so).
+ let (_m, sched) = self.fire_missiles_sched();
+ let mut slot = None;
+ unsafe {
+ uvll::set_data_for_uv_handle(self.handle, &slot);
+ uvll::uv_close(self.handle, close_cb);
+ }
+ do sched.deschedule_running_task_and_then |_, task| {
+ slot = Some(task);
+ }
+
+ extern fn close_cb(handle: *uvll::uv_handle_t) {
+ let slot: &mut Option<BlockedTask> = unsafe {
+ cast::transmute(uvll::get_data_for_uv_handle(handle))
+ };
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(slot.take_unwrap());
+ }
}
}
+////////////////////////////////////////////////////////////////////////////////
+/// UV request support
+////////////////////////////////////////////////////////////////////////////////
+
#[cfg(test)]
mod test {
use super::*;
use std::rt::tube::Tube;
use stream::StreamWatcher;
-use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
+use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle, Request};
use uvio::HomingIO;
use uvll;
result: Option<Result<PipeWatcher, UvError>>,
}
let mut cx = Ctx { task: None, result: None };
- let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
- unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
+ let req = Request::new(uvll::UV_CONNECT);
+ unsafe {
+ uvll::set_data_for_req(req.handle, &cx as *Ctx);
+ uvll::uv_pipe_connect(req.handle,
+ PipeWatcher::alloc(loop_, false),
+ name.with_ref(|p| p),
+ connect_cb)
+ }
+ req.defuse();
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |_, task| {
cx.task = Some(task);
- unsafe {
- uvll::uv_pipe_connect(req,
- PipeWatcher::alloc(loop_, false),
- name.with_ref(|p| p),
- connect_cb)
- }
}
assert!(cx.task.is_none());
return cx.result.take().expect("pipe connect needs a result");
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
+ let _req = Request::wrap(req);
+ if status == uvll::ECANCELED { return }
unsafe {
let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
let stream = uvll::get_stream_handle_from_connect_req(req);
Err(UvError(n))
}
});
- uvll::free_req(req);
let sched: ~Scheduler = Local::take();
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
Ok(~PipeWatcher::new(client) as ~RtioPipe)
}
+ uvll::ECANCELED => return,
n => Err(uv_error_to_io_error(UvError(n)))
};
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use std::cell::Cell;
use std::libc::c_int;
use std::libc;
use std::ptr;
}
}
- let ret_io = Cell::new(ret_io);
- do with_argv(config.program, config.args) |argv| {
+ let ret = do with_argv(config.program, config.args) |argv| {
do with_env(config.env) |envp| {
let options = uvll::uv_process_options_t {
exit_cb: on_exit,
exit_status: None,
term_signal: None,
};
- Ok((process.install(), ret_io.take()))
+ Ok(process.install())
}
err => {
unsafe { uvll::free_handle(handle) }
}
}
}
+ };
+
+ match ret {
+ Ok(p) => Ok((p, ret_io)),
+ Err(e) => Err(e),
}
}
}
use std::rt::local::Local;
use std::rt::sched::Scheduler;
-use super::{UvError, Buf, slice_to_uv_buf};
+use super::{UvError, Buf, slice_to_uv_buf, Request};
use uvll;
// This is a helper structure which is intended to get embedded into other
// every call to uv_write(). Ideally this would be a stack-allocated
// structure, but currently we don't have mappings for all the structures
// defined in libuv, so we're foced to malloc this.
- priv last_write_req: Option<*uvll::uv_write_t>,
+ priv last_write_req: Option<Request>,
}
struct ReadContext {
buf: Option<Buf>,
- result: Option<Result<uint, UvError>>,
+ result: ssize_t,
task: Option<BlockedTask>,
}
struct WriteContext {
- result: Option<Result<(), UvError>>,
+ result: c_int,
task: Option<BlockedTask>,
}
0 => {
let mut rcx = ReadContext {
buf: Some(slice_to_uv_buf(buf)),
- result: None,
+ result: 0,
task: None,
};
unsafe {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rcx.task = Some(task);
}
- rcx.result.take().expect("no result in read stream?")
+ match rcx.result {
+ n if n < 0 => Err(UvError(n as c_int)),
+ n => Ok(n as uint),
+ }
}
n => Err(UvError(n))
}
pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
// Prepare the write request, either using a cached one or allocating a
// new one
- let req = match self.last_write_req {
- Some(req) => req,
- None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
- };
- self.last_write_req = Some(req);
- let mut wcx = WriteContext { result: None, task: None, };
- unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
+ if self.last_write_req.is_none() {
+ self.last_write_req = Some(Request::new(uvll::UV_WRITE));
+ }
+ let req = self.last_write_req.get_ref();
// Send off the request, but be careful to not block until we're sure
// that the write reqeust is queued. If the reqeust couldn't be queued,
// then we should return immediately with an error.
match unsafe {
- uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
+ uvll::uv_write(req.handle, self.handle, [slice_to_uv_buf(buf)],
+ write_cb)
} {
0 => {
+ let mut wcx = WriteContext { result: 0, task: None, };
+ req.set_data(&wcx);
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_sched, task| {
wcx.task = Some(task);
}
- assert!(wcx.task.is_none());
- wcx.result.take().expect("no result in write stream?")
+ match wcx.result {
+ 0 => Ok(()),
+ n => Err(UvError(n)),
+ }
}
n => Err(UvError(n)),
}
// synchronously (the task is blocked) or asynchronously (the task is not
// block, but the handle is still deallocated).
pub fn close(&mut self, synchronous: bool) {
- // clean up the cached write request if we have one
- match self.last_write_req {
- Some(req) => unsafe { uvll::free_req(req) },
- None => {}
- }
-
if synchronous {
let mut closing_task = None;
unsafe {
// XXX: Is there a performance impact to calling
// stop here?
unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+ rcx.result = nread;
- assert!(rcx.result.is_none());
- rcx.result = Some(match nread {
- n if n < 0 => Err(UvError(n as c_int)),
- n => Ok(n as uint),
- });
-
- let task = rcx.task.take().expect("read_cb needs a task");
let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task);
+ scheduler.resume_blocked_task_immediately(rcx.task.take_unwrap());
}
// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
// reading, however, all this does is wake up the blocked task after squirreling
// away the error code as a result.
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
+ if status == uvll::ECANCELED { return }
// Remember to not free the request because it is re-used between writes on
// the same stream.
- unsafe {
- let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
- wcx.result = Some(match status {
- 0 => Ok(()),
- n => Err(UvError(n)),
- });
- let sched: ~Scheduler = Local::take();
- sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
- }
+ let req = Request::wrap(req);
+ let wcx: &mut WriteContext = unsafe { cast::transmute(req.get_data()) };
+ wcx.result = status;
+
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+ req.defuse();
}
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use std::cell::Cell;
use std::comm::{oneshot, stream, PortOne, ChanOne, SendDeferred};
use std::libc::c_int;
use std::rt::BlockedTask;
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
let (port, chan) = oneshot();
- let chan = Cell::new(chan);
let _m = self.fire_missiles();
- self.action = Some(SendOnce(chan.take()));
+ self.action = Some(SendOnce(chan));
self.start(msecs, 0);
return port;
fn period(&mut self, msecs: u64) -> Port<()> {
let (port, chan) = stream();
- let chan = Cell::new(chan);
let _m = self.fire_missiles();
- self.action = Some(SendMany(chan.take()));
+ self.action = Some(SendMany(chan));
self.start(msecs, msecs);
return port;
use std::c_str::CString;
use std::cast::transmute;
use std::cast;
-use std::cell::Cell;
-use std::clone::Clone;
use std::comm::{SharedChan, GenericChan};
use std::libc;
-use std::libc::{c_int, c_uint, c_void};
-use std::ptr;
+use std::libc::c_int;
use std::str;
use std::rt::io;
use std::rt::io::IoError;
-use std::rt::io::net::ip::{SocketAddr, IpAddr};
-use std::rt::io::{standard_error, OtherIoError};
+use std::rt::io::net::ip::SocketAddr;
use std::rt::io::process::ProcessConfig;
use std::rt::local::Local;
use std::rt::rtio::*;
use std::rt::sched::{Scheduler, SchedHandle};
-use std::rt::tube::Tube;
use std::rt::task::Task;
use std::path::Path;
use std::libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
use super::*;
use idle::IdleWatcher;
-use net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
use addrinfo::GetAddrInfoRequest;
-use pipe::PipeListener;
// XXX we should not be calling uvll functions in here.
}
}
-enum SocketNameKind {
- TcpPeer,
- Tcp,
- Udp
-}
-
-fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
- handle: U) -> Result<SocketAddr, IoError> {
- let getsockname = match sk {
- TcpPeer => uvll::tcp_getpeername,
- Tcp => uvll::tcp_getsockname,
- Udp => uvll::udp_getsockname,
- };
-
- // Allocate a sockaddr_storage
- // since we don't know if it's ipv4 or ipv6
- let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
-
- let r = unsafe {
- getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
- };
-
- if r != 0 {
- let status = status_to_maybe_uv_error(r);
- return Err(uv_error_to_io_error(status.unwrap()));
- }
-
- let addr = unsafe {
- if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
- net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
- } else {
- net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
- }
- };
-
- unsafe { uvll::free_sockaddr_storage(r_addr); }
-
- Ok(addr)
-
-}
-
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
priv uvio: UvIoFactory
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
- fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> {
- // Create a cell in the task to hold the result. We will fill
- // the cell before resuming the task.
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell;
-
- // Block this task and take ownership, switch to scheduler context
- do task::unkillable { // FIXME(#8674)
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
-
- let mut tcp = TcpWatcher::new(self.uv_loop());
- let task_cell = Cell::new(task);
-
- // Wait for a connection
- do tcp.connect(addr) |stream, status| {
- match status {
- None => {
- let tcp = NativeHandle::from_native_handle(stream.native_handle());
- let home = get_handle_to_current_scheduler!();
- let res = Ok(~UvTcpStream { watcher: tcp, home: home }
- as ~RtioTcpStream);
-
- // Store the stream in the task's stack
- unsafe { (*result_cell_ptr).put_back(res); }
-
- // Context switch
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- Some(_) => {
- let task_cell = Cell::new(task_cell.take());
- do stream.close {
- let res = Err(uv_error_to_io_error(status.unwrap()));
- unsafe { (*result_cell_ptr).put_back(res); }
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
- }
- }
+ fn tcp_connect(&mut self, addr: SocketAddr)
+ -> Result<~RtioTcpStream, IoError>
+ {
+ match TcpWatcher::connect(self.uv_loop(), addr) {
+ Ok(t) => Ok(~t as ~RtioTcpStream),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
-
- assert!(!result_cell.is_empty());
- return result_cell.take();
}
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> {
- let mut watcher = TcpWatcher::new(self.uv_loop());
- match watcher.bind(addr) {
- Ok(_) => {
- let home = get_handle_to_current_scheduler!();
- Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener)
- }
- Err(uverr) => {
- do task::unkillable { // FIXME(#8674)
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do watcher.as_stream().close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- Err(uv_error_to_io_error(uverr))
- }
- }
+ match TcpListener::bind(self.uv_loop(), addr) {
+ Ok(t) => Ok(t as ~RtioTcpListener),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
- let mut watcher = UdpWatcher::new(self.uv_loop());
- match watcher.bind(addr) {
- Ok(_) => {
- let home = get_handle_to_current_scheduler!();
- Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket)
- }
- Err(uverr) => {
- do task::unkillable { // FIXME(#8674)
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- Err(uv_error_to_io_error(uverr))
- }
- }
+ match UdpWatcher::bind(self.uv_loop(), addr) {
+ Ok(u) => Ok(~u as ~RtioUdpSocket),
+ Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
}
-pub struct UvTcpListener {
- priv watcher : TcpWatcher,
- priv home: SchedHandle,
-}
-
-impl HomingIO for UvTcpListener {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl UvTcpListener {
- fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
- UvTcpListener { watcher: watcher, home: home }
- }
-}
-
-impl Drop for UvTcpListener {
- fn drop(&mut self) {
- let (_m, sched) = self.fire_homing_missile_sched();
- do sched.deschedule_running_task_and_then |_, task| {
- let task = Cell::new(task);
- do self.watcher.as_stream().close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task.take());
- }
- }
- }
-}
-
-impl RtioSocket for UvTcpListener {
- fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- let _m = self.fire_homing_missile();
- socket_name(Tcp, self.watcher)
- }
-}
-
-impl RtioTcpListener for UvTcpListener {
- fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
- let _m = self.fire_homing_missile();
- let acceptor = ~UvTcpAcceptor::new(*self);
- let incoming = Cell::new(acceptor.incoming.clone());
- let mut stream = acceptor.listener.watcher.as_stream();
- let res = do stream.listen |mut server, status| {
- do incoming.with_mut_ref |incoming| {
- let inc = match status {
- Some(_) => Err(standard_error(OtherIoError)),
- None => {
- let inc = TcpWatcher::new(&server.event_loop());
- // first accept call in the callback guarenteed to succeed
- server.accept(inc.as_stream());
- let home = get_handle_to_current_scheduler!();
- Ok(~UvTcpStream { watcher: inc, home: home }
- as ~RtioTcpStream)
- }
- };
- incoming.send(inc);
- }
- };
- match res {
- Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
- Err(e) => Err(uv_error_to_io_error(e)),
- }
- }
-}
-
-pub struct UvTcpAcceptor {
- priv listener: UvTcpListener,
- priv incoming: Tube<Result<~RtioTcpStream, IoError>>,
-}
-
-impl HomingIO for UvTcpAcceptor {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
-}
-
-impl UvTcpAcceptor {
- fn new(listener: UvTcpListener) -> UvTcpAcceptor {
- UvTcpAcceptor { listener: listener, incoming: Tube::new() }
- }
-}
-
-impl RtioSocket for UvTcpAcceptor {
- fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- let _m = self.fire_homing_missile();
- socket_name(Tcp, self.listener.watcher)
- }
-}
-
-fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
- let r = unsafe {
- uvll::uv_tcp_simultaneous_accepts(stream.native_handle(), a as c_int)
- };
- status_to_io_result(r)
-}
-
-impl RtioTcpAcceptor for UvTcpAcceptor {
- fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
- let _m = self.fire_homing_missile();
- self.incoming.recv()
- }
-
- fn accept_simultaneously(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- accept_simultaneously(self.listener.watcher.as_stream(), 1)
- }
-
- fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- accept_simultaneously(self.listener.watcher.as_stream(), 0)
- }
-}
-
-fn read_stream(mut watcher: StreamWatcher,
- scheduler: ~Scheduler,
- buf: &mut [u8]) -> Result<uint, IoError> {
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
-
- let uv_buf = slice_to_uv_buf(buf);
- do scheduler.deschedule_running_task_and_then |_sched, task| {
- let task_cell = Cell::new(task);
- // XXX: We shouldn't reallocate these callbacks every
- // call to read
- let alloc: AllocCallback = |_| uv_buf;
- do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
-
- // Stop reading so that no read callbacks are
- // triggered before the user calls `read` again.
- // XXX: Is there a performance impact to calling
- // stop here?
- watcher.read_stop();
-
- let result = if status.is_none() {
- assert!(nread >= 0);
- Ok(nread as uint)
- } else {
- Err(uv_error_to_io_error(status.unwrap()))
- };
-
- unsafe { (*result_cell_ptr).put_back(result); }
-
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
-
- assert!(!result_cell.is_empty());
- result_cell.take()
-}
-
-fn write_stream(mut watcher: StreamWatcher,
- scheduler: ~Scheduler,
- buf: &[u8]) -> Result<(), IoError> {
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- let buf_ptr: *&[u8] = &buf;
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
- do watcher.write(buf) |_watcher, status| {
- let result = if status.is_none() {
- Ok(())
- } else {
- Err(uv_error_to_io_error(status.unwrap()))
- };
-
- unsafe { (*result_cell_ptr).put_back(result); }
-
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
-
- assert!(!result_cell.is_empty());
- result_cell.take()
-}
-
-pub struct UvTcpStream {
- priv watcher: TcpWatcher,
- priv home: SchedHandle,
-}
-
-impl HomingIO for UvTcpStream {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvTcpStream {
- fn drop(&mut self) {
- let (_m, sched) = self.fire_homing_missile_sched();
- do sched.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self.watcher.as_stream().close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
-}
-
-impl RtioSocket for UvTcpStream {
- fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- let _m = self.fire_homing_missile();
- socket_name(Tcp, self.watcher)
- }
-}
-
-impl RtioTcpStream for UvTcpStream {
- fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- read_stream(self.watcher.as_stream(), scheduler, buf)
- }
-
- fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- write_stream(self.watcher.as_stream(), scheduler, buf)
- }
-
- fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
- let _m = self.fire_homing_missile();
- socket_name(TcpPeer, self.watcher)
- }
-
- fn control_congestion(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
- })
- }
-
- fn nodelay(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
- })
- }
-
- fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
- delay_in_seconds as c_uint)
- })
- }
-
- fn letdie(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_tcp_keepalive(self.watcher.native_handle(),
- 0 as c_int, 0 as c_uint)
- })
- }
-}
-
-pub struct UvUdpSocket {
- priv watcher: UdpWatcher,
- priv home: SchedHandle,
-}
-
-impl HomingIO for UvUdpSocket {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl Drop for UvUdpSocket {
- fn drop(&mut self) {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self.watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
-}
-
-impl RtioSocket for UvUdpSocket {
- fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
- let _m = self.fire_homing_missile();
- socket_name(Udp, self.watcher)
- }
-}
-
-impl RtioUdpSocket for UvUdpSocket {
- fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-
- let buf_ptr: *&mut [u8] = &buf;
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
- do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
- let _ = flags; // /XXX add handling for partials?
-
- watcher.recv_stop();
-
- let result = match status {
- None => {
- assert!(nread >= 0);
- Ok((nread as uint, addr))
- }
- Some(err) => Err(uv_error_to_io_error(err)),
- };
-
- unsafe { (*result_cell_ptr).put_back(result); }
-
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
-
- assert!(!result_cell.is_empty());
- result_cell.take()
- }
-
- fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- let result_cell = Cell::new_empty();
- let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
- let buf_ptr: *&[u8] = &buf;
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
- do self.watcher.send(buf, dst) |_watcher, status| {
-
- let result = match status {
- None => Ok(()),
- Some(err) => Err(uv_error_to_io_error(err)),
- };
-
- unsafe { (*result_cell_ptr).put_back(result); }
-
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
-
- assert!(!result_cell.is_empty());
- result_cell.take()
- }
-
- fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- do multi.to_str().with_c_str |m_addr| {
- uvll::uv_udp_set_membership(self.watcher.native_handle(),
- m_addr, ptr::null(),
- uvll::UV_JOIN_GROUP)
- }
- })
- }
-
- fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- do multi.to_str().with_c_str |m_addr| {
- uvll::uv_udp_set_membership(self.watcher.native_handle(),
- m_addr, ptr::null(),
- uvll::UV_LEAVE_GROUP)
- }
- })
- }
-
- fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
- 1 as c_int)
- })
- }
-
- fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
- 0 as c_int)
- })
- }
-
- fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
- ttl as c_int)
- })
- }
-
- fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
- })
- }
-
- fn hear_broadcasts(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
- 1 as c_int)
- })
- }
-
- fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
- let _m = self.fire_homing_missile();
- status_to_io_result(unsafe {
- uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
- 0 as c_int)
- })
- }
-}
-
// this function is full of lies
unsafe fn local_io() -> &'static mut IoFactory {
do Local::borrow |sched: &mut Scheduler| {
pub static ENOTCONN: c_int = -4054;
pub static EPIPE: c_int = -4048;
pub static ECONNABORTED: c_int = -4080;
+ pub static ECANCELED: c_int = -4082;
}
#[cfg(not(windows))]
pub mod errors {
pub static ENOTCONN: c_int = -libc::ENOTCONN;
pub static EPIPE: c_int = -libc::EPIPE;
pub static ECONNABORTED: c_int = -libc::ECONNABORTED;
+ pub static ECANCELED : c_int = -libc::ECANCELED;
}
pub static PROCESS_SETUID: c_int = 1 << 0;
}
pub type uv_handle_t = c_void;
+pub type uv_req_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_tcp_t = c_void;