1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::io::{IoError, IoResult};
14 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
19 use std::rt::task::BlockedTask;
22 use homing::{HomingIO, HomeHandle};
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27 wait_until_woken_after, wakeup};
28 use timer::TimerWatcher;
29 use uvio::UvIoFactory;
32 ////////////////////////////////////////////////////////////////////////////////
33 // Generic functions related to dealing with sockaddr things
34 ////////////////////////////////////////////////////////////////////////////////
/// Converts `u` from host byte order to big-endian (network) byte order by
/// delegating to `mem::to_be16`.
36 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
/// Converts `u` from big-endian (network) byte order to host byte order by
/// delegating to `mem::from_be16`.
37 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
/// Converts a C `sockaddr_storage` into an `ip::SocketAddr`.
///
/// The value of `ss_family` selects how the storage is reinterpreted: as a
/// `sockaddr_in` (producing an `ip::Ipv4Addr`) or as a `sockaddr_in6`
/// (producing an `ip::Ipv6Addr`). In both cases `len` is first asserted to be
/// at least the size of the structure being read, and the port is converted
/// with `ntohs`. A family that is not handled fails with "unknown family".
39 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
40 len: uint) -> ip::SocketAddr {
41 match storage.ss_family as c_int {
// The supplied length must cover a whole sockaddr_in before the storage is
// reinterpreted as one.
43 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
44 let storage: &libc::sockaddr_in = unsafe {
45 cast::transmute(storage)
// Peel the four octets off the 32-bit address value, lowest-order byte first.
47 let addr = storage.sin_addr.s_addr as u32;
48 let a = (addr >> 0) as u8;
49 let b = (addr >> 8) as u8;
50 let c = (addr >> 16) as u8;
51 let d = (addr >> 24) as u8;
53 ip: ip::Ipv4Addr(a, b, c, d),
54 port: ntohs(storage.sin_port),
// Same length check, this time against the IPv6 layout.
58 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
59 let storage: &libc::sockaddr_in6 = unsafe {
60 cast::transmute(storage)
// Each of the eight 16-bit groups is passed through ntohs.
62 let a = ntohs(storage.sin6_addr.s6_addr[0]);
63 let b = ntohs(storage.sin6_addr.s6_addr[1]);
64 let c = ntohs(storage.sin6_addr.s6_addr[2]);
65 let d = ntohs(storage.sin6_addr.s6_addr[3]);
66 let e = ntohs(storage.sin6_addr.s6_addr[4]);
67 let f = ntohs(storage.sin6_addr.s6_addr[5]);
68 let g = ntohs(storage.sin6_addr.s6_addr[6]);
69 let h = ntohs(storage.sin6_addr.s6_addr[7]);
71 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72 port: ntohs(storage.sin6_port),
76 fail!("unknown family {}", n);
/// Builds the C socket address corresponding to `addr`.
///
/// Returns a `sockaddr_storage` (obtained from `mem::init()`) that has been
/// filled in through a `sockaddr_in` or `sockaddr_in6` view, depending on the
/// variant of `addr.ip`, paired with the size in bytes of the structure that
/// was written. The port is stored via `htons`.
81 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
83 let mut storage: libc::sockaddr_storage = mem::init();
84 let len = match addr.ip {
85 ip::Ipv4Addr(a, b, c, d) => {
// View the generic storage through the IPv4-specific layout.
86 let storage: &mut libc::sockaddr_in =
87 cast::transmute(&mut storage);
88 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
89 (*storage).sin_port = htons(addr.port);
90 (*storage).sin_addr = libc::in_addr {
91 s_addr: (d as u32 << 24) |
96 mem::size_of::<libc::sockaddr_in>()
98 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
// View the generic storage through the IPv6-specific layout.
99 let storage: &mut libc::sockaddr_in6 =
100 cast::transmute(&mut storage);
101 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
102 storage.sin6_port = htons(addr.port);
103 storage.sin6_addr = libc::in6_addr {
115 mem::size_of::<libc::sockaddr_in6>()
118 return (storage, len);
/// Identifies which libuv name-query function `socket_name` should call.
122 enum SocketNameKind {
/// Looks up a socket address for the libuv handle `handle`.
///
/// `sk` chooses the libuv query: `uv_tcp_getpeername` for `TcpPeer`,
/// `uv_tcp_getsockname` for `Tcp`, and `uv_udp_getsockname` for `Udp`. A zero
/// status yields the address converted by `sockaddr_to_addr`; any other
/// status is returned as an `IoError` built from `UvError`.
128 fn socket_name(sk: SocketNameKind,
129 handle: *c_void) -> Result<ip::SocketAddr, IoError> {
130 let getsockname = match sk {
131 TcpPeer => uvll::uv_tcp_getpeername,
132 Tcp => uvll::uv_tcp_getsockname,
133 Udp => uvll::uv_udp_getsockname,
136 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
137 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
// Starts as the full size of the storage buffer; it is handed to the query by
// mutable reference and afterwards used as the address length.
138 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
140 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
142 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
144 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145 n => Err(uv_error_to_io_error(UvError(n)))
148 ////////////////////////////////////////////////////////////////////////////////
149 // Helpers for handling timeouts, shared for pipes/tcp
150 ////////////////////////////////////////////////////////////////////////////////
/// Context for an in-flight connect request. Both `timer_cb` and `connect_cb`
/// recover a `&mut ConnectCtx` from the data pointer of their libuv
/// handle/request.
152 pub struct ConnectCtx {
// The task blocked on the connect; the callbacks wake it through `wakeup`.
154 pub task: Option<BlockedTask>,
// Timer used to enforce a connect timeout, when one has been started.
155 pub timer: Option<~TimerWatcher>,
/// Timeout state for accept operations: an optional timer plus the channel
/// pair that the timer's callback uses to signal expiry.
158 pub struct AcceptTimeout {
// Lazily created by `set_timeout`.
159 timer: Option<TimerWatcher>,
// Sending half, used by the timer callback.
160 timeout_tx: Option<Sender<()>>,
// Receiving half, consulted by `accept`.
161 timeout_rx: Option<Receiver<()>>,
166 mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
167 f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
168 ) -> Result<T, UvError> {
169 let mut req = Request::new(uvll::UV_CONNECT);
170 let r = f(&req, &obj, connect_cb);
173 req.defuse(); // uv callback now owns this request
176 let mut timer = TimerWatcher::new(io);
177 timer.start(timer_cb, t, 0);
178 self.timer = Some(timer);
182 wait_until_woken_after(&mut self.task, &io.loop_, || {
183 let data = &self as *_;
185 Some(ref mut timer) => unsafe { timer.set_data(data) },
190 // Make sure an erroneously fired callback doesn't have access
191 // to the context any more.
192 req.set_data(0 as *int);
194 // If we failed because of a timeout, drop the TcpWatcher as
195 // soon as possible because its data is now set to null and we
196 // want to cancel the callback ASAP.
199 n => { drop(obj); Err(UvError(n)) }
205 extern fn timer_cb(handle: *uvll::uv_timer_t) {
206 // Don't close the corresponding tcp request, just wake up the task
207 // and let RAII take care of the pending watcher.
208 let cx: &mut ConnectCtx = unsafe {
209 &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
211 cx.status = uvll::ECANCELED;
212 wakeup(&mut cx.task);
215 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
216 // This callback can be invoked with ECANCELED if the watcher is
217 // closed by the timeout callback. In that case we just want to free
218 // the request and be along our merry way.
219 let req = Request::wrap(req);
220 if status == uvll::ECANCELED { return }
222 // Apparently on Windows when the handle is closed this callback may
223 // not be invoked with ECANCELED but rather another error code.
224 // Either way, if the data is null, then our timeout has expired
225 // and there's nothing we can do.
226 let data = unsafe { uvll::get_data_for_req(req.handle) };
227 if data.is_null() { return }
229 let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
232 Some(ref mut t) => t.stop(),
235 // Note that the timer callback doesn't cancel the connect request
236 // (that's the job of uv_close()), so it's possible for this
237 // callback to get triggered after the timeout callback fires, but
238 // before the task wakes up. In that case, we did indeed
239 // successfully connect, but we don't need to wake someone up. We
240 // updated the status above (correctly so), and the task will pick
241 // up on this when it wakes up.
242 if cx.task.is_some() {
243 wakeup(&mut cx.task);
/// Creates an `AcceptTimeout` with no timer and no timeout channels.
250 pub fn new() -> AcceptTimeout {
251 AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
/// Waits for the next value on `c`, taking the configured timeout (if any)
/// into account.
///
/// When a timeout receiver is installed, a `Select` over that receiver and
/// `c` decides which becomes ready first; if it is the timeout handle, an
/// `IoError` derived from `ECANCELED` is returned.
254 pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
255 match self.timeout_rx {
258 use std::comm::Select;
260 // Poll the incoming channel first (don't rely on the order of
261 // select just yet). If someone's pending then we should return
264 Ok(data) => return data,
268 // Use select to figure out which channel gets ready first. We
269 // do some custom handling of select to ensure that we never
270 // actually drain the timeout channel (we'll keep seeing the
271 // timeout message in the future).
272 let s = Select::new();
273 let mut timeout = s.handle(rx);
274 let mut data = s.handle(c);
// `wait()` reports the id of the ready handle; a match with the timeout
// handle means the timer fired first.
279 if s.wait() == timeout.id() {
280 Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
/// Removes any configured timeout: the timer and both channel halves are
/// taken out of `self` (leaving `None`) and dropped together.
288 pub fn clear(&mut self) {
289 // Clear any previous timeout by dropping the timer and transmission
291 drop((self.timer.take(),
292 self.timeout_tx.take(),
293 self.timeout_rx.take()))
/// Arms the accept timeout with the value `ms` (presumably milliseconds,
/// judging by the name — confirm against `TimerWatcher::start`).
///
/// On first use a timer is created on the event loop of `t`'s handle, homed
/// where `t` is homed, and its data pointer is set to `self`. The timer is
/// then started with `timer_cb` and a fresh channel pair is installed.
///
/// NOTE(review): the timer stores a raw pointer to `self`, so `self`
/// presumably must not be moved while the timer exists — confirm.
296 pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
297 &mut self, ms: u64, t: &mut T
299 // If we have a timeout, lazily initialize the timer which will be used
300 // to fire when the timeout runs out.
301 if self.timer.is_none() {
302 let _m = t.fire_homing_missile();
303 let loop_ = Loop::wrap(unsafe {
304 uvll::get_loop_for_uv_handle(t.uv_handle())
306 let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
// `timer_cb` below reads this pointer back to reach the AcceptTimeout.
308 timer.set_data(self as *mut _ as *AcceptTimeout);
310 self.timer = Some(timer);
313 // Once we've got a timer, stop any previous timeout, reset it for the
314 // current one, and install some new channels to send/receive data on
315 let timer = self.timer.get_mut_ref();
317 timer.start(timer_cb, ms, 0);
318 let (tx, rx) = channel();
319 self.timeout_tx = Some(tx);
320 self.timeout_rx = Some(rx);
// Timer callback: recovers the AcceptTimeout from the handle's data pointer
// and sends `()` on its timeout channel.
322 extern fn timer_cb(timer: *uvll::uv_timer_t) {
323 let acceptor: &mut AcceptTimeout = unsafe {
324 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
326 // This send can never fail because if this timer is active then the
327 // receiving channel is guaranteed to be alive
328 acceptor.timeout_tx.get_ref().send(());
333 ////////////////////////////////////////////////////////////////////////////////
334 // TCP implementation
335 ////////////////////////////////////////////////////////////////////////////////
/// A TCP stream backed by a libuv `uv_tcp_t` handle and a `StreamWatcher`.
337 pub struct TcpWatcher {
338 handle: *uvll::uv_tcp_t,
339 stream: StreamWatcher,
343 // libuv can't support concurrent reads and concurrent writes of the same
344 // stream object, so we use these access guards in order to arbitrate among
345 // multiple concurrent reads and writes. Note that libuv *can* read and
346 // write simultaneously, it just can't read and read simultaneously.
348 write_access: Access,
/// A TCP socket that has been bound but is not yet listening; accepted
/// connections (or errors) are delivered through the `outgoing`/`incoming`
/// channel pair.
351 pub struct TcpListener {
// NOTE(review): declared as `*uvll::uv_pipe_t`, yet this handle is passed to
// `uv_tcp_bind` and returned as `*uvll::uv_tcp_t` from `uv_handle()` —
// presumably it should be `uv_tcp_t`; confirm.
353 handle: *uvll::uv_pipe_t,
354 closing_task: Option<BlockedTask>,
// `listen_cb` sends each accepted stream (or error) on `outgoing`.
355 outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
// `TcpAcceptor::accept` receives from `incoming`.
356 incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
/// A listening TCP socket: owns the `TcpListener` it was created from plus
/// the timeout state applied to `accept`.
359 pub struct TcpAcceptor {
360 listener: ~TcpListener,
361 timeout: AcceptTimeout,
364 // TCP watchers (clients/streams)
/// Creates a `TcpWatcher` on `io`'s loop, homed on a handle obtained from
/// `io.make_handle()`.
367 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
368 let handle = io.make_handle();
369 TcpWatcher::new_home(&io.loop_, handle)
/// Allocates a `UV_TCP` handle with `uvll::malloc_handle`, initializes it on
/// `loop_` via `uv_tcp_init`, and wraps it in a `TcpWatcher` with a fresh
/// refcount and fresh read/write access guards.
372 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
373 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
375 uvll::uv_tcp_init(loop_.handle, handle)
380 stream: StreamWatcher::new(handle),
381 refcount: Refcount::new(),
382 read_access: Access::new(),
383 write_access: Access::new(),
/// Connects a new `TcpWatcher` to `address`, optionally bounded by
/// `timeout`.
///
/// The work is delegated to `ConnectCtx::connect`; the closure supplied here
/// issues the actual `uv_tcp_connect` call for the request it is given.
387 pub fn connect(io: &mut UvIoFactory,
388 address: ip::SocketAddr,
389 timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
390 let tcp = TcpWatcher::new(io);
// The context starts with status -1 and no task or timer.
391 let cx = ConnectCtx { status: -1, task: None, timer: None };
392 let (addr, _len) = addr_to_sockaddr(address);
// Raw pointer to the local `addr`, captured by the closure below.
393 let addr_p = &addr as *_ as *libc::sockaddr;
394 cx.connect(tcp, timeout, io, |req, tcp, cb| {
395 unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
// Homing support: exposes the `home` field of the watcher.
400 impl HomingIO for TcpWatcher {
401 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
// Local address lookup for a TCP stream.
404 impl rtio::RtioSocket for TcpWatcher {
// Homes first, then queries the handle with the `Tcp` kind.
405 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
406 let _m = self.fire_homing_missile();
407 socket_name(Tcp, self.handle)
// Stream operations for a TCP watcher. Every method first fires a homing
// missile; read and write additionally take the matching access guard.
411 impl rtio::RtioTcpStream for TcpWatcher {
// Reads into `buf` through the underlying StreamWatcher while holding the
// read access guard; libuv errors are converted to IoError.
412 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
413 let m = self.fire_homing_missile();
414 let _g = self.read_access.grant(m);
415 self.stream.read(buf).map_err(uv_error_to_io_error)
// Writes `buf` through the underlying StreamWatcher while holding the write
// access guard.
418 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
419 let m = self.fire_homing_missile();
420 let _g = self.write_access.grant(m);
421 self.stream.write(buf).map_err(uv_error_to_io_error)
// Address of the remote end, queried with the `TcpPeer` kind.
424 fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
425 let _m = self.fire_homing_missile();
426 socket_name(TcpPeer, self.handle)
// Calls uv_tcp_nodelay with 0 (the counterpart of `nodelay`, which passes 1).
429 fn control_congestion(&mut self) -> Result<(), IoError> {
430 let _m = self.fire_homing_missile();
431 status_to_io_result(unsafe {
432 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
// Calls uv_tcp_nodelay with 1.
436 fn nodelay(&mut self) -> Result<(), IoError> {
437 let _m = self.fire_homing_missile();
438 status_to_io_result(unsafe {
439 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
// Calls uv_tcp_keepalive with enable = 1 and the given delay.
443 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
444 let _m = self.fire_homing_missile();
445 status_to_io_result(unsafe {
446 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
447 delay_in_seconds as c_uint)
// Calls uv_tcp_keepalive with enable = 0 and a zero delay.
451 fn letdie(&mut self) -> Result<(), IoError> {
452 let _m = self.fire_homing_missile();
453 status_to_io_result(unsafe {
454 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
// Produces another watcher over the same handle: the home, refcount and both
// access guards are cloned from `self`, and a new StreamWatcher wraps the
// same raw handle.
458 fn clone(&self) -> ~rtio::RtioTcpStream:Send {
461 stream: StreamWatcher::new(self.handle),
462 home: self.home.clone(),
463 refcount: self.refcount.clone(),
464 write_access: self.write_access.clone(),
465 read_access: self.read_access.clone(),
466 } as ~rtio::RtioTcpStream:Send
// Issues a uv_shutdown request on the handle and blocks the task until
// `shutdown_cb` wakes it; the status recorded in the context becomes the
// result. A non-zero return from uv_shutdown itself is reported directly.
469 fn close_write(&mut self) -> Result<(), IoError> {
471 slot: Option<BlockedTask>,
474 let mut req = Request::new(uvll::UV_SHUTDOWN);
476 return match unsafe {
477 uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
480 req.defuse(); // uv callback now owns this request
481 let mut cx = Ctx { slot: None, status: 0 };
483 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
487 status_to_io_result(cx.status)
489 n => Err(uv_error_to_io_error(UvError(n)))
// Shutdown completion callback: re-wraps the request, fetches the context
// from its data pointer and wakes the blocked task.
492 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
493 let req = Request::wrap(req);
494 assert!(status != uvll::ECANCELED);
495 let cx: &mut Ctx = unsafe { req.get_data() };
497 wakeup(&mut cx.slot);
// Exposes the raw TCP handle; note it is read from the StreamWatcher.
502 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
503 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
// On drop: home to the owning loop, then decrement the refcount that clones
// of this watcher share; the result of `decrement()` gates what follows.
506 impl Drop for TcpWatcher {
508 let _m = self.fire_homing_missile();
509 if self.refcount.decrement() {
515 // TCP listeners (unbound servers)
/// Creates a TCP listener bound to `address`.
///
/// A `UV_TCP` handle is allocated and initialized on `io`'s loop, a boxed
/// `TcpListener` is built around a new channel pair, and `uv_tcp_bind` is
/// called with the converted address. A zero status returns the installed
/// listener.
518 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
519 -> Result<~TcpListener, UvError> {
520 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
522 uvll::uv_tcp_init(io.uv_loop(), handle)
524 let (tx, rx) = channel();
525 let l = box TcpListener {
526 home: io.make_handle(),
532 let (addr, _len) = addr_to_sockaddr(address);
534 let addr_p = &addr as *libc::sockaddr_storage;
535 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
538 0 => Ok(l.install()),
// Homing support: exposes the `home` field of the listener.
544 impl HomingIO for TcpListener {
545 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
// Exposes the listener's raw handle as a `uv_tcp_t` pointer.
548 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
549 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
// Local address lookup for a bound listener.
552 impl rtio::RtioSocket for TcpListener {
// Homes first, then queries the handle with the `Tcp` kind.
553 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
554 let _m = self.fire_homing_missile();
555 socket_name(Tcp, self.handle)
// Turns a bound listener into an acceptor.
559 impl rtio::RtioTcpListener for TcpListener {
// Consumes the listener, wraps it in a boxed TcpAcceptor with a fresh
// AcceptTimeout, and calls uv_listen with a backlog of 128 and `listen_cb`.
// A non-zero status is returned as an IoError.
560 fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
561 // create the acceptor object from ourselves
562 let mut acceptor = box TcpAcceptor {
564 timeout: AcceptTimeout::new(),
567 let _m = acceptor.fire_homing_missile();
568 // FIXME: the 128 backlog should be configurable
569 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
570 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
571 n => Err(uv_error_to_io_error(UvError(n))),
/// libuv connection callback for a listening TCP handle.
///
/// Recovers the `TcpListener` from `server`, builds a message from `status`
/// — on the accepting path a new `TcpWatcher` is created on the server's
/// loop with the listener's home and `uv_accept` is asserted to succeed;
/// otherwise the status is converted to an `IoError` — and sends that
/// message on the listener's `outgoing` channel.
576 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
577 assert!(status != uvll::ECANCELED);
578 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
579 let msg = match status {
581 let loop_ = Loop::wrap(unsafe {
582 uvll::get_loop_for_uv_handle(server)
584 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
585 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
586 Ok(box client as ~rtio::RtioTcpStream:Send)
588 n => Err(uv_error_to_io_error(UvError(n)))
590 tcp.outgoing.send(msg);
// On drop the listener first homes to its owning loop.
593 impl Drop for TcpListener {
595 let _m = self.fire_homing_missile();
600 // TCP acceptors (bound servers)
// Homing support: the acceptor shares the home of the listener it owns.
602 impl HomingIO for TcpAcceptor {
603 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
// Local address lookup for an acceptor, using the wrapped listener's handle.
606 impl rtio::RtioSocket for TcpAcceptor {
607 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
608 let _m = self.fire_homing_missile();
609 socket_name(Tcp, self.listener.handle)
// Accept-side operations for a listening TCP socket.
613 impl rtio::RtioTcpAcceptor for TcpAcceptor {
// Receives the next connection (or error) from the listener's `incoming`
// channel, subject to the configured AcceptTimeout.
614 fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
615 self.timeout.accept(&self.listener.incoming)
// Calls uv_tcp_simultaneous_accepts with 1.
618 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
619 let _m = self.fire_homing_missile();
620 status_to_io_result(unsafe {
621 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
// Calls uv_tcp_simultaneous_accepts with 0.
625 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
626 let _m = self.fire_homing_missile();
627 status_to_io_result(unsafe {
628 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
// `None` clears the accept timeout; `Some(ms)` arms it against the listener.
632 fn set_timeout(&mut self, ms: Option<u64>) {
634 None => self.timeout.clear(),
635 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
640 ////////////////////////////////////////////////////////////////////////////////
641 // UDP implementation
642 ////////////////////////////////////////////////////////////////////////////////
/// A UDP socket backed by a libuv `uv_udp_t` handle.
644 pub struct UdpWatcher {
645 handle: *uvll::uv_udp_t,
648 // See above for what these fields are
651 write_access: Access,
/// Creates a UDP socket bound to `address`.
///
/// A `UV_UDP` handle is allocated, initialized on `io`'s loop with
/// `uv_udp_init`, and bound with `uv_udp_bind` (flags `0u32`). A non-zero
/// bind status is returned as `UvError`.
655 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
656 -> Result<UdpWatcher, UvError> {
657 let udp = UdpWatcher {
658 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
659 home: io.make_handle(),
660 refcount: Refcount::new(),
661 read_access: Access::new(),
662 write_access: Access::new(),
665 uvll::uv_udp_init(io.uv_loop(), udp.handle)
667 let (addr, _len) = addr_to_sockaddr(address);
668 let result = unsafe {
669 let addr_p = &addr as *libc::sockaddr_storage;
670 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
672 return match result {
674 n => Err(UvError(n)),
// Exposes the raw UDP handle.
679 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
680 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
// Homing support: exposes the `home` field of the watcher.
683 impl HomingIO for UdpWatcher {
684 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
// Local address lookup for a UDP socket.
687 impl rtio::RtioSocket for UdpWatcher {
// Homes first, then queries the handle with the `Udp` kind.
688 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
689 let _m = self.fire_homing_missile();
690 socket_name(Udp, self.handle)
// Datagram operations for a UDP watcher. Every method first fires a homing
// missile; recvfrom and sendto additionally take the matching access guard.
694 impl rtio::RtioUdpSocket for UdpWatcher {
// Receives one datagram into `buf`, returning the byte count and the
// sender's address. Receiving is started with uv_udp_recv_start, a context
// is published through the handle's data pointer, and the task blocks until
// `recv_cb` stores a result and wakes it.
695 fn recvfrom(&mut self, buf: &mut [u8])
696 -> Result<(uint, ip::SocketAddr), IoError>
699 task: Option<BlockedTask>,
701 result: Option<(ssize_t, Option<ip::SocketAddr>)>,
703 let loop_ = self.uv_loop();
704 let m = self.fire_homing_missile();
705 let _g = self.read_access.grant(m);
707 let a = match unsafe {
708 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
713 buf: Some(slice_to_uv_buf(buf)),
716 let handle = self.handle;
717 wait_until_woken_after(&mut cx.task, &loop_, || {
718 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
720 match cx.result.take_unwrap() {
722 Err(uv_error_to_io_error(UvError(n as c_int))),
723 (n, addr) => Ok((n as uint, addr.unwrap()))
726 n => Err(uv_error_to_io_error(UvError(n)))
// Allocation callback: hands libuv the caller's buffer stored in the
// context. The buffer is taken out of the context, so a second call without
// it being restored hits the `expect` below.
730 extern fn alloc_cb(handle: *uvll::uv_udp_t,
731 _suggested_size: size_t,
735 cast::transmute(uvll::get_data_for_uv_handle(handle));
736 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
// Receive callback: restores the buffer for the no-data case, otherwise
// stops receiving, records `(nread, addr)` in the context and wakes the task.
740 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
741 addr: *libc::sockaddr, _flags: c_uint) {
742 assert!(nread != uvll::ECANCELED as ssize_t);
743 let cx: &mut Ctx = unsafe {
744 cast::transmute(uvll::get_data_for_uv_handle(handle))
747 // When there's no data to read the recv callback can be a no-op.
748 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
749 // this we just drop back to kqueue and wait for the next callback.
751 cx.buf = Some(unsafe { *buf });
756 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
759 let cx: &mut Ctx = unsafe {
760 cast::transmute(uvll::get_data_for_uv_handle(handle))
762 let addr = if addr == ptr::null() {
// NOTE(review): the length passed on is the size of sockaddr_storage, not a
// length reported by libuv, so the size assertions in sockaddr_to_addr
// presumably always pass here — confirm that is intended.
765 let len = mem::size_of::<libc::sockaddr_storage>();
766 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
768 cx.result = Some((nread, addr));
769 wakeup(&mut cx.task);
// Sends `buf` to `dst`. A UV_UDP_SEND request is issued with uv_udp_send;
// on a zero status the task blocks until `send_cb` wakes it. Non-zero
// statuses are converted to IoError.
773 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
774 struct Ctx { task: Option<BlockedTask>, result: c_int }
776 let m = self.fire_homing_missile();
777 let loop_ = self.uv_loop();
778 let _g = self.write_access.grant(m);
780 let mut req = Request::new(uvll::UV_UDP_SEND);
781 let buf = slice_to_uv_buf(buf);
782 let (addr, _len) = addr_to_sockaddr(dst);
783 let result = unsafe {
784 let addr_p = &addr as *libc::sockaddr_storage;
785 uvll::uv_udp_send(req.handle, self.handle, [buf],
786 addr_p as *libc::sockaddr, send_cb)
789 return match result {
791 req.defuse(); // uv callback now owns this request
792 let mut cx = Ctx { task: None, result: 0 };
793 wait_until_woken_after(&mut cx.task, &loop_, || {
798 n => Err(uv_error_to_io_error(UvError(n)))
801 n => Err(uv_error_to_io_error(UvError(n)))
// Send completion callback: re-wraps the request, fetches the context from
// its data pointer and wakes the blocked task.
804 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
805 let req = Request::wrap(req);
806 assert!(status != uvll::ECANCELED);
807 let cx: &mut Ctx = unsafe { req.get_data() };
809 wakeup(&mut cx.task);
// Passes the textual form of `multi` (as a C string) to
// uv_udp_set_membership.
813 fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
814 let _m = self.fire_homing_missile();
815 status_to_io_result(unsafe {
816 multi.to_str().with_c_str(|m_addr| {
817 uvll::uv_udp_set_membership(self.handle,
// Same call as join_multicast, with the UV_LEAVE_GROUP membership value.
824 fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
825 let _m = self.fire_homing_missile();
826 status_to_io_result(unsafe {
827 multi.to_str().with_c_str(|m_addr| {
828 uvll::uv_udp_set_membership(self.handle,
830 uvll::UV_LEAVE_GROUP)
// Wraps uv_udp_set_multicast_loop.
835 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
836 let _m = self.fire_homing_missile();
837 status_to_io_result(unsafe {
838 uvll::uv_udp_set_multicast_loop(self.handle,
// Wraps uv_udp_set_multicast_loop.
843 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
844 let _m = self.fire_homing_missile();
845 status_to_io_result(unsafe {
846 uvll::uv_udp_set_multicast_loop(self.handle,
// Wraps uv_udp_set_multicast_ttl.
851 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
852 let _m = self.fire_homing_missile();
853 status_to_io_result(unsafe {
854 uvll::uv_udp_set_multicast_ttl(self.handle,
// Wraps uv_udp_set_ttl, passing `ttl` cast to c_int.
859 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
860 let _m = self.fire_homing_missile();
861 status_to_io_result(unsafe {
862 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
// Wraps uv_udp_set_broadcast.
866 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
867 let _m = self.fire_homing_missile();
868 status_to_io_result(unsafe {
869 uvll::uv_udp_set_broadcast(self.handle,
// Wraps uv_udp_set_broadcast.
874 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
875 let _m = self.fire_homing_missile();
876 status_to_io_result(unsafe {
877 uvll::uv_udp_set_broadcast(self.handle,
// Produces another watcher whose home, refcount and both access guards are
// cloned from `self`.
882 fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
885 home: self.home.clone(),
886 refcount: self.refcount.clone(),
887 write_access: self.write_access.clone(),
888 read_access: self.read_access.clone(),
889 } as ~rtio::RtioUdpSocket:Send
// On drop: home to the owning loop, then decrement the refcount that clones
// of this watcher share; the result of `decrement()` gates what follows.
893 impl Drop for UdpWatcher {
895 // Send ourselves home to close this handle (blocking while doing so).
896 let _m = self.fire_homing_missile();
897 if self.refcount.decrement() {
905 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
907 use std::io::test::{next_test_ip4, next_test_ip6};
909 use super::{UdpWatcher, TcpWatcher, TcpListener};
910 use super::super::local_loop;
// Connects to a fresh IPv4 test address with no timeout; if the connect
// fails, the error must be named ECONNREFUSED.
913 fn connect_close_ip4() {
914 match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
916 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
// Connects to a fresh IPv6 test address with no timeout; if the connect
// fails, the error must be named ECONNREFUSED.
921 fn connect_close_ip6() {
922 match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
924 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
// Exercises UdpWatcher::bind on a fresh IPv4 test address.
929 fn udp_bind_close_ip4() {
930 match UdpWatcher::bind(local_loop(), next_test_ip4()) {
// Exercises UdpWatcher::bind on a fresh IPv6 test address.
937 fn udp_bind_close_ip6() {
938 match UdpWatcher::bind(local_loop(), next_test_ip6()) {
946 let (tx, rx) = channel();
947 let addr = next_test_ip4();
950 let w = match TcpListener::bind(local_loop(), addr) {
951 Ok(w) => w, Err(e) => fail!("{:?}", e)
953 let mut w = match w.listen() {
954 Ok(w) => w, Err(e) => fail!("{:?}", e),
959 let mut buf = [0u8, ..10];
960 match stream.read(buf) {
961 Ok(10) => {} e => fail!("{:?}", e),
963 for i in range(0, 10u8) {
964 assert_eq!(buf[i as uint], i + 1);
967 Err(e) => fail!("{:?}", e)
972 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
973 Ok(w) => w, Err(e) => fail!("{:?}", e)
975 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
976 Ok(()) => {}, Err(e) => fail!("{:?}", e)
982 let (tx, rx) = channel();
983 let addr = next_test_ip6();
986 let w = match TcpListener::bind(local_loop(), addr) {
987 Ok(w) => w, Err(e) => fail!("{:?}", e)
989 let mut w = match w.listen() {
990 Ok(w) => w, Err(e) => fail!("{:?}", e),
995 let mut buf = [0u8, ..10];
996 match stream.read(buf) {
997 Ok(10) => {} e => fail!("{:?}", e),
999 for i in range(0, 10u8) {
1000 assert_eq!(buf[i as uint], i + 1);
1003 Err(e) => fail!("{:?}", e)
1008 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
1009 Ok(w) => w, Err(e) => fail!("{:?}", e)
1011 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
1012 Ok(()) => {}, Err(e) => fail!("{:?}", e)
1018 let (tx, rx) = channel();
1019 let client = next_test_ip4();
1020 let server = next_test_ip4();
1023 match UdpWatcher::bind(local_loop(), server) {
1026 let mut buf = [0u8, ..10];
1027 match w.recvfrom(buf) {
1028 Ok((10, addr)) => assert_eq!(addr, client),
1029 e => fail!("{:?}", e),
1031 for i in range(0, 10u8) {
1032 assert_eq!(buf[i as uint], i + 1);
1035 Err(e) => fail!("{:?}", e)
1040 let mut w = match UdpWatcher::bind(local_loop(), client) {
1041 Ok(w) => w, Err(e) => fail!("{:?}", e)
1043 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1044 Ok(()) => {}, Err(e) => fail!("{:?}", e)
1050 let (tx, rx) = channel();
1051 let client = next_test_ip6();
1052 let server = next_test_ip6();
1055 match UdpWatcher::bind(local_loop(), server) {
1058 let mut buf = [0u8, ..10];
1059 match w.recvfrom(buf) {
1060 Ok((10, addr)) => assert_eq!(addr, client),
1061 e => fail!("{:?}", e),
1063 for i in range(0, 10u8) {
1064 assert_eq!(buf[i as uint], i + 1);
1067 Err(e) => fail!("{:?}", e)
1072 let mut w = match UdpWatcher::bind(local_loop(), client) {
1073 Ok(w) => w, Err(e) => fail!("{:?}", e)
1075 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1076 Ok(()) => {}, Err(e) => fail!("{:?}", e)
// The accepting side writes 2048-byte buffers of 1s until at least MAX bytes
// have been written; the connecting side reads until it has seen MAX bytes,
// checking that every byte read equals 1.
1081 fn test_read_read_read() {
1082 let addr = next_test_ip4();
1083 static MAX: uint = 5000;
1084 let (tx, rx) = channel();
1087 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1088 let mut acceptor = listener.listen().unwrap();
1090 let mut stream = acceptor.accept().unwrap();
1091 let buf = [1, .. 2048];
1092 let mut total_bytes_written = 0;
1093 while total_bytes_written < MAX {
1094 assert!(stream.write(buf).is_ok());
1095 uvdebug!("wrote bytes");
1096 total_bytes_written += buf.len();
1101 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1102 let mut buf = [0, .. 2048];
1103 let mut total_bytes_read = 0;
1104 while total_bytes_read < MAX {
1105 let nread = stream.read(buf).unwrap();
1106 total_bytes_read += nread;
1107 for i in range(0u, nread) {
1108 assert_eq!(buf[i], 1);
1111 uvdebug!("read {} bytes total", total_bytes_read);
// A client sends two one-byte datagrams ([1] then [2]) to the server; the
// server receives twice and checks the lengths, source addresses and payload
// bytes of both.
1115 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1116 fn test_udp_twice() {
1117 let server_addr = next_test_ip4();
1118 let client_addr = next_test_ip4();
1119 let (tx, rx) = channel();
1122 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1124 assert!(client.sendto([1], server_addr).is_ok());
1125 assert!(client.sendto([2], server_addr).is_ok());
1128 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1132 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1133 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1134 assert_eq!(nread1, 1);
1135 assert_eq!(nread2, 1);
1136 assert_eq!(src1, client_addr);
1137 assert_eq!(src2, client_addr);
1138 assert_eq!(buf1[0], 1);
1139 assert_eq!(buf2[0], 2);
// Two UDP endpoints, each with an "in" and an "out" socket. The server side
// sends 2048-byte messages of 1s and reads one-byte replies from the client;
// the client side sends [1] while it wants more data, checks every received
// byte, and finally sends [0] to tell the server it is done.
1143 fn test_udp_many_read() {
1144 let server_out_addr = next_test_ip4();
1145 let server_in_addr = next_test_ip4();
1146 let client_out_addr = next_test_ip4();
1147 let client_in_addr = next_test_ip4();
1148 static MAX: uint = 500_000;
1150 let (tx1, rx1) = channel::<()>();
1151 let (tx2, rx2) = channel::<()>();
1154 let l = local_loop();
1155 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1156 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1157 let (tx, rx) = (tx2, rx1);
1160 let msg = [1, .. 2048];
1161 let mut total_bytes_sent = 0;
1165 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1166 total_bytes_sent += msg.len();
1167 // check if the client has received enough
1168 let res = server_in.recvfrom(buf);
1169 assert!(res.is_ok());
1170 let (nread, src) = res.unwrap();
1171 assert_eq!(nread, 1);
1172 assert_eq!(src, client_out_addr);
1174 assert!(total_bytes_sent >= MAX);
1177 let l = local_loop();
1178 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1179 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1180 let (tx, rx) = (tx1, rx2);
1183 let mut total_bytes_recv = 0;
1184 let mut buf = [0, .. 2048];
1185 while total_bytes_recv < MAX {
1187 assert!(client_out.sendto([1], server_in_addr).is_ok());
1189 let res = client_in.recvfrom(buf);
1190 assert!(res.is_ok());
1191 let (nread, src) = res.unwrap();
1192 assert_eq!(src, server_out_addr);
1193 total_bytes_recv += nread;
1194 for i in range(0u, nread) {
1195 assert_eq!(buf[i], 1);
1198 // tell the server we're done
1199 assert!(client_out.sendto([0], server_in_addr).is_ok());
// A connecting side writes the eight-byte pattern 0..7 four times; the
// accepting side reads until it has seen the expected number of bytes,
// checking that each byte continues the repeating 0..7 sequence.
1203 fn test_read_and_block() {
1204 let addr = next_test_ip4();
1205 let (tx, rx) = channel::<Receiver<()>>();
1209 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1210 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1211 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1213 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1214 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1218 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1219 let mut acceptor = listener.listen().unwrap();
1220 let (tx2, rx2) = channel();
1222 let mut stream = acceptor.accept().unwrap();
1223 let mut buf = [0, .. 2048];
1226 let mut current = 0;
1229 while current < expected {
1230 let nread = stream.read(buf).unwrap();
1231 for i in range(0u, nread) {
1232 let val = buf[i] as uint;
1233 assert_eq!(val, current % 8);
1238 let _ = tx2.send_opt(());
1241 // Make sure we had multiple reads
// The accepting side reads once and expects exactly the eight bytes 0..7;
// the connecting side retries `connect` until it succeeds and then writes
// those eight bytes.
1246 fn test_simple_tcp_server_and_client_on_diff_threads() {
1247 let addr = next_test_ip4();
1250 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1251 let mut acceptor = listener.listen().unwrap();
1252 let mut stream = acceptor.accept().unwrap();
1253 let mut buf = [0, .. 2048];
1254 let nread = stream.read(buf).unwrap();
1255 assert_eq!(nread, 8);
1256 for i in range(0u, nread) {
1257 assert_eq!(buf[i], i as u8);
1261 let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1262 while stream.is_err() {
1263 stream = TcpWatcher::connect(local_loop(), addr, None);
1265 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
// Expected to fail (#[should_fail]) while a listening TCP acceptor is alive.
1268 #[should_fail] #[test]
1269 fn tcp_listener_fail_cleanup() {
1270 let addr = next_test_ip4();
1271 let w = TcpListener::bind(local_loop(), addr).unwrap();
1272 let _w = w.listen().unwrap();
// Expected to fail (#[should_fail]) while a connected TcpWatcher is alive;
// the accepting side drops the stream it accepts.
1276 #[should_fail] #[test]
1277 fn tcp_stream_fail_cleanup() {
1278 let (tx, rx) = channel();
1279 let addr = next_test_ip4();
1282 let w = TcpListener::bind(local_loop(), addr).unwrap();
1283 let mut w = w.listen().unwrap();
1285 drop(w.accept().unwrap());
1288 let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
// Expected to fail (#[should_fail]) while a bound UdpWatcher is alive.
1292 #[should_fail] #[test]
1293 fn udp_listener_fail_cleanup() {
1294 let addr = next_test_ip4();
1295 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1299 #[should_fail] #[test]
1300 fn udp_fail_other_task() {
1301 let addr = next_test_ip4();
1302 let (tx, rx) = channel();
1304 // force the handle to be created on a different scheduler, failure in
1305 // the original task will force a homing operation back to this
1308 let w = UdpWatcher::bind(local_loop(), addr).unwrap();