]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Ignore tests broken by failing on ICE
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::io::{IoError, IoResult};
13 use std::io::net::ip;
14 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use libc;
16 use std::mem;
17 use std::ptr;
18 use std::rt::rtio;
19 use std::rt::task::BlockedTask;
20
21 use access::Access;
22 use homing::{HomingIO, HomeHandle};
23 use rc::Refcount;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27             wait_until_woken_after, wakeup};
28 use timer::TimerWatcher;
29 use uvio::UvIoFactory;
30 use uvll;
31
32 ////////////////////////////////////////////////////////////////////////////////
33 /// Generic functions related to dealing with sockaddr things
34 ////////////////////////////////////////////////////////////////////////////////
35
36 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
37 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
38
39 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
40                         len: uint) -> ip::SocketAddr {
41     match storage.ss_family as c_int {
42         libc::AF_INET => {
43             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
44             let storage: &libc::sockaddr_in = unsafe {
45                 cast::transmute(storage)
46             };
47             let addr = storage.sin_addr.s_addr as u32;
48             let a = (addr >>  0) as u8;
49             let b = (addr >>  8) as u8;
50             let c = (addr >> 16) as u8;
51             let d = (addr >> 24) as u8;
52             ip::SocketAddr {
53                 ip: ip::Ipv4Addr(a, b, c, d),
54                 port: ntohs(storage.sin_port),
55             }
56         }
57         libc::AF_INET6 => {
58             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
59             let storage: &libc::sockaddr_in6 = unsafe {
60                 cast::transmute(storage)
61             };
62             let a = ntohs(storage.sin6_addr.s6_addr[0]);
63             let b = ntohs(storage.sin6_addr.s6_addr[1]);
64             let c = ntohs(storage.sin6_addr.s6_addr[2]);
65             let d = ntohs(storage.sin6_addr.s6_addr[3]);
66             let e = ntohs(storage.sin6_addr.s6_addr[4]);
67             let f = ntohs(storage.sin6_addr.s6_addr[5]);
68             let g = ntohs(storage.sin6_addr.s6_addr[6]);
69             let h = ntohs(storage.sin6_addr.s6_addr[7]);
70             ip::SocketAddr {
71                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72                 port: ntohs(storage.sin6_port),
73             }
74         }
75         n => {
76             fail!("unknown family {}", n);
77         }
78     }
79 }
80
81 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
82     unsafe {
83         let mut storage: libc::sockaddr_storage = mem::init();
84         let len = match addr.ip {
85             ip::Ipv4Addr(a, b, c, d) => {
86                 let storage: &mut libc::sockaddr_in =
87                     cast::transmute(&mut storage);
88                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
89                 (*storage).sin_port = htons(addr.port);
90                 (*storage).sin_addr = libc::in_addr {
91                     s_addr: (d as u32 << 24) |
92                             (c as u32 << 16) |
93                             (b as u32 <<  8) |
94                             (a as u32 <<  0)
95                 };
96                 mem::size_of::<libc::sockaddr_in>()
97             }
98             ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
99                 let storage: &mut libc::sockaddr_in6 =
100                     cast::transmute(&mut storage);
101                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
102                 storage.sin6_port = htons(addr.port);
103                 storage.sin6_addr = libc::in6_addr {
104                     s6_addr: [
105                         htons(a),
106                         htons(b),
107                         htons(c),
108                         htons(d),
109                         htons(e),
110                         htons(f),
111                         htons(g),
112                         htons(h),
113                     ]
114                 };
115                 mem::size_of::<libc::sockaddr_in6>()
116             }
117         };
118         return (storage, len);
119     }
120 }
121
122 enum SocketNameKind {
123     TcpPeer,
124     Tcp,
125     Udp
126 }
127
128 fn socket_name(sk: SocketNameKind,
129                handle: *c_void) -> Result<ip::SocketAddr, IoError> {
130     let getsockname = match sk {
131         TcpPeer => uvll::uv_tcp_getpeername,
132         Tcp     => uvll::uv_tcp_getsockname,
133         Udp     => uvll::uv_udp_getsockname,
134     };
135
136     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
137     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
138     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
139
140     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
141     match unsafe {
142         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
143     } {
144         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145         n => Err(uv_error_to_io_error(UvError(n)))
146     }
147 }
148 ////////////////////////////////////////////////////////////////////////////////
149 // Helpers for handling timeouts, shared for pipes/tcp
150 ////////////////////////////////////////////////////////////////////////////////
151
152 pub struct ConnectCtx {
153     pub status: c_int,
154     pub task: Option<BlockedTask>,
155     pub timer: Option<~TimerWatcher>,
156 }
157
158 pub struct AcceptTimeout {
159     timer: Option<TimerWatcher>,
160     timeout_tx: Option<Sender<()>>,
161     timeout_rx: Option<Receiver<()>>,
162 }
163
164 impl ConnectCtx {
165     pub fn connect<T>(
166         mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
167         f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
168     ) -> Result<T, UvError> {
169         let mut req = Request::new(uvll::UV_CONNECT);
170         let r = f(&req, &obj, connect_cb);
171         return match r {
172             0 => {
173                 req.defuse(); // uv callback now owns this request
174                 match timeout {
175                     Some(t) => {
176                         let mut timer = TimerWatcher::new(io);
177                         timer.start(timer_cb, t, 0);
178                         self.timer = Some(timer);
179                     }
180                     None => {}
181                 }
182                 wait_until_woken_after(&mut self.task, &io.loop_, || {
183                     let data = &self as *_;
184                     match self.timer {
185                         Some(ref mut timer) => unsafe { timer.set_data(data) },
186                         None => {}
187                     }
188                     req.set_data(data);
189                 });
190                 // Make sure an erroneously fired callback doesn't have access
191                 // to the context any more.
192                 req.set_data(0 as *int);
193
194                 // If we failed because of a timeout, drop the TcpWatcher as
195                 // soon as possible because it's data is now set to null and we
196                 // want to cancel the callback ASAP.
197                 match self.status {
198                     0 => Ok(obj),
199                     n => { drop(obj); Err(UvError(n)) }
200                 }
201             }
202             n => Err(UvError(n))
203         };
204
205         extern fn timer_cb(handle: *uvll::uv_timer_t) {
206             // Don't close the corresponding tcp request, just wake up the task
207             // and let RAII take care of the pending watcher.
208             let cx: &mut ConnectCtx = unsafe {
209                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
210             };
211             cx.status = uvll::ECANCELED;
212             wakeup(&mut cx.task);
213         }
214
215         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
216             // This callback can be invoked with ECANCELED if the watcher is
217             // closed by the timeout callback. In that case we just want to free
218             // the request and be along our merry way.
219             let req = Request::wrap(req);
220             if status == uvll::ECANCELED { return }
221
222             // Apparently on windows when the handle is closed this callback may
223             // not be invoked with ECANCELED but rather another error code.
224             // Either ways, if the data is null, then our timeout has expired
225             // and there's nothing we can do.
226             let data = unsafe { uvll::get_data_for_req(req.handle) };
227             if data.is_null() { return }
228
229             let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
230             cx.status = status;
231             match cx.timer {
232                 Some(ref mut t) => t.stop(),
233                 None => {}
234             }
235             // Note that the timer callback doesn't cancel the connect request
236             // (that's the job of uv_close()), so it's possible for this
237             // callback to get triggered after the timeout callback fires, but
238             // before the task wakes up. In that case, we did indeed
239             // successfully connect, but we don't need to wake someone up. We
240             // updated the status above (correctly so), and the task will pick
241             // up on this when it wakes up.
242             if cx.task.is_some() {
243                 wakeup(&mut cx.task);
244             }
245         }
246     }
247 }
248
249 impl AcceptTimeout {
250     pub fn new() -> AcceptTimeout {
251         AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
252     }
253
254     pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
255         match self.timeout_rx {
256             None => c.recv(),
257             Some(ref rx) => {
258                 use std::comm::Select;
259
260                 // Poll the incoming channel first (don't rely on the order of
261                 // select just yet). If someone's pending then we should return
262                 // them immediately.
263                 match c.try_recv() {
264                     Ok(data) => return data,
265                     Err(..) => {}
266                 }
267
268                 // Use select to figure out which channel gets ready first. We
269                 // do some custom handling of select to ensure that we never
270                 // actually drain the timeout channel (we'll keep seeing the
271                 // timeout message in the future).
272                 let s = Select::new();
273                 let mut timeout = s.handle(rx);
274                 let mut data = s.handle(c);
275                 unsafe {
276                     timeout.add();
277                     data.add();
278                 }
279                 if s.wait() == timeout.id() {
280                     Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
281                 } else {
282                     c.recv()
283                 }
284             }
285         }
286     }
287
288     pub fn clear(&mut self) {
289         // Clear any previous timeout by dropping the timer and transmission
290         // channels
291         drop((self.timer.take(),
292               self.timeout_tx.take(),
293               self.timeout_rx.take()))
294     }
295
296     pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
297         &mut self, ms: u64, t: &mut T
298     ) {
299         // If we have a timeout, lazily initialize the timer which will be used
300         // to fire when the timeout runs out.
301         if self.timer.is_none() {
302             let _m = t.fire_homing_missile();
303             let loop_ = Loop::wrap(unsafe {
304                 uvll::get_loop_for_uv_handle(t.uv_handle())
305             });
306             let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
307             unsafe {
308                 timer.set_data(self as *mut _ as *AcceptTimeout);
309             }
310             self.timer = Some(timer);
311         }
312
313         // Once we've got a timer, stop any previous timeout, reset it for the
314         // current one, and install some new channels to send/receive data on
315         let timer = self.timer.get_mut_ref();
316         timer.stop();
317         timer.start(timer_cb, ms, 0);
318         let (tx, rx) = channel();
319         self.timeout_tx = Some(tx);
320         self.timeout_rx = Some(rx);
321
322         extern fn timer_cb(timer: *uvll::uv_timer_t) {
323             let acceptor: &mut AcceptTimeout = unsafe {
324                 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
325             };
326             // This send can never fail because if this timer is active then the
327             // receiving channel is guaranteed to be alive
328             acceptor.timeout_tx.get_ref().send(());
329         }
330     }
331 }
332
333 ////////////////////////////////////////////////////////////////////////////////
334 /// TCP implementation
335 ////////////////////////////////////////////////////////////////////////////////
336
337 pub struct TcpWatcher {
338     handle: *uvll::uv_tcp_t,
339     stream: StreamWatcher,
340     home: HomeHandle,
341     refcount: Refcount,
342
343     // libuv can't support concurrent reads and concurrent writes of the same
344     // stream object, so we use these access guards in order to arbitrate among
345     // multiple concurrent reads and writes. Note that libuv *can* read and
346     // write simultaneously, it just can't read and read simultaneously.
347     read_access: Access,
348     write_access: Access,
349 }
350
351 pub struct TcpListener {
352     home: HomeHandle,
353     handle: *uvll::uv_pipe_t,
354     closing_task: Option<BlockedTask>,
355     outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
356     incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
357 }
358
359 pub struct TcpAcceptor {
360     listener: ~TcpListener,
361     timeout: AcceptTimeout,
362 }
363
364 // TCP watchers (clients/streams)
365
366 impl TcpWatcher {
367     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
368         let handle = io.make_handle();
369         TcpWatcher::new_home(&io.loop_, handle)
370     }
371
372     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
373         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
374         assert_eq!(unsafe {
375             uvll::uv_tcp_init(loop_.handle, handle)
376         }, 0);
377         TcpWatcher {
378             home: home,
379             handle: handle,
380             stream: StreamWatcher::new(handle),
381             refcount: Refcount::new(),
382             read_access: Access::new(),
383             write_access: Access::new(),
384         }
385     }
386
387     pub fn connect(io: &mut UvIoFactory,
388                    address: ip::SocketAddr,
389                    timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
390         let tcp = TcpWatcher::new(io);
391         let cx = ConnectCtx { status: -1, task: None, timer: None };
392         let (addr, _len) = addr_to_sockaddr(address);
393         let addr_p = &addr as *_ as *libc::sockaddr;
394         cx.connect(tcp, timeout, io, |req, tcp, cb| {
395             unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
396         })
397     }
398 }
399
400 impl HomingIO for TcpWatcher {
401     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
402 }
403
404 impl rtio::RtioSocket for TcpWatcher {
405     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
406         let _m = self.fire_homing_missile();
407         socket_name(Tcp, self.handle)
408     }
409 }
410
411 impl rtio::RtioTcpStream for TcpWatcher {
412     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
413         let m = self.fire_homing_missile();
414         let _g = self.read_access.grant(m);
415         self.stream.read(buf).map_err(uv_error_to_io_error)
416     }
417
418     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
419         let m = self.fire_homing_missile();
420         let _g = self.write_access.grant(m);
421         self.stream.write(buf).map_err(uv_error_to_io_error)
422     }
423
424     fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
425         let _m = self.fire_homing_missile();
426         socket_name(TcpPeer, self.handle)
427     }
428
429     fn control_congestion(&mut self) -> Result<(), IoError> {
430         let _m = self.fire_homing_missile();
431         status_to_io_result(unsafe {
432             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
433         })
434     }
435
436     fn nodelay(&mut self) -> Result<(), IoError> {
437         let _m = self.fire_homing_missile();
438         status_to_io_result(unsafe {
439             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
440         })
441     }
442
443     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
444         let _m = self.fire_homing_missile();
445         status_to_io_result(unsafe {
446             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
447                                    delay_in_seconds as c_uint)
448         })
449     }
450
451     fn letdie(&mut self) -> Result<(), IoError> {
452         let _m = self.fire_homing_missile();
453         status_to_io_result(unsafe {
454             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
455         })
456     }
457
458     fn clone(&self) -> ~rtio::RtioTcpStream:Send {
459         box TcpWatcher {
460             handle: self.handle,
461             stream: StreamWatcher::new(self.handle),
462             home: self.home.clone(),
463             refcount: self.refcount.clone(),
464             write_access: self.write_access.clone(),
465             read_access: self.read_access.clone(),
466         } as ~rtio::RtioTcpStream:Send
467     }
468
469     fn close_write(&mut self) -> Result<(), IoError> {
470         struct Ctx {
471             slot: Option<BlockedTask>,
472             status: c_int,
473         }
474         let mut req = Request::new(uvll::UV_SHUTDOWN);
475
476         return match unsafe {
477             uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
478         } {
479             0 => {
480                 req.defuse(); // uv callback now owns this request
481                 let mut cx = Ctx { slot: None, status: 0 };
482
483                 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
484                     req.set_data(&cx);
485                 });
486
487                 status_to_io_result(cx.status)
488             }
489             n => Err(uv_error_to_io_error(UvError(n)))
490         };
491
492         extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
493             let req = Request::wrap(req);
494             assert!(status != uvll::ECANCELED);
495             let cx: &mut Ctx = unsafe { req.get_data() };
496             cx.status = status;
497             wakeup(&mut cx.slot);
498         }
499     }
500 }
501
502 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
503     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
504 }
505
506 impl Drop for TcpWatcher {
507     fn drop(&mut self) {
508         let _m = self.fire_homing_missile();
509         if self.refcount.decrement() {
510             self.close();
511         }
512     }
513 }
514
515 // TCP listeners (unbound servers)
516
517 impl TcpListener {
518     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
519                 -> Result<~TcpListener, UvError> {
520         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
521         assert_eq!(unsafe {
522             uvll::uv_tcp_init(io.uv_loop(), handle)
523         }, 0);
524         let (tx, rx) = channel();
525         let l = box TcpListener {
526             home: io.make_handle(),
527             handle: handle,
528             closing_task: None,
529             outgoing: tx,
530             incoming: rx,
531         };
532         let (addr, _len) = addr_to_sockaddr(address);
533         let res = unsafe {
534             let addr_p = &addr as *libc::sockaddr_storage;
535             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
536         };
537         return match res {
538             0 => Ok(l.install()),
539             n => Err(UvError(n))
540         };
541     }
542 }
543
544 impl HomingIO for TcpListener {
545     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
546 }
547
548 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
549     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
550 }
551
552 impl rtio::RtioSocket for TcpListener {
553     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
554         let _m = self.fire_homing_missile();
555         socket_name(Tcp, self.handle)
556     }
557 }
558
559 impl rtio::RtioTcpListener for TcpListener {
560     fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
561         // create the acceptor object from ourselves
562         let mut acceptor = box TcpAcceptor {
563             listener: self,
564             timeout: AcceptTimeout::new(),
565         };
566
567         let _m = acceptor.fire_homing_missile();
568         // FIXME: the 128 backlog should be configurable
569         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
570             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
571             n => Err(uv_error_to_io_error(UvError(n))),
572         }
573     }
574 }
575
576 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
577     assert!(status != uvll::ECANCELED);
578     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
579     let msg = match status {
580         0 => {
581             let loop_ = Loop::wrap(unsafe {
582                 uvll::get_loop_for_uv_handle(server)
583             });
584             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
585             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
586             Ok(box client as ~rtio::RtioTcpStream:Send)
587         }
588         n => Err(uv_error_to_io_error(UvError(n)))
589     };
590     tcp.outgoing.send(msg);
591 }
592
593 impl Drop for TcpListener {
594     fn drop(&mut self) {
595         let _m = self.fire_homing_missile();
596         self.close();
597     }
598 }
599
600 // TCP acceptors (bound servers)
601
602 impl HomingIO for TcpAcceptor {
603     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
604 }
605
606 impl rtio::RtioSocket for TcpAcceptor {
607     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
608         let _m = self.fire_homing_missile();
609         socket_name(Tcp, self.listener.handle)
610     }
611 }
612
613 impl rtio::RtioTcpAcceptor for TcpAcceptor {
614     fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
615         self.timeout.accept(&self.listener.incoming)
616     }
617
618     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
619         let _m = self.fire_homing_missile();
620         status_to_io_result(unsafe {
621             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
622         })
623     }
624
625     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
626         let _m = self.fire_homing_missile();
627         status_to_io_result(unsafe {
628             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
629         })
630     }
631
632     fn set_timeout(&mut self, ms: Option<u64>) {
633         match ms {
634             None => self.timeout.clear(),
635             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
636         }
637     }
638 }
639
640 ////////////////////////////////////////////////////////////////////////////////
641 /// UDP implementation
642 ////////////////////////////////////////////////////////////////////////////////
643
644 pub struct UdpWatcher {
645     handle: *uvll::uv_udp_t,
646     home: HomeHandle,
647
648     // See above for what these fields are
649     refcount: Refcount,
650     read_access: Access,
651     write_access: Access,
652 }
653
654 impl UdpWatcher {
655     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
656                 -> Result<UdpWatcher, UvError> {
657         let udp = UdpWatcher {
658             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
659             home: io.make_handle(),
660             refcount: Refcount::new(),
661             read_access: Access::new(),
662             write_access: Access::new(),
663         };
664         assert_eq!(unsafe {
665             uvll::uv_udp_init(io.uv_loop(), udp.handle)
666         }, 0);
667         let (addr, _len) = addr_to_sockaddr(address);
668         let result = unsafe {
669             let addr_p = &addr as *libc::sockaddr_storage;
670             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
671         };
672         return match result {
673             0 => Ok(udp),
674             n => Err(UvError(n)),
675         };
676     }
677 }
678
679 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
680     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
681 }
682
683 impl HomingIO for UdpWatcher {
684     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
685 }
686
687 impl rtio::RtioSocket for UdpWatcher {
688     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
689         let _m = self.fire_homing_missile();
690         socket_name(Udp, self.handle)
691     }
692 }
693
694 impl rtio::RtioUdpSocket for UdpWatcher {
695     fn recvfrom(&mut self, buf: &mut [u8])
696         -> Result<(uint, ip::SocketAddr), IoError>
697     {
698         struct Ctx {
699             task: Option<BlockedTask>,
700             buf: Option<Buf>,
701             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
702         }
703         let loop_ = self.uv_loop();
704         let m = self.fire_homing_missile();
705         let _g = self.read_access.grant(m);
706
707         let a = match unsafe {
708             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
709         } {
710             0 => {
711                 let mut cx = Ctx {
712                     task: None,
713                     buf: Some(slice_to_uv_buf(buf)),
714                     result: None,
715                 };
716                 let handle = self.handle;
717                 wait_until_woken_after(&mut cx.task, &loop_, || {
718                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
719                 });
720                 match cx.result.take_unwrap() {
721                     (n, _) if n < 0 =>
722                         Err(uv_error_to_io_error(UvError(n as c_int))),
723                     (n, addr) => Ok((n as uint, addr.unwrap()))
724                 }
725             }
726             n => Err(uv_error_to_io_error(UvError(n)))
727         };
728         return a;
729
730         extern fn alloc_cb(handle: *uvll::uv_udp_t,
731                            _suggested_size: size_t,
732                            buf: *mut Buf) {
733             unsafe {
734                 let cx: &mut Ctx =
735                     cast::transmute(uvll::get_data_for_uv_handle(handle));
736                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
737             }
738         }
739
740         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
741                           addr: *libc::sockaddr, _flags: c_uint) {
742             assert!(nread != uvll::ECANCELED as ssize_t);
743             let cx: &mut Ctx = unsafe {
744                 cast::transmute(uvll::get_data_for_uv_handle(handle))
745             };
746
747             // When there's no data to read the recv callback can be a no-op.
748             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
749             // this we just drop back to kqueue and wait for the next callback.
750             if nread == 0 {
751                 cx.buf = Some(unsafe { *buf });
752                 return
753             }
754
755             unsafe {
756                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
757             }
758
759             let cx: &mut Ctx = unsafe {
760                 cast::transmute(uvll::get_data_for_uv_handle(handle))
761             };
762             let addr = if addr == ptr::null() {
763                 None
764             } else {
765                 let len = mem::size_of::<libc::sockaddr_storage>();
766                 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
767             };
768             cx.result = Some((nread, addr));
769             wakeup(&mut cx.task);
770         }
771     }
772
773     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
774         struct Ctx { task: Option<BlockedTask>, result: c_int }
775
776         let m = self.fire_homing_missile();
777         let loop_ = self.uv_loop();
778         let _g = self.write_access.grant(m);
779
780         let mut req = Request::new(uvll::UV_UDP_SEND);
781         let buf = slice_to_uv_buf(buf);
782         let (addr, _len) = addr_to_sockaddr(dst);
783         let result = unsafe {
784             let addr_p = &addr as *libc::sockaddr_storage;
785             uvll::uv_udp_send(req.handle, self.handle, [buf],
786                               addr_p as *libc::sockaddr, send_cb)
787         };
788
789         return match result {
790             0 => {
791                 req.defuse(); // uv callback now owns this request
792                 let mut cx = Ctx { task: None, result: 0 };
793                 wait_until_woken_after(&mut cx.task, &loop_, || {
794                     req.set_data(&cx);
795                 });
796                 match cx.result {
797                     0 => Ok(()),
798                     n => Err(uv_error_to_io_error(UvError(n)))
799                 }
800             }
801             n => Err(uv_error_to_io_error(UvError(n)))
802         };
803
804         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
805             let req = Request::wrap(req);
806             assert!(status != uvll::ECANCELED);
807             let cx: &mut Ctx = unsafe { req.get_data() };
808             cx.result = status;
809             wakeup(&mut cx.task);
810         }
811     }
812
813     fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
814         let _m = self.fire_homing_missile();
815         status_to_io_result(unsafe {
816             multi.to_str().with_c_str(|m_addr| {
817                 uvll::uv_udp_set_membership(self.handle,
818                                             m_addr, ptr::null(),
819                                             uvll::UV_JOIN_GROUP)
820             })
821         })
822     }
823
824     fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
825         let _m = self.fire_homing_missile();
826         status_to_io_result(unsafe {
827             multi.to_str().with_c_str(|m_addr| {
828                 uvll::uv_udp_set_membership(self.handle,
829                                             m_addr, ptr::null(),
830                                             uvll::UV_LEAVE_GROUP)
831             })
832         })
833     }
834
835     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
836         let _m = self.fire_homing_missile();
837         status_to_io_result(unsafe {
838             uvll::uv_udp_set_multicast_loop(self.handle,
839                                             1 as c_int)
840         })
841     }
842
843     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
844         let _m = self.fire_homing_missile();
845         status_to_io_result(unsafe {
846             uvll::uv_udp_set_multicast_loop(self.handle,
847                                             0 as c_int)
848         })
849     }
850
851     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
852         let _m = self.fire_homing_missile();
853         status_to_io_result(unsafe {
854             uvll::uv_udp_set_multicast_ttl(self.handle,
855                                            ttl as c_int)
856         })
857     }
858
859     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
860         let _m = self.fire_homing_missile();
861         status_to_io_result(unsafe {
862             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
863         })
864     }
865
866     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
867         let _m = self.fire_homing_missile();
868         status_to_io_result(unsafe {
869             uvll::uv_udp_set_broadcast(self.handle,
870                                        1 as c_int)
871         })
872     }
873
874     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
875         let _m = self.fire_homing_missile();
876         status_to_io_result(unsafe {
877             uvll::uv_udp_set_broadcast(self.handle,
878                                        0 as c_int)
879         })
880     }
881
882     fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
883         box UdpWatcher {
884             handle: self.handle,
885             home: self.home.clone(),
886             refcount: self.refcount.clone(),
887             write_access: self.write_access.clone(),
888             read_access: self.read_access.clone(),
889         } as ~rtio::RtioUdpSocket:Send
890     }
891 }
892
893 impl Drop for UdpWatcher {
894     fn drop(&mut self) {
895         // Send ourselves home to close this handle (blocking while doing so).
896         let _m = self.fire_homing_missile();
897         if self.refcount.decrement() {
898             self.close();
899         }
900     }
901 }
902
903 #[cfg(test)]
904 mod test {
905     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
906                         RtioUdpSocket};
907     use std::io::test::{next_test_ip4, next_test_ip6};
908
909     use super::{UdpWatcher, TcpWatcher, TcpListener};
910     use super::super::local_loop;
911
912     #[test]
913     fn connect_close_ip4() {
914         match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
915             Ok(..) => fail!(),
916             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
917         }
918     }
919
920     #[test]
921     fn connect_close_ip6() {
922         match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
923             Ok(..) => fail!(),
924             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
925         }
926     }
927
928     #[test]
929     fn udp_bind_close_ip4() {
930         match UdpWatcher::bind(local_loop(), next_test_ip4()) {
931             Ok(..) => {}
932             Err(..) => fail!()
933         }
934     }
935
936     #[test]
937     fn udp_bind_close_ip6() {
938         match UdpWatcher::bind(local_loop(), next_test_ip6()) {
939             Ok(..) => {}
940             Err(..) => fail!()
941         }
942     }
943
944     #[test]
945     fn listen_ip4() {
946         let (tx, rx) = channel();
947         let addr = next_test_ip4();
948
949         spawn(proc() {
950             let w = match TcpListener::bind(local_loop(), addr) {
951                 Ok(w) => w, Err(e) => fail!("{:?}", e)
952             };
953             let mut w = match w.listen() {
954                 Ok(w) => w, Err(e) => fail!("{:?}", e),
955             };
956             tx.send(());
957             match w.accept() {
958                 Ok(mut stream) => {
959                     let mut buf = [0u8, ..10];
960                     match stream.read(buf) {
961                         Ok(10) => {} e => fail!("{:?}", e),
962                     }
963                     for i in range(0, 10u8) {
964                         assert_eq!(buf[i as uint], i + 1);
965                     }
966                 }
967                 Err(e) => fail!("{:?}", e)
968             }
969         });
970
971         rx.recv();
972         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
973             Ok(w) => w, Err(e) => fail!("{:?}", e)
974         };
975         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
976             Ok(()) => {}, Err(e) => fail!("{:?}", e)
977         }
978     }
979
980     #[test]
981     fn listen_ip6() {
982         let (tx, rx) = channel();
983         let addr = next_test_ip6();
984
985         spawn(proc() {
986             let w = match TcpListener::bind(local_loop(), addr) {
987                 Ok(w) => w, Err(e) => fail!("{:?}", e)
988             };
989             let mut w = match w.listen() {
990                 Ok(w) => w, Err(e) => fail!("{:?}", e),
991             };
992             tx.send(());
993             match w.accept() {
994                 Ok(mut stream) => {
995                     let mut buf = [0u8, ..10];
996                     match stream.read(buf) {
997                         Ok(10) => {} e => fail!("{:?}", e),
998                     }
999                     for i in range(0, 10u8) {
1000                         assert_eq!(buf[i as uint], i + 1);
1001                     }
1002                 }
1003                 Err(e) => fail!("{:?}", e)
1004             }
1005         });
1006
1007         rx.recv();
1008         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
1009             Ok(w) => w, Err(e) => fail!("{:?}", e)
1010         };
1011         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
1012             Ok(()) => {}, Err(e) => fail!("{:?}", e)
1013         }
1014     }
1015
1016     #[test]
1017     fn udp_recv_ip4() {
1018         let (tx, rx) = channel();
1019         let client = next_test_ip4();
1020         let server = next_test_ip4();
1021
1022         spawn(proc() {
1023             match UdpWatcher::bind(local_loop(), server) {
1024                 Ok(mut w) => {
1025                     tx.send(());
1026                     let mut buf = [0u8, ..10];
1027                     match w.recvfrom(buf) {
1028                         Ok((10, addr)) => assert_eq!(addr, client),
1029                         e => fail!("{:?}", e),
1030                     }
1031                     for i in range(0, 10u8) {
1032                         assert_eq!(buf[i as uint], i + 1);
1033                     }
1034                 }
1035                 Err(e) => fail!("{:?}", e)
1036             }
1037         });
1038
1039         rx.recv();
1040         let mut w = match UdpWatcher::bind(local_loop(), client) {
1041             Ok(w) => w, Err(e) => fail!("{:?}", e)
1042         };
1043         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1044             Ok(()) => {}, Err(e) => fail!("{:?}", e)
1045         }
1046     }
1047
1048     #[test]
1049     fn udp_recv_ip6() {
1050         let (tx, rx) = channel();
1051         let client = next_test_ip6();
1052         let server = next_test_ip6();
1053
1054         spawn(proc() {
1055             match UdpWatcher::bind(local_loop(), server) {
1056                 Ok(mut w) => {
1057                     tx.send(());
1058                     let mut buf = [0u8, ..10];
1059                     match w.recvfrom(buf) {
1060                         Ok((10, addr)) => assert_eq!(addr, client),
1061                         e => fail!("{:?}", e),
1062                     }
1063                     for i in range(0, 10u8) {
1064                         assert_eq!(buf[i as uint], i + 1);
1065                     }
1066                 }
1067                 Err(e) => fail!("{:?}", e)
1068             }
1069         });
1070
1071         rx.recv();
1072         let mut w = match UdpWatcher::bind(local_loop(), client) {
1073             Ok(w) => w, Err(e) => fail!("{:?}", e)
1074         };
1075         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1076             Ok(()) => {}, Err(e) => fail!("{:?}", e)
1077         }
1078     }
1079
1080     #[test]
1081     fn test_read_read_read() {
1082         let addr = next_test_ip4();
1083         static MAX: uint = 5000;
1084         let (tx, rx) = channel();
1085
1086         spawn(proc() {
1087             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1088             let mut acceptor = listener.listen().unwrap();
1089             tx.send(());
1090             let mut stream = acceptor.accept().unwrap();
1091             let buf = [1, .. 2048];
1092             let mut total_bytes_written = 0;
1093             while total_bytes_written < MAX {
1094                 assert!(stream.write(buf).is_ok());
1095                 uvdebug!("wrote bytes");
1096                 total_bytes_written += buf.len();
1097             }
1098         });
1099
1100         rx.recv();
1101         let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1102         let mut buf = [0, .. 2048];
1103         let mut total_bytes_read = 0;
1104         while total_bytes_read < MAX {
1105             let nread = stream.read(buf).unwrap();
1106             total_bytes_read += nread;
1107             for i in range(0u, nread) {
1108                 assert_eq!(buf[i], 1);
1109             }
1110         }
1111         uvdebug!("read {} bytes total", total_bytes_read);
1112     }
1113
1114     #[test]
1115     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1116     fn test_udp_twice() {
1117         let server_addr = next_test_ip4();
1118         let client_addr = next_test_ip4();
1119         let (tx, rx) = channel();
1120
1121         spawn(proc() {
1122             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1123             rx.recv();
1124             assert!(client.sendto([1], server_addr).is_ok());
1125             assert!(client.sendto([2], server_addr).is_ok());
1126         });
1127
1128         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1129         tx.send(());
1130         let mut buf1 = [0];
1131         let mut buf2 = [0];
1132         let (nread1, src1) = server.recvfrom(buf1).unwrap();
1133         let (nread2, src2) = server.recvfrom(buf2).unwrap();
1134         assert_eq!(nread1, 1);
1135         assert_eq!(nread2, 1);
1136         assert_eq!(src1, client_addr);
1137         assert_eq!(src2, client_addr);
1138         assert_eq!(buf1[0], 1);
1139         assert_eq!(buf2[0], 2);
1140     }
1141
1142     #[test]
1143     fn test_udp_many_read() {
1144         let server_out_addr = next_test_ip4();
1145         let server_in_addr = next_test_ip4();
1146         let client_out_addr = next_test_ip4();
1147         let client_in_addr = next_test_ip4();
1148         static MAX: uint = 500_000;
1149
1150         let (tx1, rx1) = channel::<()>();
1151         let (tx2, rx2) = channel::<()>();
1152
1153         spawn(proc() {
1154             let l = local_loop();
1155             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1156             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1157             let (tx, rx) = (tx2, rx1);
1158             tx.send(());
1159             rx.recv();
1160             let msg = [1, .. 2048];
1161             let mut total_bytes_sent = 0;
1162             let mut buf = [1];
1163             while buf[0] == 1 {
1164                 // send more data
1165                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1166                 total_bytes_sent += msg.len();
1167                 // check if the client has received enough
1168                 let res = server_in.recvfrom(buf);
1169                 assert!(res.is_ok());
1170                 let (nread, src) = res.unwrap();
1171                 assert_eq!(nread, 1);
1172                 assert_eq!(src, client_out_addr);
1173             }
1174             assert!(total_bytes_sent >= MAX);
1175         });
1176
1177         let l = local_loop();
1178         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1179         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1180         let (tx, rx) = (tx1, rx2);
1181         rx.recv();
1182         tx.send(());
1183         let mut total_bytes_recv = 0;
1184         let mut buf = [0, .. 2048];
1185         while total_bytes_recv < MAX {
1186             // ask for more
1187             assert!(client_out.sendto([1], server_in_addr).is_ok());
1188             // wait for data
1189             let res = client_in.recvfrom(buf);
1190             assert!(res.is_ok());
1191             let (nread, src) = res.unwrap();
1192             assert_eq!(src, server_out_addr);
1193             total_bytes_recv += nread;
1194             for i in range(0u, nread) {
1195                 assert_eq!(buf[i], 1);
1196             }
1197         }
1198         // tell the server we're done
1199         assert!(client_out.sendto([0], server_in_addr).is_ok());
1200     }
1201
1202     #[test]
1203     fn test_read_and_block() {
1204         let addr = next_test_ip4();
1205         let (tx, rx) = channel::<Receiver<()>>();
1206
1207         spawn(proc() {
1208             let rx = rx.recv();
1209             let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1210             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1211             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1212             rx.recv();
1213             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1214             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1215             rx.recv();
1216         });
1217
1218         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1219         let mut acceptor = listener.listen().unwrap();
1220         let (tx2, rx2) = channel();
1221         tx.send(rx2);
1222         let mut stream = acceptor.accept().unwrap();
1223         let mut buf = [0, .. 2048];
1224
1225         let expected = 32;
1226         let mut current = 0;
1227         let mut reads = 0;
1228
1229         while current < expected {
1230             let nread = stream.read(buf).unwrap();
1231             for i in range(0u, nread) {
1232                 let val = buf[i] as uint;
1233                 assert_eq!(val, current % 8);
1234                 current += 1;
1235             }
1236             reads += 1;
1237
1238             let _ = tx2.send_opt(());
1239         }
1240
1241         // Make sure we had multiple reads
1242         assert!(reads > 1);
1243     }
1244
1245     #[test]
1246     fn test_simple_tcp_server_and_client_on_diff_threads() {
1247         let addr = next_test_ip4();
1248
1249         spawn(proc() {
1250             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1251             let mut acceptor = listener.listen().unwrap();
1252             let mut stream = acceptor.accept().unwrap();
1253             let mut buf = [0, .. 2048];
1254             let nread = stream.read(buf).unwrap();
1255             assert_eq!(nread, 8);
1256             for i in range(0u, nread) {
1257                 assert_eq!(buf[i], i as u8);
1258             }
1259         });
1260
1261         let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1262         while stream.is_err() {
1263             stream = TcpWatcher::connect(local_loop(), addr, None);
1264         }
1265         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1266     }
1267
1268     #[should_fail] #[test]
1269     fn tcp_listener_fail_cleanup() {
1270         let addr = next_test_ip4();
1271         let w = TcpListener::bind(local_loop(), addr).unwrap();
1272         let _w = w.listen().unwrap();
1273         fail!();
1274     }
1275
1276     #[should_fail] #[test]
1277     fn tcp_stream_fail_cleanup() {
1278         let (tx, rx) = channel();
1279         let addr = next_test_ip4();
1280
1281         spawn(proc() {
1282             let w = TcpListener::bind(local_loop(), addr).unwrap();
1283             let mut w = w.listen().unwrap();
1284             tx.send(());
1285             drop(w.accept().unwrap());
1286         });
1287         rx.recv();
1288         let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1289         fail!();
1290     }
1291
1292     #[should_fail] #[test]
1293     fn udp_listener_fail_cleanup() {
1294         let addr = next_test_ip4();
1295         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1296         fail!();
1297     }
1298
1299     #[should_fail] #[test]
1300     fn udp_fail_other_task() {
1301         let addr = next_test_ip4();
1302         let (tx, rx) = channel();
1303
1304         // force the handle to be created on a different scheduler, failure in
1305         // the original task will force a homing operation back to this
1306         // scheduler.
1307         spawn(proc() {
1308             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1309             tx.send(w);
1310         });
1311
1312         let _w = rx.recv();
1313         fail!();
1314     }
1315 }