]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Register new snapshots
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::io::IoError;
13 use std::io::net::ip;
14 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use libc;
16 use std::mem;
17 use std::ptr;
18 use std::rt::rtio;
19 use std::rt::task::BlockedTask;
20
21 use access::Access;
22 use homing::{HomingIO, HomeHandle};
23 use rc::Refcount;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27             wait_until_woken_after, wakeup};
28 use timer::TimerWatcher;
29 use uvio::UvIoFactory;
30 use uvll;
31
32 ////////////////////////////////////////////////////////////////////////////////
33 /// Generic functions related to dealing with sockaddr things
34 ////////////////////////////////////////////////////////////////////////////////
35
36 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
37 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
38
39 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
40                         len: uint) -> ip::SocketAddr {
41     match storage.ss_family as c_int {
42         libc::AF_INET => {
43             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
44             let storage: &libc::sockaddr_in = unsafe {
45                 cast::transmute(storage)
46             };
47             let addr = storage.sin_addr.s_addr as u32;
48             let a = (addr >>  0) as u8;
49             let b = (addr >>  8) as u8;
50             let c = (addr >> 16) as u8;
51             let d = (addr >> 24) as u8;
52             ip::SocketAddr {
53                 ip: ip::Ipv4Addr(a, b, c, d),
54                 port: ntohs(storage.sin_port),
55             }
56         }
57         libc::AF_INET6 => {
58             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
59             let storage: &libc::sockaddr_in6 = unsafe {
60                 cast::transmute(storage)
61             };
62             let a = ntohs(storage.sin6_addr.s6_addr[0]);
63             let b = ntohs(storage.sin6_addr.s6_addr[1]);
64             let c = ntohs(storage.sin6_addr.s6_addr[2]);
65             let d = ntohs(storage.sin6_addr.s6_addr[3]);
66             let e = ntohs(storage.sin6_addr.s6_addr[4]);
67             let f = ntohs(storage.sin6_addr.s6_addr[5]);
68             let g = ntohs(storage.sin6_addr.s6_addr[6]);
69             let h = ntohs(storage.sin6_addr.s6_addr[7]);
70             ip::SocketAddr {
71                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72                 port: ntohs(storage.sin6_port),
73             }
74         }
75         n => {
76             fail!("unknown family {}", n);
77         }
78     }
79 }
80
81 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
82     unsafe {
83         let mut storage: libc::sockaddr_storage = mem::init();
84         let len = match addr.ip {
85             ip::Ipv4Addr(a, b, c, d) => {
86                 let storage: &mut libc::sockaddr_in =
87                     cast::transmute(&mut storage);
88                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
89                 (*storage).sin_port = htons(addr.port);
90                 (*storage).sin_addr = libc::in_addr {
91                     s_addr: (d as u32 << 24) |
92                             (c as u32 << 16) |
93                             (b as u32 <<  8) |
94                             (a as u32 <<  0)
95                 };
96                 mem::size_of::<libc::sockaddr_in>()
97             }
98             ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
99                 let storage: &mut libc::sockaddr_in6 =
100                     cast::transmute(&mut storage);
101                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
102                 storage.sin6_port = htons(addr.port);
103                 storage.sin6_addr = libc::in6_addr {
104                     s6_addr: [
105                         htons(a),
106                         htons(b),
107                         htons(c),
108                         htons(d),
109                         htons(e),
110                         htons(f),
111                         htons(g),
112                         htons(h),
113                     ]
114                 };
115                 mem::size_of::<libc::sockaddr_in6>()
116             }
117         };
118         return (storage, len);
119     }
120 }
121
122 enum SocketNameKind {
123     TcpPeer,
124     Tcp,
125     Udp
126 }
127
128 fn socket_name(sk: SocketNameKind,
129                handle: *c_void) -> Result<ip::SocketAddr, IoError> {
130     let getsockname = match sk {
131         TcpPeer => uvll::uv_tcp_getpeername,
132         Tcp     => uvll::uv_tcp_getsockname,
133         Udp     => uvll::uv_udp_getsockname,
134     };
135
136     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
137     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
138     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
139
140     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
141     match unsafe {
142         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
143     } {
144         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145         n => Err(uv_error_to_io_error(UvError(n)))
146     }
147 }
148
149 ////////////////////////////////////////////////////////////////////////////////
150 /// TCP implementation
151 ////////////////////////////////////////////////////////////////////////////////
152
153 pub struct TcpWatcher {
154     handle: *uvll::uv_tcp_t,
155     stream: StreamWatcher,
156     home: HomeHandle,
157     refcount: Refcount,
158
159     // libuv can't support concurrent reads and concurrent writes of the same
160     // stream object, so we use these access guards in order to arbitrate among
161     // multiple concurrent reads and writes. Note that libuv *can* read and
162     // write simultaneously, it just can't read and read simultaneously.
163     read_access: Access,
164     write_access: Access,
165 }
166
167 pub struct TcpListener {
168     home: HomeHandle,
169     handle: *uvll::uv_pipe_t,
170     closing_task: Option<BlockedTask>,
171     outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
172     incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
173 }
174
175 pub struct TcpAcceptor {
176     listener: ~TcpListener,
177 }
178
179 // TCP watchers (clients/streams)
180
181 impl TcpWatcher {
182     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
183         let handle = io.make_handle();
184         TcpWatcher::new_home(&io.loop_, handle)
185     }
186
187     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
188         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
189         assert_eq!(unsafe {
190             uvll::uv_tcp_init(loop_.handle, handle)
191         }, 0);
192         TcpWatcher {
193             home: home,
194             handle: handle,
195             stream: StreamWatcher::new(handle),
196             refcount: Refcount::new(),
197             read_access: Access::new(),
198             write_access: Access::new(),
199         }
200     }
201
202     pub fn connect(io: &mut UvIoFactory,
203                    address: ip::SocketAddr,
204                    timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
205         struct Ctx {
206             status: c_int,
207             task: Option<BlockedTask>,
208             timer: Option<~TimerWatcher>,
209         }
210
211         let tcp = TcpWatcher::new(io);
212         let (addr, _len) = addr_to_sockaddr(address);
213         let mut req = Request::new(uvll::UV_CONNECT);
214         let result = unsafe {
215             let addr_p = &addr as *libc::sockaddr_storage;
216             uvll::uv_tcp_connect(req.handle, tcp.handle,
217                                  addr_p as *libc::sockaddr,
218                                  connect_cb)
219         };
220         return match result {
221             0 => {
222                 req.defuse(); // uv callback now owns this request
223                 let mut cx = Ctx { status: -1, task: None, timer: None };
224                 match timeout {
225                     Some(t) => {
226                         let mut timer = TimerWatcher::new(io);
227                         timer.start(timer_cb, t, 0);
228                         cx.timer = Some(timer);
229                     }
230                     None => {}
231                 }
232                 wait_until_woken_after(&mut cx.task, &io.loop_, || {
233                     let data = &cx as *_;
234                     match cx.timer {
235                         Some(ref mut timer) => unsafe { timer.set_data(data) },
236                         None => {}
237                     }
238                     req.set_data(data);
239                 });
240                 // Make sure an erroneously fired callback doesn't have access
241                 // to the context any more.
242                 req.set_data(0 as *int);
243
244                 // If we failed because of a timeout, drop the TcpWatcher as
245                 // soon as possible because it's data is now set to null and we
246                 // want to cancel the callback ASAP.
247                 match cx.status {
248                     0 => Ok(tcp),
249                     n => { drop(tcp); Err(UvError(n)) }
250                 }
251             }
252             n => Err(UvError(n))
253         };
254
255         extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
256             // Don't close the corresponding tcp request, just wake up the task
257             // and let RAII take care of the pending watcher.
258             assert_eq!(status, 0);
259             let cx: &mut Ctx = unsafe {
260                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
261             };
262             cx.status = uvll::ECANCELED;
263             wakeup(&mut cx.task);
264         }
265
266         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
267             // This callback can be invoked with ECANCELED if the watcher is
268             // closed by the timeout callback. In that case we just want to free
269             // the request and be along our merry way.
270             let req = Request::wrap(req);
271             if status == uvll::ECANCELED { return }
272
273             // Apparently on windows when the handle is closed this callback may
274             // not be invoked with ECANCELED but rather another error code.
275             // Either ways, if the data is null, then our timeout has expired
276             // and there's nothing we can do.
277             let data = unsafe { uvll::get_data_for_req(req.handle) };
278             if data.is_null() { return }
279
280             let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
281             cx.status = status;
282             match cx.timer {
283                 Some(ref mut t) => t.stop(),
284                 None => {}
285             }
286             // Note that the timer callback doesn't cancel the connect request
287             // (that's the job of uv_close()), so it's possible for this
288             // callback to get triggered after the timeout callback fires, but
289             // before the task wakes up. In that case, we did indeed
290             // successfully connect, but we don't need to wake someone up. We
291             // updated the status above (correctly so), and the task will pick
292             // up on this when it wakes up.
293             if cx.task.is_some() {
294                 wakeup(&mut cx.task);
295             }
296         }
297     }
298 }
299
300 impl HomingIO for TcpWatcher {
301     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
302 }
303
304 impl rtio::RtioSocket for TcpWatcher {
305     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
306         let _m = self.fire_homing_missile();
307         socket_name(Tcp, self.handle)
308     }
309 }
310
311 impl rtio::RtioTcpStream for TcpWatcher {
312     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
313         let m = self.fire_homing_missile();
314         let _g = self.read_access.grant(m);
315         self.stream.read(buf).map_err(uv_error_to_io_error)
316     }
317
318     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
319         let m = self.fire_homing_missile();
320         let _g = self.write_access.grant(m);
321         self.stream.write(buf).map_err(uv_error_to_io_error)
322     }
323
324     fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
325         let _m = self.fire_homing_missile();
326         socket_name(TcpPeer, self.handle)
327     }
328
329     fn control_congestion(&mut self) -> Result<(), IoError> {
330         let _m = self.fire_homing_missile();
331         status_to_io_result(unsafe {
332             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
333         })
334     }
335
336     fn nodelay(&mut self) -> Result<(), IoError> {
337         let _m = self.fire_homing_missile();
338         status_to_io_result(unsafe {
339             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
340         })
341     }
342
343     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
344         let _m = self.fire_homing_missile();
345         status_to_io_result(unsafe {
346             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
347                                    delay_in_seconds as c_uint)
348         })
349     }
350
351     fn letdie(&mut self) -> Result<(), IoError> {
352         let _m = self.fire_homing_missile();
353         status_to_io_result(unsafe {
354             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
355         })
356     }
357
358     fn clone(&self) -> ~rtio::RtioTcpStream:Send {
359         ~TcpWatcher {
360             handle: self.handle,
361             stream: StreamWatcher::new(self.handle),
362             home: self.home.clone(),
363             refcount: self.refcount.clone(),
364             write_access: self.write_access.clone(),
365             read_access: self.read_access.clone(),
366         } as ~rtio::RtioTcpStream:Send
367     }
368
369     fn close_write(&mut self) -> Result<(), IoError> {
370         struct Ctx {
371             slot: Option<BlockedTask>,
372             status: c_int,
373         }
374         let mut req = Request::new(uvll::UV_SHUTDOWN);
375
376         return match unsafe {
377             uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
378         } {
379             0 => {
380                 req.defuse(); // uv callback now owns this request
381                 let mut cx = Ctx { slot: None, status: 0 };
382
383                 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
384                     req.set_data(&cx);
385                 });
386
387                 status_to_io_result(cx.status)
388             }
389             n => Err(uv_error_to_io_error(UvError(n)))
390         };
391
392         extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
393             let req = Request::wrap(req);
394             assert!(status != uvll::ECANCELED);
395             let cx: &mut Ctx = unsafe { req.get_data() };
396             cx.status = status;
397             wakeup(&mut cx.slot);
398         }
399     }
400 }
401
402 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
403     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
404 }
405
406 impl Drop for TcpWatcher {
407     fn drop(&mut self) {
408         let _m = self.fire_homing_missile();
409         if self.refcount.decrement() {
410             self.close();
411         }
412     }
413 }
414
415 // TCP listeners (unbound servers)
416
417 impl TcpListener {
418     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
419                 -> Result<~TcpListener, UvError> {
420         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
421         assert_eq!(unsafe {
422             uvll::uv_tcp_init(io.uv_loop(), handle)
423         }, 0);
424         let (tx, rx) = channel();
425         let l = ~TcpListener {
426             home: io.make_handle(),
427             handle: handle,
428             closing_task: None,
429             outgoing: tx,
430             incoming: rx,
431         };
432         let (addr, _len) = addr_to_sockaddr(address);
433         let res = unsafe {
434             let addr_p = &addr as *libc::sockaddr_storage;
435             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
436         };
437         return match res {
438             0 => Ok(l.install()),
439             n => Err(UvError(n))
440         };
441     }
442 }
443
444 impl HomingIO for TcpListener {
445     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
446 }
447
448 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
449     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
450 }
451
452 impl rtio::RtioSocket for TcpListener {
453     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
454         let _m = self.fire_homing_missile();
455         socket_name(Tcp, self.handle)
456     }
457 }
458
459 impl rtio::RtioTcpListener for TcpListener {
460     fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
461         // create the acceptor object from ourselves
462         let mut acceptor = ~TcpAcceptor { listener: self };
463
464         let _m = acceptor.fire_homing_missile();
465         // FIXME: the 128 backlog should be configurable
466         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
467             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
468             n => Err(uv_error_to_io_error(UvError(n))),
469         }
470     }
471 }
472
473 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
474     assert!(status != uvll::ECANCELED);
475     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
476     let msg = match status {
477         0 => {
478             let loop_ = Loop::wrap(unsafe {
479                 uvll::get_loop_for_uv_handle(server)
480             });
481             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
482             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
483             Ok(~client as ~rtio::RtioTcpStream:Send)
484         }
485         n => Err(uv_error_to_io_error(UvError(n)))
486     };
487     tcp.outgoing.send(msg);
488 }
489
490 impl Drop for TcpListener {
491     fn drop(&mut self) {
492         let _m = self.fire_homing_missile();
493         self.close();
494     }
495 }
496
497 // TCP acceptors (bound servers)
498
499 impl HomingIO for TcpAcceptor {
500     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
501 }
502
503 impl rtio::RtioSocket for TcpAcceptor {
504     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
505         let _m = self.fire_homing_missile();
506         socket_name(Tcp, self.listener.handle)
507     }
508 }
509
510 impl rtio::RtioTcpAcceptor for TcpAcceptor {
511     fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
512         self.listener.incoming.recv()
513     }
514
515     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
516         let _m = self.fire_homing_missile();
517         status_to_io_result(unsafe {
518             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
519         })
520     }
521
522     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
523         let _m = self.fire_homing_missile();
524         status_to_io_result(unsafe {
525             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
526         })
527     }
528 }
529
530 ////////////////////////////////////////////////////////////////////////////////
531 /// UDP implementation
532 ////////////////////////////////////////////////////////////////////////////////
533
534 pub struct UdpWatcher {
535     handle: *uvll::uv_udp_t,
536     home: HomeHandle,
537
538     // See above for what these fields are
539     refcount: Refcount,
540     read_access: Access,
541     write_access: Access,
542 }
543
544 impl UdpWatcher {
545     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
546                 -> Result<UdpWatcher, UvError> {
547         let udp = UdpWatcher {
548             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
549             home: io.make_handle(),
550             refcount: Refcount::new(),
551             read_access: Access::new(),
552             write_access: Access::new(),
553         };
554         assert_eq!(unsafe {
555             uvll::uv_udp_init(io.uv_loop(), udp.handle)
556         }, 0);
557         let (addr, _len) = addr_to_sockaddr(address);
558         let result = unsafe {
559             let addr_p = &addr as *libc::sockaddr_storage;
560             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
561         };
562         return match result {
563             0 => Ok(udp),
564             n => Err(UvError(n)),
565         };
566     }
567 }
568
569 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
570     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
571 }
572
573 impl HomingIO for UdpWatcher {
574     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
575 }
576
577 impl rtio::RtioSocket for UdpWatcher {
578     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
579         let _m = self.fire_homing_missile();
580         socket_name(Udp, self.handle)
581     }
582 }
583
584 impl rtio::RtioUdpSocket for UdpWatcher {
585     fn recvfrom(&mut self, buf: &mut [u8])
586         -> Result<(uint, ip::SocketAddr), IoError>
587     {
588         struct Ctx {
589             task: Option<BlockedTask>,
590             buf: Option<Buf>,
591             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
592         }
593         let loop_ = self.uv_loop();
594         let m = self.fire_homing_missile();
595         let _g = self.read_access.grant(m);
596
597         let a = match unsafe {
598             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
599         } {
600             0 => {
601                 let mut cx = Ctx {
602                     task: None,
603                     buf: Some(slice_to_uv_buf(buf)),
604                     result: None,
605                 };
606                 let handle = self.handle;
607                 wait_until_woken_after(&mut cx.task, &loop_, || {
608                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
609                 });
610                 match cx.result.take_unwrap() {
611                     (n, _) if n < 0 =>
612                         Err(uv_error_to_io_error(UvError(n as c_int))),
613                     (n, addr) => Ok((n as uint, addr.unwrap()))
614                 }
615             }
616             n => Err(uv_error_to_io_error(UvError(n)))
617         };
618         return a;
619
620         extern fn alloc_cb(handle: *uvll::uv_udp_t,
621                            _suggested_size: size_t,
622                            buf: *mut Buf) {
623             unsafe {
624                 let cx: &mut Ctx =
625                     cast::transmute(uvll::get_data_for_uv_handle(handle));
626                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
627             }
628         }
629
630         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
631                           addr: *libc::sockaddr, _flags: c_uint) {
632             assert!(nread != uvll::ECANCELED as ssize_t);
633             let cx: &mut Ctx = unsafe {
634                 cast::transmute(uvll::get_data_for_uv_handle(handle))
635             };
636
637             // When there's no data to read the recv callback can be a no-op.
638             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
639             // this we just drop back to kqueue and wait for the next callback.
640             if nread == 0 {
641                 cx.buf = Some(unsafe { *buf });
642                 return
643             }
644
645             unsafe {
646                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
647             }
648
649             let cx: &mut Ctx = unsafe {
650                 cast::transmute(uvll::get_data_for_uv_handle(handle))
651             };
652             let addr = if addr == ptr::null() {
653                 None
654             } else {
655                 let len = mem::size_of::<libc::sockaddr_storage>();
656                 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
657             };
658             cx.result = Some((nread, addr));
659             wakeup(&mut cx.task);
660         }
661     }
662
663     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
664         struct Ctx { task: Option<BlockedTask>, result: c_int }
665
666         let m = self.fire_homing_missile();
667         let loop_ = self.uv_loop();
668         let _g = self.write_access.grant(m);
669
670         let mut req = Request::new(uvll::UV_UDP_SEND);
671         let buf = slice_to_uv_buf(buf);
672         let (addr, _len) = addr_to_sockaddr(dst);
673         let result = unsafe {
674             let addr_p = &addr as *libc::sockaddr_storage;
675             uvll::uv_udp_send(req.handle, self.handle, [buf],
676                               addr_p as *libc::sockaddr, send_cb)
677         };
678
679         return match result {
680             0 => {
681                 req.defuse(); // uv callback now owns this request
682                 let mut cx = Ctx { task: None, result: 0 };
683                 wait_until_woken_after(&mut cx.task, &loop_, || {
684                     req.set_data(&cx);
685                 });
686                 match cx.result {
687                     0 => Ok(()),
688                     n => Err(uv_error_to_io_error(UvError(n)))
689                 }
690             }
691             n => Err(uv_error_to_io_error(UvError(n)))
692         };
693
694         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
695             let req = Request::wrap(req);
696             assert!(status != uvll::ECANCELED);
697             let cx: &mut Ctx = unsafe { req.get_data() };
698             cx.result = status;
699             wakeup(&mut cx.task);
700         }
701     }
702
703     fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
704         let _m = self.fire_homing_missile();
705         status_to_io_result(unsafe {
706             multi.to_str().with_c_str(|m_addr| {
707                 uvll::uv_udp_set_membership(self.handle,
708                                             m_addr, ptr::null(),
709                                             uvll::UV_JOIN_GROUP)
710             })
711         })
712     }
713
714     fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
715         let _m = self.fire_homing_missile();
716         status_to_io_result(unsafe {
717             multi.to_str().with_c_str(|m_addr| {
718                 uvll::uv_udp_set_membership(self.handle,
719                                             m_addr, ptr::null(),
720                                             uvll::UV_LEAVE_GROUP)
721             })
722         })
723     }
724
725     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
726         let _m = self.fire_homing_missile();
727         status_to_io_result(unsafe {
728             uvll::uv_udp_set_multicast_loop(self.handle,
729                                             1 as c_int)
730         })
731     }
732
733     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
734         let _m = self.fire_homing_missile();
735         status_to_io_result(unsafe {
736             uvll::uv_udp_set_multicast_loop(self.handle,
737                                             0 as c_int)
738         })
739     }
740
741     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
742         let _m = self.fire_homing_missile();
743         status_to_io_result(unsafe {
744             uvll::uv_udp_set_multicast_ttl(self.handle,
745                                            ttl as c_int)
746         })
747     }
748
749     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
750         let _m = self.fire_homing_missile();
751         status_to_io_result(unsafe {
752             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
753         })
754     }
755
756     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
757         let _m = self.fire_homing_missile();
758         status_to_io_result(unsafe {
759             uvll::uv_udp_set_broadcast(self.handle,
760                                        1 as c_int)
761         })
762     }
763
764     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
765         let _m = self.fire_homing_missile();
766         status_to_io_result(unsafe {
767             uvll::uv_udp_set_broadcast(self.handle,
768                                        0 as c_int)
769         })
770     }
771
772     fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
773         ~UdpWatcher {
774             handle: self.handle,
775             home: self.home.clone(),
776             refcount: self.refcount.clone(),
777             write_access: self.write_access.clone(),
778             read_access: self.read_access.clone(),
779         } as ~rtio::RtioUdpSocket:Send
780     }
781 }
782
783 impl Drop for UdpWatcher {
784     fn drop(&mut self) {
785         // Send ourselves home to close this handle (blocking while doing so).
786         let _m = self.fire_homing_missile();
787         if self.refcount.decrement() {
788             self.close();
789         }
790     }
791 }
792
793 #[cfg(test)]
794 mod test {
795     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
796                         RtioUdpSocket};
797     use std::io::test::{next_test_ip4, next_test_ip6};
798
799     use super::{UdpWatcher, TcpWatcher, TcpListener};
800     use super::super::local_loop;
801
802     #[test]
803     fn connect_close_ip4() {
804         match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
805             Ok(..) => fail!(),
806             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
807         }
808     }
809
810     #[test]
811     fn connect_close_ip6() {
812         match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
813             Ok(..) => fail!(),
814             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
815         }
816     }
817
818     #[test]
819     fn udp_bind_close_ip4() {
820         match UdpWatcher::bind(local_loop(), next_test_ip4()) {
821             Ok(..) => {}
822             Err(..) => fail!()
823         }
824     }
825
826     #[test]
827     fn udp_bind_close_ip6() {
828         match UdpWatcher::bind(local_loop(), next_test_ip6()) {
829             Ok(..) => {}
830             Err(..) => fail!()
831         }
832     }
833
834     #[test]
835     fn listen_ip4() {
836         let (tx, rx) = channel();
837         let addr = next_test_ip4();
838
839         spawn(proc() {
840             let w = match TcpListener::bind(local_loop(), addr) {
841                 Ok(w) => w, Err(e) => fail!("{:?}", e)
842             };
843             let mut w = match w.listen() {
844                 Ok(w) => w, Err(e) => fail!("{:?}", e),
845             };
846             tx.send(());
847             match w.accept() {
848                 Ok(mut stream) => {
849                     let mut buf = [0u8, ..10];
850                     match stream.read(buf) {
851                         Ok(10) => {} e => fail!("{:?}", e),
852                     }
853                     for i in range(0, 10u8) {
854                         assert_eq!(buf[i as uint], i + 1);
855                     }
856                 }
857                 Err(e) => fail!("{:?}", e)
858             }
859         });
860
861         rx.recv();
862         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
863             Ok(w) => w, Err(e) => fail!("{:?}", e)
864         };
865         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
866             Ok(()) => {}, Err(e) => fail!("{:?}", e)
867         }
868     }
869
870     #[test]
871     fn listen_ip6() {
872         let (tx, rx) = channel();
873         let addr = next_test_ip6();
874
875         spawn(proc() {
876             let w = match TcpListener::bind(local_loop(), addr) {
877                 Ok(w) => w, Err(e) => fail!("{:?}", e)
878             };
879             let mut w = match w.listen() {
880                 Ok(w) => w, Err(e) => fail!("{:?}", e),
881             };
882             tx.send(());
883             match w.accept() {
884                 Ok(mut stream) => {
885                     let mut buf = [0u8, ..10];
886                     match stream.read(buf) {
887                         Ok(10) => {} e => fail!("{:?}", e),
888                     }
889                     for i in range(0, 10u8) {
890                         assert_eq!(buf[i as uint], i + 1);
891                     }
892                 }
893                 Err(e) => fail!("{:?}", e)
894             }
895         });
896
897         rx.recv();
898         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
899             Ok(w) => w, Err(e) => fail!("{:?}", e)
900         };
901         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
902             Ok(()) => {}, Err(e) => fail!("{:?}", e)
903         }
904     }
905
906     #[test]
907     fn udp_recv_ip4() {
908         let (tx, rx) = channel();
909         let client = next_test_ip4();
910         let server = next_test_ip4();
911
912         spawn(proc() {
913             match UdpWatcher::bind(local_loop(), server) {
914                 Ok(mut w) => {
915                     tx.send(());
916                     let mut buf = [0u8, ..10];
917                     match w.recvfrom(buf) {
918                         Ok((10, addr)) => assert_eq!(addr, client),
919                         e => fail!("{:?}", e),
920                     }
921                     for i in range(0, 10u8) {
922                         assert_eq!(buf[i as uint], i + 1);
923                     }
924                 }
925                 Err(e) => fail!("{:?}", e)
926             }
927         });
928
929         rx.recv();
930         let mut w = match UdpWatcher::bind(local_loop(), client) {
931             Ok(w) => w, Err(e) => fail!("{:?}", e)
932         };
933         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
934             Ok(()) => {}, Err(e) => fail!("{:?}", e)
935         }
936     }
937
938     #[test]
939     fn udp_recv_ip6() {
940         let (tx, rx) = channel();
941         let client = next_test_ip6();
942         let server = next_test_ip6();
943
944         spawn(proc() {
945             match UdpWatcher::bind(local_loop(), server) {
946                 Ok(mut w) => {
947                     tx.send(());
948                     let mut buf = [0u8, ..10];
949                     match w.recvfrom(buf) {
950                         Ok((10, addr)) => assert_eq!(addr, client),
951                         e => fail!("{:?}", e),
952                     }
953                     for i in range(0, 10u8) {
954                         assert_eq!(buf[i as uint], i + 1);
955                     }
956                 }
957                 Err(e) => fail!("{:?}", e)
958             }
959         });
960
961         rx.recv();
962         let mut w = match UdpWatcher::bind(local_loop(), client) {
963             Ok(w) => w, Err(e) => fail!("{:?}", e)
964         };
965         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
966             Ok(()) => {}, Err(e) => fail!("{:?}", e)
967         }
968     }
969
970     #[test]
971     fn test_read_read_read() {
972         let addr = next_test_ip4();
973         static MAX: uint = 5000;
974         let (tx, rx) = channel();
975
976         spawn(proc() {
977             let listener = TcpListener::bind(local_loop(), addr).unwrap();
978             let mut acceptor = listener.listen().unwrap();
979             tx.send(());
980             let mut stream = acceptor.accept().unwrap();
981             let buf = [1, .. 2048];
982             let mut total_bytes_written = 0;
983             while total_bytes_written < MAX {
984                 assert!(stream.write(buf).is_ok());
985                 uvdebug!("wrote bytes");
986                 total_bytes_written += buf.len();
987             }
988         });
989
990         rx.recv();
991         let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
992         let mut buf = [0, .. 2048];
993         let mut total_bytes_read = 0;
994         while total_bytes_read < MAX {
995             let nread = stream.read(buf).unwrap();
996             total_bytes_read += nread;
997             for i in range(0u, nread) {
998                 assert_eq!(buf[i], 1);
999             }
1000         }
1001         uvdebug!("read {} bytes total", total_bytes_read);
1002     }
1003
1004     #[test]
1005     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1006     fn test_udp_twice() {
1007         let server_addr = next_test_ip4();
1008         let client_addr = next_test_ip4();
1009         let (tx, rx) = channel();
1010
1011         spawn(proc() {
1012             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1013             rx.recv();
1014             assert!(client.sendto([1], server_addr).is_ok());
1015             assert!(client.sendto([2], server_addr).is_ok());
1016         });
1017
1018         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1019         tx.send(());
1020         let mut buf1 = [0];
1021         let mut buf2 = [0];
1022         let (nread1, src1) = server.recvfrom(buf1).unwrap();
1023         let (nread2, src2) = server.recvfrom(buf2).unwrap();
1024         assert_eq!(nread1, 1);
1025         assert_eq!(nread2, 1);
1026         assert_eq!(src1, client_addr);
1027         assert_eq!(src2, client_addr);
1028         assert_eq!(buf1[0], 1);
1029         assert_eq!(buf2[0], 2);
1030     }
1031
1032     #[test]
1033     fn test_udp_many_read() {
1034         let server_out_addr = next_test_ip4();
1035         let server_in_addr = next_test_ip4();
1036         let client_out_addr = next_test_ip4();
1037         let client_in_addr = next_test_ip4();
1038         static MAX: uint = 500_000;
1039
1040         let (tx1, rx1) = channel::<()>();
1041         let (tx2, rx2) = channel::<()>();
1042
1043         spawn(proc() {
1044             let l = local_loop();
1045             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1046             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1047             let (tx, rx) = (tx2, rx1);
1048             tx.send(());
1049             rx.recv();
1050             let msg = [1, .. 2048];
1051             let mut total_bytes_sent = 0;
1052             let mut buf = [1];
1053             while buf[0] == 1 {
1054                 // send more data
1055                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1056                 total_bytes_sent += msg.len();
1057                 // check if the client has received enough
1058                 let res = server_in.recvfrom(buf);
1059                 assert!(res.is_ok());
1060                 let (nread, src) = res.unwrap();
1061                 assert_eq!(nread, 1);
1062                 assert_eq!(src, client_out_addr);
1063             }
1064             assert!(total_bytes_sent >= MAX);
1065         });
1066
1067         let l = local_loop();
1068         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1069         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1070         let (tx, rx) = (tx1, rx2);
1071         rx.recv();
1072         tx.send(());
1073         let mut total_bytes_recv = 0;
1074         let mut buf = [0, .. 2048];
1075         while total_bytes_recv < MAX {
1076             // ask for more
1077             assert!(client_out.sendto([1], server_in_addr).is_ok());
1078             // wait for data
1079             let res = client_in.recvfrom(buf);
1080             assert!(res.is_ok());
1081             let (nread, src) = res.unwrap();
1082             assert_eq!(src, server_out_addr);
1083             total_bytes_recv += nread;
1084             for i in range(0u, nread) {
1085                 assert_eq!(buf[i], 1);
1086             }
1087         }
1088         // tell the server we're done
1089         assert!(client_out.sendto([0], server_in_addr).is_ok());
1090     }
1091
1092     #[test]
1093     fn test_read_and_block() {
1094         let addr = next_test_ip4();
1095         let (tx, rx) = channel::<Receiver<()>>();
1096
1097         spawn(proc() {
1098             let rx = rx.recv();
1099             let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1100             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1101             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1102             rx.recv();
1103             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1104             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1105             rx.recv();
1106         });
1107
1108         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1109         let mut acceptor = listener.listen().unwrap();
1110         let (tx2, rx2) = channel();
1111         tx.send(rx2);
1112         let mut stream = acceptor.accept().unwrap();
1113         let mut buf = [0, .. 2048];
1114
1115         let expected = 32;
1116         let mut current = 0;
1117         let mut reads = 0;
1118
1119         while current < expected {
1120             let nread = stream.read(buf).unwrap();
1121             for i in range(0u, nread) {
1122                 let val = buf[i] as uint;
1123                 assert_eq!(val, current % 8);
1124                 current += 1;
1125             }
1126             reads += 1;
1127
1128             let _ = tx2.send_opt(());
1129         }
1130
1131         // Make sure we had multiple reads
1132         assert!(reads > 1);
1133     }
1134
1135     #[test]
1136     fn test_simple_tcp_server_and_client_on_diff_threads() {
1137         let addr = next_test_ip4();
1138
1139         spawn(proc() {
1140             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1141             let mut acceptor = listener.listen().unwrap();
1142             let mut stream = acceptor.accept().unwrap();
1143             let mut buf = [0, .. 2048];
1144             let nread = stream.read(buf).unwrap();
1145             assert_eq!(nread, 8);
1146             for i in range(0u, nread) {
1147                 assert_eq!(buf[i], i as u8);
1148             }
1149         });
1150
1151         let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1152         while stream.is_err() {
1153             stream = TcpWatcher::connect(local_loop(), addr, None);
1154         }
1155         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1156     }
1157
1158     #[should_fail] #[test]
1159     fn tcp_listener_fail_cleanup() {
1160         let addr = next_test_ip4();
1161         let w = TcpListener::bind(local_loop(), addr).unwrap();
1162         let _w = w.listen().unwrap();
1163         fail!();
1164     }
1165
1166     #[should_fail] #[test]
1167     fn tcp_stream_fail_cleanup() {
1168         let (tx, rx) = channel();
1169         let addr = next_test_ip4();
1170
1171         spawn(proc() {
1172             let w = TcpListener::bind(local_loop(), addr).unwrap();
1173             let mut w = w.listen().unwrap();
1174             tx.send(());
1175             drop(w.accept().unwrap());
1176         });
1177         rx.recv();
1178         let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1179         fail!();
1180     }
1181
1182     #[should_fail] #[test]
1183     fn udp_listener_fail_cleanup() {
1184         let addr = next_test_ip4();
1185         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1186         fail!();
1187     }
1188
1189     #[should_fail] #[test]
1190     fn udp_fail_other_task() {
1191         let addr = next_test_ip4();
1192         let (tx, rx) = channel();
1193
1194         // force the handle to be created on a different scheduler, failure in
1195         // the original task will force a homing operation back to this
1196         // scheduler.
1197         spawn(proc() {
1198             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1199             tx.send(w);
1200         });
1201
1202         let _w = rx.recv();
1203         fail!();
1204     }
1205 }