1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
16 use std::rt::rtio::IoError;
17 use std::rt::task::BlockedTask;
19 use homing::{HomingIO, HomeHandle};
21 use stream::StreamWatcher;
22 use super::{Loop, Request, UvError, Buf, status_to_io_result,
23 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
24 wait_until_woken_after, wakeup};
25 use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
26 use uvio::UvIoFactory;
29 ////////////////////////////////////////////////////////////////////////////////
30 /// Generic functions related to dealing with sockaddr things
31 ////////////////////////////////////////////////////////////////////////////////
33 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
34 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
36 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
37 len: uint) -> rtio::SocketAddr {
38 match storage.ss_family as c_int {
40 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
41 let storage: &libc::sockaddr_in = unsafe {
42 mem::transmute(storage)
44 let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
45 let a = (ip >> 24) as u8;
46 let b = (ip >> 16) as u8;
47 let c = (ip >> 8) as u8;
48 let d = (ip >> 0) as u8;
50 ip: rtio::Ipv4Addr(a, b, c, d),
51 port: ntohs(storage.sin_port),
55 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
56 let storage: &libc::sockaddr_in6 = unsafe {
57 mem::transmute(storage)
59 let a = ntohs(storage.sin6_addr.s6_addr[0]);
60 let b = ntohs(storage.sin6_addr.s6_addr[1]);
61 let c = ntohs(storage.sin6_addr.s6_addr[2]);
62 let d = ntohs(storage.sin6_addr.s6_addr[3]);
63 let e = ntohs(storage.sin6_addr.s6_addr[4]);
64 let f = ntohs(storage.sin6_addr.s6_addr[5]);
65 let g = ntohs(storage.sin6_addr.s6_addr[6]);
66 let h = ntohs(storage.sin6_addr.s6_addr[7]);
68 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
69 port: ntohs(storage.sin6_port),
73 fail!("unknown family {}", n);
78 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
80 let mut storage: libc::sockaddr_storage = mem::zeroed();
81 let len = match addr.ip {
82 rtio::Ipv4Addr(a, b, c, d) => {
83 let ip = (a as u32 << 24) |
87 let storage: &mut libc::sockaddr_in =
88 mem::transmute(&mut storage);
89 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
90 (*storage).sin_port = htons(addr.port);
91 (*storage).sin_addr = libc::in_addr {
92 s_addr: mem::from_be32(ip)
94 mem::size_of::<libc::sockaddr_in>()
96 rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
97 let storage: &mut libc::sockaddr_in6 =
98 mem::transmute(&mut storage);
99 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
100 storage.sin6_port = htons(addr.port);
101 storage.sin6_addr = libc::in6_addr {
113 mem::size_of::<libc::sockaddr_in6>()
116 return (storage, len);
120 enum SocketNameKind {
126 fn socket_name(sk: SocketNameKind,
127 handle: *c_void) -> Result<rtio::SocketAddr, IoError> {
128 let getsockname = match sk {
129 TcpPeer => uvll::uv_tcp_getpeername,
130 Tcp => uvll::uv_tcp_getsockname,
131 Udp => uvll::uv_udp_getsockname,
134 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
135 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::zeroed() };
136 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
138 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
140 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
142 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
143 n => Err(uv_error_to_io_error(UvError(n)))
147 ////////////////////////////////////////////////////////////////////////////////
148 /// TCP implementation
149 ////////////////////////////////////////////////////////////////////////////////
151 pub struct TcpWatcher {
152 handle: *uvll::uv_tcp_t,
153 stream: StreamWatcher,
157 // libuv can't support concurrent reads and concurrent writes of the same
158 // stream object, so we use these access guards in order to arbitrate among
159 // multiple concurrent reads and writes. Note that libuv *can* read and
160 // write simultaneously, it just can't read and read simultaneously.
161 read_access: AccessTimeout,
162 write_access: AccessTimeout,
165 pub struct TcpListener {
167 handle: *uvll::uv_pipe_t,
168 outgoing: Sender<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
169 incoming: Receiver<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
172 pub struct TcpAcceptor {
173 listener: Box<TcpListener>,
174 timeout: AcceptTimeout,
177 // TCP watchers (clients/streams)
180 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
181 let handle = io.make_handle();
182 TcpWatcher::new_home(&io.loop_, handle)
185 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
186 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
188 uvll::uv_tcp_init(loop_.handle, handle)
193 stream: StreamWatcher::new(handle),
194 refcount: Refcount::new(),
195 read_access: AccessTimeout::new(),
196 write_access: AccessTimeout::new(),
200 pub fn connect(io: &mut UvIoFactory,
201 address: rtio::SocketAddr,
202 timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
203 let tcp = TcpWatcher::new(io);
204 let cx = ConnectCtx { status: -1, task: None, timer: None };
205 let (addr, _len) = addr_to_sockaddr(address);
206 let addr_p = &addr as *_ as *libc::sockaddr;
207 cx.connect(tcp, timeout, io, |req, tcp, cb| {
208 unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
213 impl HomingIO for TcpWatcher {
214 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
217 impl rtio::RtioSocket for TcpWatcher {
218 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
219 let _m = self.fire_homing_missile();
220 socket_name(Tcp, self.handle)
224 impl rtio::RtioTcpStream for TcpWatcher {
225 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
226 let m = self.fire_homing_missile();
227 let guard = try!(self.read_access.grant(m));
229 // see comments in close_read about this check
230 if guard.access.is_closed() {
231 return Err(uv_error_to_io_error(UvError(uvll::EOF)))
234 self.stream.read(buf).map_err(uv_error_to_io_error)
237 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
238 let m = self.fire_homing_missile();
239 let guard = try!(self.write_access.grant(m));
240 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
243 fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
244 let _m = self.fire_homing_missile();
245 socket_name(TcpPeer, self.handle)
248 fn control_congestion(&mut self) -> Result<(), IoError> {
249 let _m = self.fire_homing_missile();
250 status_to_io_result(unsafe {
251 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
255 fn nodelay(&mut self) -> Result<(), IoError> {
256 let _m = self.fire_homing_missile();
257 status_to_io_result(unsafe {
258 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
262 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
263 let _m = self.fire_homing_missile();
264 status_to_io_result(unsafe {
265 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
266 delay_in_seconds as c_uint)
270 fn letdie(&mut self) -> Result<(), IoError> {
271 let _m = self.fire_homing_missile();
272 status_to_io_result(unsafe {
273 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
277 fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
280 stream: StreamWatcher::new(self.handle),
281 home: self.home.clone(),
282 refcount: self.refcount.clone(),
283 read_access: self.read_access.clone(),
284 write_access: self.write_access.clone(),
285 } as Box<rtio::RtioTcpStream:Send>
288 fn close_read(&mut self) -> Result<(), IoError> {
289 // see comments in PipeWatcher::close_read
291 let m = self.fire_homing_missile();
292 self.read_access.access.close(&m);
293 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
295 let _ = task.map(|t| t.reawaken());
299 fn close_write(&mut self) -> Result<(), IoError> {
300 let _m = self.fire_homing_missile();
301 shutdown(self.handle, &self.uv_loop())
304 fn set_timeout(&mut self, timeout: Option<u64>) {
305 self.set_read_timeout(timeout);
306 self.set_write_timeout(timeout);
309 fn set_read_timeout(&mut self, ms: Option<u64>) {
310 let _m = self.fire_homing_missile();
311 let loop_ = self.uv_loop();
312 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
313 &self.stream as *_ as uint);
315 fn cancel_read(stream: uint) -> Option<BlockedTask> {
316 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
317 stream.cancel_read(uvll::ECANCELED as ssize_t)
321 fn set_write_timeout(&mut self, ms: Option<u64>) {
322 let _m = self.fire_homing_missile();
323 let loop_ = self.uv_loop();
324 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
325 &self.stream as *_ as uint);
327 fn cancel_write(stream: uint) -> Option<BlockedTask> {
328 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
329 stream.cancel_write()
334 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
335 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
338 impl Drop for TcpWatcher {
340 let _m = self.fire_homing_missile();
341 if self.refcount.decrement() {
347 // TCP listeners (unbound servers)
350 pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
351 -> Result<Box<TcpListener>, UvError> {
352 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
354 uvll::uv_tcp_init(io.uv_loop(), handle)
356 let (tx, rx) = channel();
357 let l = box TcpListener {
358 home: io.make_handle(),
363 let (addr, _len) = addr_to_sockaddr(address);
365 let addr_p = &addr as *libc::sockaddr_storage;
366 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
369 0 => Ok(l.install()),
375 impl HomingIO for TcpListener {
376 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
379 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
380 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
383 impl rtio::RtioSocket for TcpListener {
384 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
385 let _m = self.fire_homing_missile();
386 socket_name(Tcp, self.handle)
390 impl rtio::RtioTcpListener for TcpListener {
391 fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor:Send>, IoError> {
392 // create the acceptor object from ourselves
393 let mut acceptor = box TcpAcceptor {
395 timeout: AcceptTimeout::new(),
398 let _m = acceptor.fire_homing_missile();
399 // FIXME: the 128 backlog should be configurable
400 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
401 0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor:Send>),
402 n => Err(uv_error_to_io_error(UvError(n))),
407 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
408 assert!(status != uvll::ECANCELED);
409 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
410 let msg = match status {
412 let loop_ = Loop::wrap(unsafe {
413 uvll::get_loop_for_uv_handle(server)
415 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
416 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
417 Ok(box client as Box<rtio::RtioTcpStream:Send>)
419 n => Err(uv_error_to_io_error(UvError(n)))
421 tcp.outgoing.send(msg);
424 impl Drop for TcpListener {
426 let _m = self.fire_homing_missile();
431 // TCP acceptors (bound servers)
433 impl HomingIO for TcpAcceptor {
434 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
437 impl rtio::RtioSocket for TcpAcceptor {
438 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
439 let _m = self.fire_homing_missile();
440 socket_name(Tcp, self.listener.handle)
444 impl rtio::RtioTcpAcceptor for TcpAcceptor {
445 fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
446 self.timeout.accept(&self.listener.incoming)
449 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
450 let _m = self.fire_homing_missile();
451 status_to_io_result(unsafe {
452 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
456 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
457 let _m = self.fire_homing_missile();
458 status_to_io_result(unsafe {
459 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
463 fn set_timeout(&mut self, ms: Option<u64>) {
464 let _m = self.fire_homing_missile();
466 None => self.timeout.clear(),
467 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
472 ////////////////////////////////////////////////////////////////////////////////
473 /// UDP implementation
474 ////////////////////////////////////////////////////////////////////////////////
476 pub struct UdpWatcher {
477 handle: *uvll::uv_udp_t,
480 // See above for what these fields are
482 read_access: AccessTimeout,
483 write_access: AccessTimeout,
485 blocked_sender: Option<BlockedTask>,
489 task: Option<BlockedTask>,
491 result: Option<(ssize_t, Option<rtio::SocketAddr>)>,
496 data: Option<Vec<u8>>,
497 udp: *mut UdpWatcher,
501 pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
502 -> Result<UdpWatcher, UvError> {
503 let udp = UdpWatcher {
504 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
505 home: io.make_handle(),
506 refcount: Refcount::new(),
507 read_access: AccessTimeout::new(),
508 write_access: AccessTimeout::new(),
509 blocked_sender: None,
512 uvll::uv_udp_init(io.uv_loop(), udp.handle)
514 let (addr, _len) = addr_to_sockaddr(address);
515 let result = unsafe {
516 let addr_p = &addr as *libc::sockaddr_storage;
517 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
519 return match result {
521 n => Err(UvError(n)),
526 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
527 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
530 impl HomingIO for UdpWatcher {
531 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
534 impl rtio::RtioSocket for UdpWatcher {
535 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
536 let _m = self.fire_homing_missile();
537 socket_name(Udp, self.handle)
541 impl rtio::RtioUdpSocket for UdpWatcher {
542 fn recvfrom(&mut self, buf: &mut [u8])
543 -> Result<(uint, rtio::SocketAddr), IoError>
545 let loop_ = self.uv_loop();
546 let m = self.fire_homing_missile();
547 let _guard = try!(self.read_access.grant(m));
549 return match unsafe {
550 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
553 let mut cx = UdpRecvCtx {
555 buf: Some(slice_to_uv_buf(buf)),
558 let handle = self.handle;
559 wait_until_woken_after(&mut cx.task, &loop_, || {
560 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
562 match cx.result.take_unwrap() {
564 Err(uv_error_to_io_error(UvError(n as c_int))),
565 (n, addr) => Ok((n as uint, addr.unwrap()))
568 n => Err(uv_error_to_io_error(UvError(n)))
571 extern fn alloc_cb(handle: *uvll::uv_udp_t,
572 _suggested_size: size_t,
575 let cx = uvll::get_data_for_uv_handle(handle);
576 let cx = &mut *(cx as *mut UdpRecvCtx);
577 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
581 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
582 addr: *libc::sockaddr, _flags: c_uint) {
583 assert!(nread != uvll::ECANCELED as ssize_t);
585 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
588 // When there's no data to read the recv callback can be a no-op.
589 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
590 // this we just drop back to kqueue and wait for the next callback.
592 cx.buf = Some(unsafe { *buf });
596 unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
597 let addr = if addr == ptr::null() {
600 let len = mem::size_of::<libc::sockaddr_storage>();
601 Some(sockaddr_to_addr(unsafe { mem::transmute(addr) }, len))
603 cx.result = Some((nread, addr));
604 wakeup(&mut cx.task);
608 fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> {
609 let m = self.fire_homing_missile();
610 let loop_ = self.uv_loop();
611 let guard = try!(self.write_access.grant(m));
613 let mut req = Request::new(uvll::UV_UDP_SEND);
614 let (addr, _len) = addr_to_sockaddr(dst);
615 let addr_p = &addr as *_ as *libc::sockaddr;
617 // see comments in StreamWatcher::write for why we may allocate a buffer
619 let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
620 let uv_buf = if guard.can_timeout {
621 slice_to_uv_buf(data.get_ref().as_slice())
626 return match unsafe {
627 uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
630 req.defuse(); // uv callback now owns this request
631 let mut cx = UdpSendCtx {
632 result: uvll::ECANCELED, data: data, udp: self as *mut _
634 wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
638 if cx.result != uvll::ECANCELED {
639 return match cx.result {
641 n => Err(uv_error_to_io_error(UvError(n)))
644 let new_cx = box UdpSendCtx {
646 udp: 0 as *mut UdpWatcher,
647 data: cx.data.take(),
650 req.set_data(&*new_cx);
653 Err(uv_error_to_io_error(UvError(cx.result)))
655 n => Err(uv_error_to_io_error(UvError(n)))
658 // This function is the same as stream::write_cb, but adapted for udp
659 // instead of streams.
660 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
661 let req = Request::wrap(req);
662 let cx: &mut UdpSendCtx = unsafe { req.get_data() };
665 if cx.udp as uint != 0 {
666 let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
667 wakeup(&mut udp.blocked_sender);
669 let _cx: Box<UdpSendCtx> = unsafe { mem::transmute(cx) };
674 fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
675 let _m = self.fire_homing_missile();
676 status_to_io_result(unsafe {
677 multi.to_str().with_c_str(|m_addr| {
678 uvll::uv_udp_set_membership(self.handle,
685 fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
686 let _m = self.fire_homing_missile();
687 status_to_io_result(unsafe {
688 multi.to_str().with_c_str(|m_addr| {
689 uvll::uv_udp_set_membership(self.handle,
691 uvll::UV_LEAVE_GROUP)
696 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
697 let _m = self.fire_homing_missile();
698 status_to_io_result(unsafe {
699 uvll::uv_udp_set_multicast_loop(self.handle,
704 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
705 let _m = self.fire_homing_missile();
706 status_to_io_result(unsafe {
707 uvll::uv_udp_set_multicast_loop(self.handle,
712 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
713 let _m = self.fire_homing_missile();
714 status_to_io_result(unsafe {
715 uvll::uv_udp_set_multicast_ttl(self.handle,
720 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
721 let _m = self.fire_homing_missile();
722 status_to_io_result(unsafe {
723 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
727 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
728 let _m = self.fire_homing_missile();
729 status_to_io_result(unsafe {
730 uvll::uv_udp_set_broadcast(self.handle,
735 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
736 let _m = self.fire_homing_missile();
737 status_to_io_result(unsafe {
738 uvll::uv_udp_set_broadcast(self.handle,
743 fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
746 home: self.home.clone(),
747 refcount: self.refcount.clone(),
748 write_access: self.write_access.clone(),
749 read_access: self.read_access.clone(),
750 blocked_sender: None,
751 } as Box<rtio::RtioUdpSocket:Send>
754 fn set_timeout(&mut self, timeout: Option<u64>) {
755 self.set_read_timeout(timeout);
756 self.set_write_timeout(timeout);
759 fn set_read_timeout(&mut self, ms: Option<u64>) {
760 let _m = self.fire_homing_missile();
761 let loop_ = self.uv_loop();
762 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
763 self.handle as uint);
765 fn cancel_read(stream: uint) -> Option<BlockedTask> {
766 // This method is quite similar to StreamWatcher::cancel_read, see
767 // there for more information
768 let handle = stream as *uvll::uv_udp_t;
769 assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
771 let data = uvll::get_data_for_uv_handle(handle);
772 if data.is_null() { return None }
773 uvll::set_data_for_uv_handle(handle, 0 as *int);
774 &mut *(data as *mut UdpRecvCtx)
776 data.result = Some((uvll::ECANCELED as ssize_t, None));
781 fn set_write_timeout(&mut self, ms: Option<u64>) {
782 let _m = self.fire_homing_missile();
783 let loop_ = self.uv_loop();
784 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
785 self as *mut _ as uint);
787 fn cancel_write(stream: uint) -> Option<BlockedTask> {
788 let stream: &mut UdpWatcher = unsafe { mem::transmute(stream) };
789 stream.blocked_sender.take()
794 impl Drop for UdpWatcher {
796 // Send ourselves home to close this handle (blocking while doing so).
797 let _m = self.fire_homing_missile();
798 if self.refcount.decrement() {
804 ////////////////////////////////////////////////////////////////////////////////
806 ////////////////////////////////////////////////////////////////////////////////
808 pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
810 slot: Option<BlockedTask>,
813 let mut req = Request::new(uvll::UV_SHUTDOWN);
815 return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
817 req.defuse(); // uv callback now owns this request
818 let mut cx = Ctx { slot: None, status: 0 };
820 wait_until_woken_after(&mut cx.slot, loop_, || {
824 status_to_io_result(cx.status)
826 n => Err(uv_error_to_io_error(UvError(n)))
829 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
830 let req = Request::wrap(req);
831 assert!(status != uvll::ECANCELED);
832 let cx: &mut Ctx = unsafe { req.get_data() };
834 wakeup(&mut cx.slot);
840 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
843 use super::{UdpWatcher, TcpWatcher, TcpListener};
844 use super::super::local_loop;
847 fn connect_close_ip4() {
848 match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) {
850 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
855 fn connect_close_ip6() {
856 match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) {
858 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
863 fn udp_bind_close_ip4() {
864 match UdpWatcher::bind(local_loop(), ::next_test_ip4()) {
871 fn udp_bind_close_ip6() {
872 match UdpWatcher::bind(local_loop(), ::next_test_ip6()) {
880 let (tx, rx) = channel();
881 let addr = ::next_test_ip4();
884 let w = match TcpListener::bind(local_loop(), addr) {
885 Ok(w) => w, Err(e) => fail!("{:?}", e)
887 let mut w = match w.listen() {
888 Ok(w) => w, Err(e) => fail!("{:?}", e),
893 let mut buf = [0u8, ..10];
894 match stream.read(buf) {
895 Ok(10) => {} e => fail!("{:?}", e),
897 for i in range(0, 10u8) {
898 assert_eq!(buf[i as uint], i + 1);
901 Err(e) => fail!("{:?}", e)
906 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
907 Ok(w) => w, Err(e) => fail!("{:?}", e)
909 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
910 Ok(()) => {}, Err(e) => fail!("{:?}", e)
916 let (tx, rx) = channel();
917 let addr = ::next_test_ip6();
920 let w = match TcpListener::bind(local_loop(), addr) {
921 Ok(w) => w, Err(e) => fail!("{:?}", e)
923 let mut w = match w.listen() {
924 Ok(w) => w, Err(e) => fail!("{:?}", e),
929 let mut buf = [0u8, ..10];
930 match stream.read(buf) {
931 Ok(10) => {} e => fail!("{:?}", e),
933 for i in range(0, 10u8) {
934 assert_eq!(buf[i as uint], i + 1);
937 Err(e) => fail!("{:?}", e)
942 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
943 Ok(w) => w, Err(e) => fail!("{:?}", e)
945 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
946 Ok(()) => {}, Err(e) => fail!("{:?}", e)
952 let (tx, rx) = channel();
953 let client = ::next_test_ip4();
954 let server = ::next_test_ip4();
957 match UdpWatcher::bind(local_loop(), server) {
960 let mut buf = [0u8, ..10];
961 match w.recvfrom(buf) {
962 Ok((10, addr)) => assert!(addr == client),
963 e => fail!("{:?}", e),
965 for i in range(0, 10u8) {
966 assert_eq!(buf[i as uint], i + 1);
969 Err(e) => fail!("{:?}", e)
974 let mut w = match UdpWatcher::bind(local_loop(), client) {
975 Ok(w) => w, Err(e) => fail!("{:?}", e)
977 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
978 Ok(()) => {}, Err(e) => fail!("{:?}", e)
984 let (tx, rx) = channel();
985 let client = ::next_test_ip6();
986 let server = ::next_test_ip6();
989 match UdpWatcher::bind(local_loop(), server) {
992 let mut buf = [0u8, ..10];
993 match w.recvfrom(buf) {
994 Ok((10, addr)) => assert!(addr == client),
995 e => fail!("{:?}", e),
997 for i in range(0, 10u8) {
998 assert_eq!(buf[i as uint], i + 1);
1001 Err(e) => fail!("{:?}", e)
1006 let mut w = match UdpWatcher::bind(local_loop(), client) {
1007 Ok(w) => w, Err(e) => fail!("{:?}", e)
1009 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1010 Ok(()) => {}, Err(e) => fail!("{:?}", e)
1015 fn test_read_read_read() {
1016 let addr = ::next_test_ip4();
1017 static MAX: uint = 5000;
1018 let (tx, rx) = channel();
1021 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1022 let mut acceptor = listener.listen().ok().unwrap();
1024 let mut stream = acceptor.accept().ok().unwrap();
1025 let buf = [1, .. 2048];
1026 let mut total_bytes_written = 0;
1027 while total_bytes_written < MAX {
1028 assert!(stream.write(buf).is_ok());
1029 uvdebug!("wrote bytes");
1030 total_bytes_written += buf.len();
1035 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1036 let mut buf = [0, .. 2048];
1037 let mut total_bytes_read = 0;
1038 while total_bytes_read < MAX {
1039 let nread = stream.read(buf).ok().unwrap();
1040 total_bytes_read += nread;
1041 for i in range(0u, nread) {
1042 assert_eq!(buf[i], 1);
1045 uvdebug!("read {} bytes total", total_bytes_read);
1049 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1050 fn test_udp_twice() {
1051 let server_addr = ::next_test_ip4();
1052 let client_addr = ::next_test_ip4();
1053 let (tx, rx) = channel();
1056 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1058 assert!(client.sendto([1], server_addr).is_ok());
1059 assert!(client.sendto([2], server_addr).is_ok());
1062 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1066 let (nread1, src1) = server.recvfrom(buf1).ok().unwrap();
1067 let (nread2, src2) = server.recvfrom(buf2).ok().unwrap();
1068 assert_eq!(nread1, 1);
1069 assert_eq!(nread2, 1);
1070 assert!(src1 == client_addr);
1071 assert!(src2 == client_addr);
1072 assert_eq!(buf1[0], 1);
1073 assert_eq!(buf2[0], 2);
1077 fn test_udp_many_read() {
1078 let server_out_addr = ::next_test_ip4();
1079 let server_in_addr = ::next_test_ip4();
1080 let client_out_addr = ::next_test_ip4();
1081 let client_in_addr = ::next_test_ip4();
1082 static MAX: uint = 500_000;
1084 let (tx1, rx1) = channel::<()>();
1085 let (tx2, rx2) = channel::<()>();
1088 let l = local_loop();
1089 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1090 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1091 let (tx, rx) = (tx2, rx1);
1094 let msg = [1, .. 2048];
1095 let mut total_bytes_sent = 0;
1099 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1100 total_bytes_sent += msg.len();
1101 // check if the client has received enough
1102 let res = server_in.recvfrom(buf);
1103 assert!(res.is_ok());
1104 let (nread, src) = res.ok().unwrap();
1105 assert_eq!(nread, 1);
1106 assert!(src == client_out_addr);
1108 assert!(total_bytes_sent >= MAX);
1111 let l = local_loop();
1112 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1113 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1114 let (tx, rx) = (tx1, rx2);
1117 let mut total_bytes_recv = 0;
1118 let mut buf = [0, .. 2048];
1119 while total_bytes_recv < MAX {
1121 assert!(client_out.sendto([1], server_in_addr).is_ok());
1123 let res = client_in.recvfrom(buf);
1124 assert!(res.is_ok());
1125 let (nread, src) = res.ok().unwrap();
1126 assert!(src == server_out_addr);
1127 total_bytes_recv += nread;
1128 for i in range(0u, nread) {
1129 assert_eq!(buf[i], 1);
1132 // tell the server we're done
1133 assert!(client_out.sendto([0], server_in_addr).is_ok());
1137 fn test_read_and_block() {
1138 let addr = ::next_test_ip4();
1139 let (tx, rx) = channel::<Receiver<()>>();
1143 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1144 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1145 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1147 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1148 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1152 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1153 let mut acceptor = listener.listen().ok().unwrap();
1154 let (tx2, rx2) = channel();
1156 let mut stream = acceptor.accept().ok().unwrap();
1157 let mut buf = [0, .. 2048];
1160 let mut current = 0;
1163 while current < expected {
1164 let nread = stream.read(buf).ok().unwrap();
1165 for i in range(0u, nread) {
1166 let val = buf[i] as uint;
1167 assert_eq!(val, current % 8);
1172 let _ = tx2.send_opt(());
1175 // Make sure we had multiple reads
1180 fn test_simple_tcp_server_and_client_on_diff_threads() {
1181 let addr = ::next_test_ip4();
1184 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1185 let mut acceptor = listener.listen().ok().unwrap();
1186 let mut stream = acceptor.accept().ok().unwrap();
1187 let mut buf = [0, .. 2048];
1188 let nread = stream.read(buf).ok().unwrap();
1189 assert_eq!(nread, 8);
1190 for i in range(0u, nread) {
1191 assert_eq!(buf[i], i as u8);
1195 let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1196 while stream.is_err() {
1197 stream = TcpWatcher::connect(local_loop(), addr, None);
1199 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1202 #[should_fail] #[test]
1203 fn tcp_listener_fail_cleanup() {
1204 let addr = ::next_test_ip4();
1205 let w = TcpListener::bind(local_loop(), addr).unwrap();
1206 let _w = w.listen().ok().unwrap();
1210 #[should_fail] #[test]
1211 fn tcp_stream_fail_cleanup() {
1212 let (tx, rx) = channel();
1213 let addr = ::next_test_ip4();
1216 let w = TcpListener::bind(local_loop(), addr).unwrap();
1217 let mut w = w.listen().ok().unwrap();
1219 drop(w.accept().ok().unwrap());
1222 let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1226 #[should_fail] #[test]
1227 fn udp_listener_fail_cleanup() {
1228 let addr = ::next_test_ip4();
1229 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1233 #[should_fail] #[test]
1234 fn udp_fail_other_task() {
1235 let addr = ::next_test_ip4();
1236 let (tx, rx) = channel();
1238 // force the handle to be created on a different scheduler, failure in
1239 // the original task will force a homing operation back to this
1242 let w = UdpWatcher::bind(local_loop(), addr).unwrap();