1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
16 use std::rt::rtio::IoError;
17 use std::rt::task::BlockedTask;
19 use homing::{HomingIO, HomeHandle};
21 use stream::StreamWatcher;
22 use super::{Loop, Request, UvError, Buf, status_to_io_result,
23 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
24 wait_until_woken_after, wakeup};
25 use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
26 use uvio::UvIoFactory;
29 ////////////////////////////////////////////////////////////////////////////////
30 /// Generic functions related to dealing with sockaddr things
31 ////////////////////////////////////////////////////////////////////////////////
33 pub fn htons(u: u16) -> u16 { u.to_big_endian() }
34 pub fn ntohs(u: u16) -> u16 { Int::from_big_endian(u) }
36 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
37 len: uint) -> rtio::SocketAddr {
38 match storage.ss_family as c_int {
40 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
41 let storage: &libc::sockaddr_in = unsafe {
42 mem::transmute(storage)
44 let ip = (storage.sin_addr.s_addr as u32).to_big_endian();
45 let a = (ip >> 24) as u8;
46 let b = (ip >> 16) as u8;
47 let c = (ip >> 8) as u8;
48 let d = (ip >> 0) as u8;
50 ip: rtio::Ipv4Addr(a, b, c, d),
51 port: ntohs(storage.sin_port),
55 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
56 let storage: &libc::sockaddr_in6 = unsafe {
57 mem::transmute(storage)
59 let a = ntohs(storage.sin6_addr.s6_addr[0]);
60 let b = ntohs(storage.sin6_addr.s6_addr[1]);
61 let c = ntohs(storage.sin6_addr.s6_addr[2]);
62 let d = ntohs(storage.sin6_addr.s6_addr[3]);
63 let e = ntohs(storage.sin6_addr.s6_addr[4]);
64 let f = ntohs(storage.sin6_addr.s6_addr[5]);
65 let g = ntohs(storage.sin6_addr.s6_addr[6]);
66 let h = ntohs(storage.sin6_addr.s6_addr[7]);
68 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
69 port: ntohs(storage.sin6_port),
73 fail!("unknown family {}", n);
78 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
80 let mut storage: libc::sockaddr_storage = mem::zeroed();
81 let len = match addr.ip {
82 rtio::Ipv4Addr(a, b, c, d) => {
83 let ip = (a as u32 << 24) |
87 let storage: &mut libc::sockaddr_in =
88 mem::transmute(&mut storage);
89 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
90 (*storage).sin_port = htons(addr.port);
91 (*storage).sin_addr = libc::in_addr {
92 s_addr: Int::from_big_endian(ip),
95 mem::size_of::<libc::sockaddr_in>()
97 rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
98 let storage: &mut libc::sockaddr_in6 =
99 mem::transmute(&mut storage);
100 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
101 storage.sin6_port = htons(addr.port);
102 storage.sin6_addr = libc::in6_addr {
114 mem::size_of::<libc::sockaddr_in6>()
117 return (storage, len);
121 enum SocketNameKind {
127 fn socket_name(sk: SocketNameKind,
128 handle: *c_void) -> Result<rtio::SocketAddr, IoError> {
129 let getsockname = match sk {
130 TcpPeer => uvll::uv_tcp_getpeername,
131 Tcp => uvll::uv_tcp_getsockname,
132 Udp => uvll::uv_udp_getsockname,
135 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::zeroed() };
137 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
139 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
141 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
143 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
144 n => Err(uv_error_to_io_error(UvError(n)))
148 ////////////////////////////////////////////////////////////////////////////////
149 /// TCP implementation
150 ////////////////////////////////////////////////////////////////////////////////
152 pub struct TcpWatcher {
153 handle: *uvll::uv_tcp_t,
154 stream: StreamWatcher,
158 // libuv can't support concurrent reads and concurrent writes of the same
159 // stream object, so we use these access guards in order to arbitrate among
160 // multiple concurrent reads and writes. Note that libuv *can* read and
161 // write simultaneously, it just can't read and read simultaneously.
162 read_access: AccessTimeout,
163 write_access: AccessTimeout,
166 pub struct TcpListener {
168 handle: *uvll::uv_pipe_t,
169 outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
170 incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
173 pub struct TcpAcceptor {
174 listener: Box<TcpListener>,
175 timeout: AcceptTimeout,
178 // TCP watchers (clients/streams)
181 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
182 let handle = io.make_handle();
183 TcpWatcher::new_home(&io.loop_, handle)
186 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
187 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
189 uvll::uv_tcp_init(loop_.handle, handle)
194 stream: StreamWatcher::new(handle),
195 refcount: Refcount::new(),
196 read_access: AccessTimeout::new(),
197 write_access: AccessTimeout::new(),
201 pub fn connect(io: &mut UvIoFactory,
202 address: rtio::SocketAddr,
203 timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
204 let tcp = TcpWatcher::new(io);
205 let cx = ConnectCtx { status: -1, task: None, timer: None };
206 let (addr, _len) = addr_to_sockaddr(address);
207 let addr_p = &addr as *_ as *libc::sockaddr;
208 cx.connect(tcp, timeout, io, |req, tcp, cb| {
209 unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
214 impl HomingIO for TcpWatcher {
215 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
218 impl rtio::RtioSocket for TcpWatcher {
219 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
220 let _m = self.fire_homing_missile();
221 socket_name(Tcp, self.handle)
225 impl rtio::RtioTcpStream for TcpWatcher {
226 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
227 let m = self.fire_homing_missile();
228 let guard = try!(self.read_access.grant(m));
230 // see comments in close_read about this check
231 if guard.access.is_closed() {
232 return Err(uv_error_to_io_error(UvError(uvll::EOF)))
235 self.stream.read(buf).map_err(uv_error_to_io_error)
238 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
239 let m = self.fire_homing_missile();
240 let guard = try!(self.write_access.grant(m));
241 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
244 fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
245 let _m = self.fire_homing_missile();
246 socket_name(TcpPeer, self.handle)
249 fn control_congestion(&mut self) -> Result<(), IoError> {
250 let _m = self.fire_homing_missile();
251 status_to_io_result(unsafe {
252 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
256 fn nodelay(&mut self) -> Result<(), IoError> {
257 let _m = self.fire_homing_missile();
258 status_to_io_result(unsafe {
259 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
263 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
264 let _m = self.fire_homing_missile();
265 status_to_io_result(unsafe {
266 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
267 delay_in_seconds as c_uint)
271 fn letdie(&mut self) -> Result<(), IoError> {
272 let _m = self.fire_homing_missile();
273 status_to_io_result(unsafe {
274 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
278 fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
281 stream: StreamWatcher::new(self.handle),
282 home: self.home.clone(),
283 refcount: self.refcount.clone(),
284 read_access: self.read_access.clone(),
285 write_access: self.write_access.clone(),
286 } as Box<rtio::RtioTcpStream + Send>
289 fn close_read(&mut self) -> Result<(), IoError> {
290 // see comments in PipeWatcher::close_read
292 let m = self.fire_homing_missile();
293 self.read_access.access.close(&m);
294 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
296 let _ = task.map(|t| t.reawaken());
300 fn close_write(&mut self) -> Result<(), IoError> {
301 let _m = self.fire_homing_missile();
302 shutdown(self.handle, &self.uv_loop())
305 fn set_timeout(&mut self, timeout: Option<u64>) {
306 self.set_read_timeout(timeout);
307 self.set_write_timeout(timeout);
310 fn set_read_timeout(&mut self, ms: Option<u64>) {
311 let _m = self.fire_homing_missile();
312 let loop_ = self.uv_loop();
313 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
314 &self.stream as *_ as uint);
316 fn cancel_read(stream: uint) -> Option<BlockedTask> {
317 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
318 stream.cancel_read(uvll::ECANCELED as ssize_t)
322 fn set_write_timeout(&mut self, ms: Option<u64>) {
323 let _m = self.fire_homing_missile();
324 let loop_ = self.uv_loop();
325 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
326 &self.stream as *_ as uint);
328 fn cancel_write(stream: uint) -> Option<BlockedTask> {
329 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
330 stream.cancel_write()
335 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
336 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
339 impl Drop for TcpWatcher {
341 let _m = self.fire_homing_missile();
342 if self.refcount.decrement() {
348 // TCP listeners (unbound servers)
351 pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
352 -> Result<Box<TcpListener>, UvError> {
353 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
355 uvll::uv_tcp_init(io.uv_loop(), handle)
357 let (tx, rx) = channel();
358 let l = box TcpListener {
359 home: io.make_handle(),
364 let (addr, _len) = addr_to_sockaddr(address);
366 let addr_p = &addr as *libc::sockaddr_storage;
367 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
370 0 => Ok(l.install()),
376 impl HomingIO for TcpListener {
377 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
380 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
381 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
384 impl rtio::RtioSocket for TcpListener {
385 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
386 let _m = self.fire_homing_missile();
387 socket_name(Tcp, self.handle)
391 impl rtio::RtioTcpListener for TcpListener {
392 fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
393 // create the acceptor object from ourselves
394 let mut acceptor = box TcpAcceptor {
396 timeout: AcceptTimeout::new(),
399 let _m = acceptor.fire_homing_missile();
400 // FIXME: the 128 backlog should be configurable
401 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
402 0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
403 n => Err(uv_error_to_io_error(UvError(n))),
408 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
409 assert!(status != uvll::ECANCELED);
410 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
411 let msg = match status {
413 let loop_ = Loop::wrap(unsafe {
414 uvll::get_loop_for_uv_handle(server)
416 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
417 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
418 Ok(box client as Box<rtio::RtioTcpStream + Send>)
420 n => Err(uv_error_to_io_error(UvError(n)))
422 tcp.outgoing.send(msg);
425 impl Drop for TcpListener {
427 let _m = self.fire_homing_missile();
432 // TCP acceptors (bound servers)
434 impl HomingIO for TcpAcceptor {
435 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
438 impl rtio::RtioSocket for TcpAcceptor {
439 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
440 let _m = self.fire_homing_missile();
441 socket_name(Tcp, self.listener.handle)
445 impl rtio::RtioTcpAcceptor for TcpAcceptor {
446 fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
447 self.timeout.accept(&self.listener.incoming)
450 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
451 let _m = self.fire_homing_missile();
452 status_to_io_result(unsafe {
453 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
457 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
458 let _m = self.fire_homing_missile();
459 status_to_io_result(unsafe {
460 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
464 fn set_timeout(&mut self, ms: Option<u64>) {
465 let _m = self.fire_homing_missile();
467 None => self.timeout.clear(),
468 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
473 ////////////////////////////////////////////////////////////////////////////////
474 /// UDP implementation
475 ////////////////////////////////////////////////////////////////////////////////
477 pub struct UdpWatcher {
478 handle: *uvll::uv_udp_t,
481 // See above for what these fields are
483 read_access: AccessTimeout,
484 write_access: AccessTimeout,
486 blocked_sender: Option<BlockedTask>,
490 task: Option<BlockedTask>,
492 result: Option<(ssize_t, Option<rtio::SocketAddr>)>,
497 data: Option<Vec<u8>>,
498 udp: *mut UdpWatcher,
502 pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
503 -> Result<UdpWatcher, UvError> {
504 let udp = UdpWatcher {
505 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
506 home: io.make_handle(),
507 refcount: Refcount::new(),
508 read_access: AccessTimeout::new(),
509 write_access: AccessTimeout::new(),
510 blocked_sender: None,
513 uvll::uv_udp_init(io.uv_loop(), udp.handle)
515 let (addr, _len) = addr_to_sockaddr(address);
516 let result = unsafe {
517 let addr_p = &addr as *libc::sockaddr_storage;
518 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
520 return match result {
522 n => Err(UvError(n)),
527 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
528 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
531 impl HomingIO for UdpWatcher {
532 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
535 impl rtio::RtioSocket for UdpWatcher {
536 fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
537 let _m = self.fire_homing_missile();
538 socket_name(Udp, self.handle)
542 impl rtio::RtioUdpSocket for UdpWatcher {
543 fn recvfrom(&mut self, buf: &mut [u8])
544 -> Result<(uint, rtio::SocketAddr), IoError>
546 let loop_ = self.uv_loop();
547 let m = self.fire_homing_missile();
548 let _guard = try!(self.read_access.grant(m));
550 return match unsafe {
551 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
554 let mut cx = UdpRecvCtx {
556 buf: Some(slice_to_uv_buf(buf)),
559 let handle = self.handle;
560 wait_until_woken_after(&mut cx.task, &loop_, || {
561 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
563 match cx.result.take_unwrap() {
565 Err(uv_error_to_io_error(UvError(n as c_int))),
566 (n, addr) => Ok((n as uint, addr.unwrap()))
569 n => Err(uv_error_to_io_error(UvError(n)))
572 extern fn alloc_cb(handle: *uvll::uv_udp_t,
573 _suggested_size: size_t,
576 let cx = uvll::get_data_for_uv_handle(handle);
577 let cx = &mut *(cx as *mut UdpRecvCtx);
578 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
582 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
583 addr: *libc::sockaddr, _flags: c_uint) {
584 assert!(nread != uvll::ECANCELED as ssize_t);
586 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
589 // When there's no data to read the recv callback can be a no-op.
590 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
591 // this we just drop back to kqueue and wait for the next callback.
593 cx.buf = Some(unsafe { *buf });
597 unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
598 let addr = if addr == ptr::null() {
601 let len = mem::size_of::<libc::sockaddr_storage>();
602 Some(sockaddr_to_addr(unsafe { mem::transmute(addr) }, len))
604 cx.result = Some((nread, addr));
605 wakeup(&mut cx.task);
609 fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> {
610 let m = self.fire_homing_missile();
611 let loop_ = self.uv_loop();
612 let guard = try!(self.write_access.grant(m));
614 let mut req = Request::new(uvll::UV_UDP_SEND);
615 let (addr, _len) = addr_to_sockaddr(dst);
616 let addr_p = &addr as *_ as *libc::sockaddr;
618 // see comments in StreamWatcher::write for why we may allocate a buffer
620 let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
621 let uv_buf = if guard.can_timeout {
622 slice_to_uv_buf(data.get_ref().as_slice())
627 return match unsafe {
628 uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
631 req.defuse(); // uv callback now owns this request
632 let mut cx = UdpSendCtx {
633 result: uvll::ECANCELED, data: data, udp: self as *mut _
635 wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
639 if cx.result != uvll::ECANCELED {
640 return match cx.result {
642 n => Err(uv_error_to_io_error(UvError(n)))
645 let new_cx = box UdpSendCtx {
647 udp: 0 as *mut UdpWatcher,
648 data: cx.data.take(),
651 req.set_data(&*new_cx);
654 Err(uv_error_to_io_error(UvError(cx.result)))
656 n => Err(uv_error_to_io_error(UvError(n)))
659 // This function is the same as stream::write_cb, but adapted for udp
660 // instead of streams.
661 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
662 let req = Request::wrap(req);
663 let cx: &mut UdpSendCtx = unsafe { req.get_data() };
666 if cx.udp as uint != 0 {
667 let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
668 wakeup(&mut udp.blocked_sender);
670 let _cx: Box<UdpSendCtx> = unsafe { mem::transmute(cx) };
675 fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
676 let _m = self.fire_homing_missile();
677 status_to_io_result(unsafe {
678 multi.to_str().with_c_str(|m_addr| {
679 uvll::uv_udp_set_membership(self.handle,
686 fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
687 let _m = self.fire_homing_missile();
688 status_to_io_result(unsafe {
689 multi.to_str().with_c_str(|m_addr| {
690 uvll::uv_udp_set_membership(self.handle,
692 uvll::UV_LEAVE_GROUP)
697 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
698 let _m = self.fire_homing_missile();
699 status_to_io_result(unsafe {
700 uvll::uv_udp_set_multicast_loop(self.handle,
705 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
706 let _m = self.fire_homing_missile();
707 status_to_io_result(unsafe {
708 uvll::uv_udp_set_multicast_loop(self.handle,
713 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
714 let _m = self.fire_homing_missile();
715 status_to_io_result(unsafe {
716 uvll::uv_udp_set_multicast_ttl(self.handle,
721 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
722 let _m = self.fire_homing_missile();
723 status_to_io_result(unsafe {
724 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
728 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
729 let _m = self.fire_homing_missile();
730 status_to_io_result(unsafe {
731 uvll::uv_udp_set_broadcast(self.handle,
736 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
737 let _m = self.fire_homing_missile();
738 status_to_io_result(unsafe {
739 uvll::uv_udp_set_broadcast(self.handle,
744 fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
747 home: self.home.clone(),
748 refcount: self.refcount.clone(),
749 write_access: self.write_access.clone(),
750 read_access: self.read_access.clone(),
751 blocked_sender: None,
752 } as Box<rtio::RtioUdpSocket + Send>
755 fn set_timeout(&mut self, timeout: Option<u64>) {
756 self.set_read_timeout(timeout);
757 self.set_write_timeout(timeout);
760 fn set_read_timeout(&mut self, ms: Option<u64>) {
761 let _m = self.fire_homing_missile();
762 let loop_ = self.uv_loop();
763 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
764 self.handle as uint);
766 fn cancel_read(stream: uint) -> Option<BlockedTask> {
767 // This method is quite similar to StreamWatcher::cancel_read, see
768 // there for more information
769 let handle = stream as *uvll::uv_udp_t;
770 assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
772 let data = uvll::get_data_for_uv_handle(handle);
773 if data.is_null() { return None }
774 uvll::set_data_for_uv_handle(handle, 0 as *int);
775 &mut *(data as *mut UdpRecvCtx)
777 data.result = Some((uvll::ECANCELED as ssize_t, None));
782 fn set_write_timeout(&mut self, ms: Option<u64>) {
783 let _m = self.fire_homing_missile();
784 let loop_ = self.uv_loop();
785 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
786 self as *mut _ as uint);
788 fn cancel_write(stream: uint) -> Option<BlockedTask> {
789 let stream: &mut UdpWatcher = unsafe { mem::transmute(stream) };
790 stream.blocked_sender.take()
795 impl Drop for UdpWatcher {
797 // Send ourselves home to close this handle (blocking while doing so).
798 let _m = self.fire_homing_missile();
799 if self.refcount.decrement() {
805 ////////////////////////////////////////////////////////////////////////////////
807 ////////////////////////////////////////////////////////////////////////////////
809 pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
811 slot: Option<BlockedTask>,
814 let mut req = Request::new(uvll::UV_SHUTDOWN);
816 return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
818 req.defuse(); // uv callback now owns this request
819 let mut cx = Ctx { slot: None, status: 0 };
821 wait_until_woken_after(&mut cx.slot, loop_, || {
825 status_to_io_result(cx.status)
827 n => Err(uv_error_to_io_error(UvError(n)))
830 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
831 let req = Request::wrap(req);
832 assert!(status != uvll::ECANCELED);
833 let cx: &mut Ctx = unsafe { req.get_data() };
835 wakeup(&mut cx.slot);
841 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
844 use super::{UdpWatcher, TcpWatcher, TcpListener};
845 use super::super::local_loop;
848 fn connect_close_ip4() {
849 match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) {
851 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
856 fn connect_close_ip6() {
857 match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) {
859 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
864 fn udp_bind_close_ip4() {
865 match UdpWatcher::bind(local_loop(), ::next_test_ip4()) {
872 fn udp_bind_close_ip6() {
873 match UdpWatcher::bind(local_loop(), ::next_test_ip6()) {
881 let (tx, rx) = channel();
882 let addr = ::next_test_ip4();
885 let w = match TcpListener::bind(local_loop(), addr) {
886 Ok(w) => w, Err(e) => fail!("{:?}", e)
888 let mut w = match w.listen() {
889 Ok(w) => w, Err(e) => fail!("{:?}", e),
894 let mut buf = [0u8, ..10];
895 match stream.read(buf) {
896 Ok(10) => {} e => fail!("{:?}", e),
898 for i in range(0, 10u8) {
899 assert_eq!(buf[i as uint], i + 1);
902 Err(e) => fail!("{:?}", e)
907 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
908 Ok(w) => w, Err(e) => fail!("{:?}", e)
910 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
911 Ok(()) => {}, Err(e) => fail!("{:?}", e)
917 let (tx, rx) = channel();
918 let addr = ::next_test_ip6();
921 let w = match TcpListener::bind(local_loop(), addr) {
922 Ok(w) => w, Err(e) => fail!("{:?}", e)
924 let mut w = match w.listen() {
925 Ok(w) => w, Err(e) => fail!("{:?}", e),
930 let mut buf = [0u8, ..10];
931 match stream.read(buf) {
932 Ok(10) => {} e => fail!("{:?}", e),
934 for i in range(0, 10u8) {
935 assert_eq!(buf[i as uint], i + 1);
938 Err(e) => fail!("{:?}", e)
943 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
944 Ok(w) => w, Err(e) => fail!("{:?}", e)
946 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
947 Ok(()) => {}, Err(e) => fail!("{:?}", e)
953 let (tx, rx) = channel();
954 let client = ::next_test_ip4();
955 let server = ::next_test_ip4();
958 match UdpWatcher::bind(local_loop(), server) {
961 let mut buf = [0u8, ..10];
962 match w.recvfrom(buf) {
963 Ok((10, addr)) => assert!(addr == client),
964 e => fail!("{:?}", e),
966 for i in range(0, 10u8) {
967 assert_eq!(buf[i as uint], i + 1);
970 Err(e) => fail!("{:?}", e)
975 let mut w = match UdpWatcher::bind(local_loop(), client) {
976 Ok(w) => w, Err(e) => fail!("{:?}", e)
978 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
979 Ok(()) => {}, Err(e) => fail!("{:?}", e)
985 let (tx, rx) = channel();
986 let client = ::next_test_ip6();
987 let server = ::next_test_ip6();
990 match UdpWatcher::bind(local_loop(), server) {
993 let mut buf = [0u8, ..10];
994 match w.recvfrom(buf) {
995 Ok((10, addr)) => assert!(addr == client),
996 e => fail!("{:?}", e),
998 for i in range(0, 10u8) {
999 assert_eq!(buf[i as uint], i + 1);
1002 Err(e) => fail!("{:?}", e)
1007 let mut w = match UdpWatcher::bind(local_loop(), client) {
1008 Ok(w) => w, Err(e) => fail!("{:?}", e)
1010 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1011 Ok(()) => {}, Err(e) => fail!("{:?}", e)
1016 fn test_read_read_read() {
1017 let addr = ::next_test_ip4();
1018 static MAX: uint = 5000;
1019 let (tx, rx) = channel();
1022 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1023 let mut acceptor = listener.listen().ok().unwrap();
1025 let mut stream = acceptor.accept().ok().unwrap();
1026 let buf = [1, .. 2048];
1027 let mut total_bytes_written = 0;
1028 while total_bytes_written < MAX {
1029 assert!(stream.write(buf).is_ok());
1030 uvdebug!("wrote bytes");
1031 total_bytes_written += buf.len();
1036 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1037 let mut buf = [0, .. 2048];
1038 let mut total_bytes_read = 0;
1039 while total_bytes_read < MAX {
1040 let nread = stream.read(buf).ok().unwrap();
1041 total_bytes_read += nread;
1042 for i in range(0u, nread) {
1043 assert_eq!(buf[i], 1);
1046 uvdebug!("read {} bytes total", total_bytes_read);
1050 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1051 fn test_udp_twice() {
1052 let server_addr = ::next_test_ip4();
1053 let client_addr = ::next_test_ip4();
1054 let (tx, rx) = channel();
1057 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1059 assert!(client.sendto([1], server_addr).is_ok());
1060 assert!(client.sendto([2], server_addr).is_ok());
1063 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1067 let (nread1, src1) = server.recvfrom(buf1).ok().unwrap();
1068 let (nread2, src2) = server.recvfrom(buf2).ok().unwrap();
1069 assert_eq!(nread1, 1);
1070 assert_eq!(nread2, 1);
1071 assert!(src1 == client_addr);
1072 assert!(src2 == client_addr);
1073 assert_eq!(buf1[0], 1);
1074 assert_eq!(buf2[0], 2);
1078 fn test_udp_many_read() {
1079 let server_out_addr = ::next_test_ip4();
1080 let server_in_addr = ::next_test_ip4();
1081 let client_out_addr = ::next_test_ip4();
1082 let client_in_addr = ::next_test_ip4();
1083 static MAX: uint = 500_000;
1085 let (tx1, rx1) = channel::<()>();
1086 let (tx2, rx2) = channel::<()>();
1089 let l = local_loop();
1090 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1091 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1092 let (tx, rx) = (tx2, rx1);
1095 let msg = [1, .. 2048];
1096 let mut total_bytes_sent = 0;
1100 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1101 total_bytes_sent += msg.len();
1102 // check if the client has received enough
1103 let res = server_in.recvfrom(buf);
1104 assert!(res.is_ok());
1105 let (nread, src) = res.ok().unwrap();
1106 assert_eq!(nread, 1);
1107 assert!(src == client_out_addr);
1109 assert!(total_bytes_sent >= MAX);
1112 let l = local_loop();
1113 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1114 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1115 let (tx, rx) = (tx1, rx2);
1118 let mut total_bytes_recv = 0;
1119 let mut buf = [0, .. 2048];
1120 while total_bytes_recv < MAX {
1122 assert!(client_out.sendto([1], server_in_addr).is_ok());
1124 let res = client_in.recvfrom(buf);
1125 assert!(res.is_ok());
1126 let (nread, src) = res.ok().unwrap();
1127 assert!(src == server_out_addr);
1128 total_bytes_recv += nread;
1129 for i in range(0u, nread) {
1130 assert_eq!(buf[i], 1);
1133 // tell the server we're done
1134 assert!(client_out.sendto([0], server_in_addr).is_ok());
1138 fn test_read_and_block() {
1139 let addr = ::next_test_ip4();
1140 let (tx, rx) = channel::<Receiver<()>>();
1144 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1145 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1146 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1148 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1149 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1153 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1154 let mut acceptor = listener.listen().ok().unwrap();
1155 let (tx2, rx2) = channel();
1157 let mut stream = acceptor.accept().ok().unwrap();
1158 let mut buf = [0, .. 2048];
1161 let mut current = 0;
1164 while current < expected {
1165 let nread = stream.read(buf).ok().unwrap();
1166 for i in range(0u, nread) {
1167 let val = buf[i] as uint;
1168 assert_eq!(val, current % 8);
1173 let _ = tx2.send_opt(());
1176 // Make sure we had multiple reads
1181 fn test_simple_tcp_server_and_client_on_diff_threads() {
1182 let addr = ::next_test_ip4();
1185 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1186 let mut acceptor = listener.listen().ok().unwrap();
1187 let mut stream = acceptor.accept().ok().unwrap();
1188 let mut buf = [0, .. 2048];
1189 let nread = stream.read(buf).ok().unwrap();
1190 assert_eq!(nread, 8);
1191 for i in range(0u, nread) {
1192 assert_eq!(buf[i], i as u8);
1196 let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1197 while stream.is_err() {
1198 stream = TcpWatcher::connect(local_loop(), addr, None);
1200 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1203 #[should_fail] #[test]
1204 fn tcp_listener_fail_cleanup() {
1205 let addr = ::next_test_ip4();
1206 let w = TcpListener::bind(local_loop(), addr).unwrap();
1207 let _w = w.listen().ok().unwrap();
1211 #[should_fail] #[test]
1212 fn tcp_stream_fail_cleanup() {
1213 let (tx, rx) = channel();
1214 let addr = ::next_test_ip4();
1217 let w = TcpListener::bind(local_loop(), addr).unwrap();
1218 let mut w = w.listen().ok().unwrap();
1220 drop(w.accept().ok().unwrap());
1223 let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1227 #[should_fail] #[test]
1228 fn udp_listener_fail_cleanup() {
1229 let addr = ::next_test_ip4();
1230 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1234 #[should_fail] #[test]
1235 fn udp_fail_other_task() {
1236 let addr = ::next_test_ip4();
1237 let (tx, rx) = channel();
1239 // force the handle to be created on a different scheduler, failure in
1240 // the original task will force a homing operation back to this
1243 let w = UdpWatcher::bind(local_loop(), addr).unwrap();