]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
47b321b068b36ae6046cd91107d38e428f3e51d0
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
12 use libc;
13 use std::mem;
14 use std::ptr;
15 use std::rt::rtio;
16 use std::rt::rtio::IoError;
17 use std::rt::task::BlockedTask;
18
19 use homing::{HomingIO, HomeHandle};
20 use rc::Refcount;
21 use stream::StreamWatcher;
22 use super::{Loop, Request, UvError, Buf, status_to_io_result,
23             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
24             wait_until_woken_after, wakeup};
25 use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
26 use uvio::UvIoFactory;
27 use uvll;
28
29 ////////////////////////////////////////////////////////////////////////////////
30 /// Generic functions related to dealing with sockaddr things
31 ////////////////////////////////////////////////////////////////////////////////
32
33 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
34 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
35
36 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
37                         len: uint) -> rtio::SocketAddr {
38     match storage.ss_family as c_int {
39         libc::AF_INET => {
40             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
41             let storage: &libc::sockaddr_in = unsafe {
42                 mem::transmute(storage)
43             };
44             let ip = mem::to_be32(storage.sin_addr.s_addr as u32);
45             let a = (ip >> 24) as u8;
46             let b = (ip >> 16) as u8;
47             let c = (ip >>  8) as u8;
48             let d = (ip >>  0) as u8;
49             rtio::SocketAddr {
50                 ip: rtio::Ipv4Addr(a, b, c, d),
51                 port: ntohs(storage.sin_port),
52             }
53         }
54         libc::AF_INET6 => {
55             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
56             let storage: &libc::sockaddr_in6 = unsafe {
57                 mem::transmute(storage)
58             };
59             let a = ntohs(storage.sin6_addr.s6_addr[0]);
60             let b = ntohs(storage.sin6_addr.s6_addr[1]);
61             let c = ntohs(storage.sin6_addr.s6_addr[2]);
62             let d = ntohs(storage.sin6_addr.s6_addr[3]);
63             let e = ntohs(storage.sin6_addr.s6_addr[4]);
64             let f = ntohs(storage.sin6_addr.s6_addr[5]);
65             let g = ntohs(storage.sin6_addr.s6_addr[6]);
66             let h = ntohs(storage.sin6_addr.s6_addr[7]);
67             rtio::SocketAddr {
68                 ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
69                 port: ntohs(storage.sin6_port),
70             }
71         }
72         n => {
73             fail!("unknown family {}", n);
74         }
75     }
76 }
77
78 fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
79     unsafe {
80         let mut storage: libc::sockaddr_storage = mem::zeroed();
81         let len = match addr.ip {
82             rtio::Ipv4Addr(a, b, c, d) => {
83                 let ip = (a as u32 << 24) |
84                          (b as u32 << 16) |
85                          (c as u32 <<  8) |
86                          (d as u32 <<  0);
87                 let storage: &mut libc::sockaddr_in =
88                     mem::transmute(&mut storage);
89                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
90                 (*storage).sin_port = htons(addr.port);
91                 (*storage).sin_addr = libc::in_addr {
92                     s_addr: mem::from_be32(ip)
93                 };
94                 mem::size_of::<libc::sockaddr_in>()
95             }
96             rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
97                 let storage: &mut libc::sockaddr_in6 =
98                     mem::transmute(&mut storage);
99                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
100                 storage.sin6_port = htons(addr.port);
101                 storage.sin6_addr = libc::in6_addr {
102                     s6_addr: [
103                         htons(a),
104                         htons(b),
105                         htons(c),
106                         htons(d),
107                         htons(e),
108                         htons(f),
109                         htons(g),
110                         htons(h),
111                     ]
112                 };
113                 mem::size_of::<libc::sockaddr_in6>()
114             }
115         };
116         return (storage, len);
117     }
118 }
119
120 enum SocketNameKind {
121     TcpPeer,
122     Tcp,
123     Udp
124 }
125
126 fn socket_name(sk: SocketNameKind,
127                handle: *c_void) -> Result<rtio::SocketAddr, IoError> {
128     let getsockname = match sk {
129         TcpPeer => uvll::uv_tcp_getpeername,
130         Tcp     => uvll::uv_tcp_getsockname,
131         Udp     => uvll::uv_udp_getsockname,
132     };
133
134     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
135     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::zeroed() };
136     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
137
138     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
139     match unsafe {
140         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
141     } {
142         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
143         n => Err(uv_error_to_io_error(UvError(n)))
144     }
145 }
146
147 ////////////////////////////////////////////////////////////////////////////////
148 /// TCP implementation
149 ////////////////////////////////////////////////////////////////////////////////
150
151 pub struct TcpWatcher {
152     handle: *uvll::uv_tcp_t,
153     stream: StreamWatcher,
154     home: HomeHandle,
155     refcount: Refcount,
156
157     // libuv can't support concurrent reads and concurrent writes of the same
158     // stream object, so we use these access guards in order to arbitrate among
159     // multiple concurrent reads and writes. Note that libuv *can* read and
160     // write simultaneously, it just can't read and read simultaneously.
161     read_access: AccessTimeout,
162     write_access: AccessTimeout,
163 }
164
165 pub struct TcpListener {
166     home: HomeHandle,
167     handle: *uvll::uv_pipe_t,
168     outgoing: Sender<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
169     incoming: Receiver<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
170 }
171
172 pub struct TcpAcceptor {
173     listener: Box<TcpListener>,
174     timeout: AcceptTimeout,
175 }
176
177 // TCP watchers (clients/streams)
178
179 impl TcpWatcher {
180     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
181         let handle = io.make_handle();
182         TcpWatcher::new_home(&io.loop_, handle)
183     }
184
185     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
186         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
187         assert_eq!(unsafe {
188             uvll::uv_tcp_init(loop_.handle, handle)
189         }, 0);
190         TcpWatcher {
191             home: home,
192             handle: handle,
193             stream: StreamWatcher::new(handle),
194             refcount: Refcount::new(),
195             read_access: AccessTimeout::new(),
196             write_access: AccessTimeout::new(),
197         }
198     }
199
200     pub fn connect(io: &mut UvIoFactory,
201                    address: rtio::SocketAddr,
202                    timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
203         let tcp = TcpWatcher::new(io);
204         let cx = ConnectCtx { status: -1, task: None, timer: None };
205         let (addr, _len) = addr_to_sockaddr(address);
206         let addr_p = &addr as *_ as *libc::sockaddr;
207         cx.connect(tcp, timeout, io, |req, tcp, cb| {
208             unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
209         })
210     }
211 }
212
213 impl HomingIO for TcpWatcher {
214     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
215 }
216
217 impl rtio::RtioSocket for TcpWatcher {
218     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
219         let _m = self.fire_homing_missile();
220         socket_name(Tcp, self.handle)
221     }
222 }
223
224 impl rtio::RtioTcpStream for TcpWatcher {
225     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
226         let m = self.fire_homing_missile();
227         let guard = try!(self.read_access.grant(m));
228
229         // see comments in close_read about this check
230         if guard.access.is_closed() {
231             return Err(uv_error_to_io_error(UvError(uvll::EOF)))
232         }
233
234         self.stream.read(buf).map_err(uv_error_to_io_error)
235     }
236
237     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
238         let m = self.fire_homing_missile();
239         let guard = try!(self.write_access.grant(m));
240         self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
241     }
242
243     fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
244         let _m = self.fire_homing_missile();
245         socket_name(TcpPeer, self.handle)
246     }
247
248     fn control_congestion(&mut self) -> Result<(), IoError> {
249         let _m = self.fire_homing_missile();
250         status_to_io_result(unsafe {
251             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
252         })
253     }
254
255     fn nodelay(&mut self) -> Result<(), IoError> {
256         let _m = self.fire_homing_missile();
257         status_to_io_result(unsafe {
258             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
259         })
260     }
261
262     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
263         let _m = self.fire_homing_missile();
264         status_to_io_result(unsafe {
265             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
266                                    delay_in_seconds as c_uint)
267         })
268     }
269
270     fn letdie(&mut self) -> Result<(), IoError> {
271         let _m = self.fire_homing_missile();
272         status_to_io_result(unsafe {
273             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
274         })
275     }
276
277     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
278         box TcpWatcher {
279             handle: self.handle,
280             stream: StreamWatcher::new(self.handle),
281             home: self.home.clone(),
282             refcount: self.refcount.clone(),
283             read_access: self.read_access.clone(),
284             write_access: self.write_access.clone(),
285         } as Box<rtio::RtioTcpStream:Send>
286     }
287
288     fn close_read(&mut self) -> Result<(), IoError> {
289         // see comments in PipeWatcher::close_read
290         let task = {
291             let m = self.fire_homing_missile();
292             self.read_access.access.close(&m);
293     self.stream.cancel_read(uvll::EOF as libc::ssize_t)
294         };
295         let _ = task.map(|t| t.reawaken());
296         Ok(())
297     }
298
299     fn close_write(&mut self) -> Result<(), IoError> {
300         let _m = self.fire_homing_missile();
301         shutdown(self.handle, &self.uv_loop())
302     }
303
304     fn set_timeout(&mut self, timeout: Option<u64>) {
305         self.set_read_timeout(timeout);
306         self.set_write_timeout(timeout);
307     }
308
309     fn set_read_timeout(&mut self, ms: Option<u64>) {
310         let _m = self.fire_homing_missile();
311         let loop_ = self.uv_loop();
312         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
313                                      &self.stream as *_ as uint);
314
315         fn cancel_read(stream: uint) -> Option<BlockedTask> {
316             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
317             stream.cancel_read(uvll::ECANCELED as ssize_t)
318         }
319     }
320
321     fn set_write_timeout(&mut self, ms: Option<u64>) {
322         let _m = self.fire_homing_missile();
323         let loop_ = self.uv_loop();
324         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
325                                       &self.stream as *_ as uint);
326
327         fn cancel_write(stream: uint) -> Option<BlockedTask> {
328             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
329             stream.cancel_write()
330         }
331     }
332 }
333
334 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
335     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
336 }
337
338 impl Drop for TcpWatcher {
339     fn drop(&mut self) {
340         let _m = self.fire_homing_missile();
341         if self.refcount.decrement() {
342             self.close();
343         }
344     }
345 }
346
347 // TCP listeners (unbound servers)
348
349 impl TcpListener {
350     pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
351                 -> Result<Box<TcpListener>, UvError> {
352         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
353         assert_eq!(unsafe {
354             uvll::uv_tcp_init(io.uv_loop(), handle)
355         }, 0);
356         let (tx, rx) = channel();
357         let l = box TcpListener {
358             home: io.make_handle(),
359             handle: handle,
360             outgoing: tx,
361             incoming: rx,
362         };
363         let (addr, _len) = addr_to_sockaddr(address);
364         let res = unsafe {
365             let addr_p = &addr as *libc::sockaddr_storage;
366             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
367         };
368         return match res {
369             0 => Ok(l.install()),
370             n => Err(UvError(n))
371         };
372     }
373 }
374
375 impl HomingIO for TcpListener {
376     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
377 }
378
379 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
380     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
381 }
382
383 impl rtio::RtioSocket for TcpListener {
384     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
385         let _m = self.fire_homing_missile();
386         socket_name(Tcp, self.handle)
387     }
388 }
389
390 impl rtio::RtioTcpListener for TcpListener {
391     fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor:Send>, IoError> {
392         // create the acceptor object from ourselves
393         let mut acceptor = box TcpAcceptor {
394             listener: self,
395             timeout: AcceptTimeout::new(),
396         };
397
398         let _m = acceptor.fire_homing_missile();
399         // FIXME: the 128 backlog should be configurable
400         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
401             0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor:Send>),
402             n => Err(uv_error_to_io_error(UvError(n))),
403         }
404     }
405 }
406
407 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
408     assert!(status != uvll::ECANCELED);
409     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
410     let msg = match status {
411         0 => {
412             let loop_ = Loop::wrap(unsafe {
413                 uvll::get_loop_for_uv_handle(server)
414             });
415             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
416             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
417             Ok(box client as Box<rtio::RtioTcpStream:Send>)
418         }
419         n => Err(uv_error_to_io_error(UvError(n)))
420     };
421     tcp.outgoing.send(msg);
422 }
423
424 impl Drop for TcpListener {
425     fn drop(&mut self) {
426         let _m = self.fire_homing_missile();
427         self.close();
428     }
429 }
430
431 // TCP acceptors (bound servers)
432
433 impl HomingIO for TcpAcceptor {
434     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
435 }
436
437 impl rtio::RtioSocket for TcpAcceptor {
438     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
439         let _m = self.fire_homing_missile();
440         socket_name(Tcp, self.listener.handle)
441     }
442 }
443
444 impl rtio::RtioTcpAcceptor for TcpAcceptor {
445     fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
446         self.timeout.accept(&self.listener.incoming)
447     }
448
449     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
450         let _m = self.fire_homing_missile();
451         status_to_io_result(unsafe {
452             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
453         })
454     }
455
456     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
457         let _m = self.fire_homing_missile();
458         status_to_io_result(unsafe {
459             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
460         })
461     }
462
463     fn set_timeout(&mut self, ms: Option<u64>) {
464         let _m = self.fire_homing_missile();
465         match ms {
466             None => self.timeout.clear(),
467             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
468         }
469     }
470 }
471
472 ////////////////////////////////////////////////////////////////////////////////
473 /// UDP implementation
474 ////////////////////////////////////////////////////////////////////////////////
475
476 pub struct UdpWatcher {
477     handle: *uvll::uv_udp_t,
478     home: HomeHandle,
479
480     // See above for what these fields are
481     refcount: Refcount,
482     read_access: AccessTimeout,
483     write_access: AccessTimeout,
484
485     blocked_sender: Option<BlockedTask>,
486 }
487
488 struct UdpRecvCtx {
489     task: Option<BlockedTask>,
490     buf: Option<Buf>,
491     result: Option<(ssize_t, Option<rtio::SocketAddr>)>,
492 }
493
494 struct UdpSendCtx {
495     result: c_int,
496     data: Option<Vec<u8>>,
497     udp: *mut UdpWatcher,
498 }
499
500 impl UdpWatcher {
501     pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
502                 -> Result<UdpWatcher, UvError> {
503         let udp = UdpWatcher {
504             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
505             home: io.make_handle(),
506             refcount: Refcount::new(),
507             read_access: AccessTimeout::new(),
508             write_access: AccessTimeout::new(),
509             blocked_sender: None,
510         };
511         assert_eq!(unsafe {
512             uvll::uv_udp_init(io.uv_loop(), udp.handle)
513         }, 0);
514         let (addr, _len) = addr_to_sockaddr(address);
515         let result = unsafe {
516             let addr_p = &addr as *libc::sockaddr_storage;
517             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
518         };
519         return match result {
520             0 => Ok(udp),
521             n => Err(UvError(n)),
522         };
523     }
524 }
525
526 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
527     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
528 }
529
530 impl HomingIO for UdpWatcher {
531     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
532 }
533
534 impl rtio::RtioSocket for UdpWatcher {
535     fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
536         let _m = self.fire_homing_missile();
537         socket_name(Udp, self.handle)
538     }
539 }
540
541 impl rtio::RtioUdpSocket for UdpWatcher {
542     fn recvfrom(&mut self, buf: &mut [u8])
543         -> Result<(uint, rtio::SocketAddr), IoError>
544     {
545         let loop_ = self.uv_loop();
546         let m = self.fire_homing_missile();
547         let _guard = try!(self.read_access.grant(m));
548
549         return match unsafe {
550             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
551         } {
552             0 => {
553                 let mut cx = UdpRecvCtx {
554                     task: None,
555                     buf: Some(slice_to_uv_buf(buf)),
556                     result: None,
557                 };
558                 let handle = self.handle;
559                 wait_until_woken_after(&mut cx.task, &loop_, || {
560                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
561                 });
562                 match cx.result.take_unwrap() {
563                     (n, _) if n < 0 =>
564                         Err(uv_error_to_io_error(UvError(n as c_int))),
565                     (n, addr) => Ok((n as uint, addr.unwrap()))
566                 }
567             }
568             n => Err(uv_error_to_io_error(UvError(n)))
569         };
570
571         extern fn alloc_cb(handle: *uvll::uv_udp_t,
572                            _suggested_size: size_t,
573                            buf: *mut Buf) {
574             unsafe {
575                 let cx = uvll::get_data_for_uv_handle(handle);
576                 let cx = &mut *(cx as *mut UdpRecvCtx);
577                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
578             }
579         }
580
581         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
582                           addr: *libc::sockaddr, _flags: c_uint) {
583             assert!(nread != uvll::ECANCELED as ssize_t);
584             let cx = unsafe {
585                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
586             };
587
588             // When there's no data to read the recv callback can be a no-op.
589             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
590             // this we just drop back to kqueue and wait for the next callback.
591             if nread == 0 {
592                 cx.buf = Some(unsafe { *buf });
593                 return
594             }
595
596             unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
597             let addr = if addr == ptr::null() {
598                 None
599             } else {
600                 let len = mem::size_of::<libc::sockaddr_storage>();
601                 Some(sockaddr_to_addr(unsafe { mem::transmute(addr) }, len))
602             };
603             cx.result = Some((nread, addr));
604             wakeup(&mut cx.task);
605         }
606     }
607
608     fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> {
609         let m = self.fire_homing_missile();
610         let loop_ = self.uv_loop();
611         let guard = try!(self.write_access.grant(m));
612
613         let mut req = Request::new(uvll::UV_UDP_SEND);
614         let (addr, _len) = addr_to_sockaddr(dst);
615         let addr_p = &addr as *_ as *libc::sockaddr;
616
617         // see comments in StreamWatcher::write for why we may allocate a buffer
618         // here.
619         let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
620         let uv_buf = if guard.can_timeout {
621             slice_to_uv_buf(data.get_ref().as_slice())
622         } else {
623             slice_to_uv_buf(buf)
624         };
625
626         return match unsafe {
627             uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
628         } {
629             0 => {
630                 req.defuse(); // uv callback now owns this request
631                 let mut cx = UdpSendCtx {
632                     result: uvll::ECANCELED, data: data, udp: self as *mut _
633                 };
634                 wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
635                     req.set_data(&cx);
636                 });
637
638                 if cx.result != uvll::ECANCELED {
639                     return match cx.result {
640                         0 => Ok(()),
641                         n => Err(uv_error_to_io_error(UvError(n)))
642                     }
643                 }
644                 let new_cx = box UdpSendCtx {
645                     result: 0,
646                     udp: 0 as *mut UdpWatcher,
647                     data: cx.data.take(),
648                 };
649                 unsafe {
650                     req.set_data(&*new_cx);
651                     mem::forget(new_cx);
652                 }
653                 Err(uv_error_to_io_error(UvError(cx.result)))
654             }
655             n => Err(uv_error_to_io_error(UvError(n)))
656         };
657
658         // This function is the same as stream::write_cb, but adapted for udp
659         // instead of streams.
660         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
661             let req = Request::wrap(req);
662             let cx: &mut UdpSendCtx = unsafe { req.get_data() };
663             cx.result = status;
664
665             if cx.udp as uint != 0 {
666                 let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
667                 wakeup(&mut udp.blocked_sender);
668             } else {
669                 let _cx: Box<UdpSendCtx> = unsafe { mem::transmute(cx) };
670             }
671         }
672     }
673
674     fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
675         let _m = self.fire_homing_missile();
676         status_to_io_result(unsafe {
677             multi.to_str().with_c_str(|m_addr| {
678                 uvll::uv_udp_set_membership(self.handle,
679                                             m_addr, ptr::null(),
680                                             uvll::UV_JOIN_GROUP)
681             })
682         })
683     }
684
685     fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
686         let _m = self.fire_homing_missile();
687         status_to_io_result(unsafe {
688             multi.to_str().with_c_str(|m_addr| {
689                 uvll::uv_udp_set_membership(self.handle,
690                                             m_addr, ptr::null(),
691                                             uvll::UV_LEAVE_GROUP)
692             })
693         })
694     }
695
696     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
697         let _m = self.fire_homing_missile();
698         status_to_io_result(unsafe {
699             uvll::uv_udp_set_multicast_loop(self.handle,
700                                             1 as c_int)
701         })
702     }
703
704     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
705         let _m = self.fire_homing_missile();
706         status_to_io_result(unsafe {
707             uvll::uv_udp_set_multicast_loop(self.handle,
708                                             0 as c_int)
709         })
710     }
711
712     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
713         let _m = self.fire_homing_missile();
714         status_to_io_result(unsafe {
715             uvll::uv_udp_set_multicast_ttl(self.handle,
716                                            ttl as c_int)
717         })
718     }
719
720     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
721         let _m = self.fire_homing_missile();
722         status_to_io_result(unsafe {
723             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
724         })
725     }
726
727     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
728         let _m = self.fire_homing_missile();
729         status_to_io_result(unsafe {
730             uvll::uv_udp_set_broadcast(self.handle,
731                                        1 as c_int)
732         })
733     }
734
735     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
736         let _m = self.fire_homing_missile();
737         status_to_io_result(unsafe {
738             uvll::uv_udp_set_broadcast(self.handle,
739                                        0 as c_int)
740         })
741     }
742
743     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
744         box UdpWatcher {
745             handle: self.handle,
746             home: self.home.clone(),
747             refcount: self.refcount.clone(),
748             write_access: self.write_access.clone(),
749             read_access: self.read_access.clone(),
750             blocked_sender: None,
751         } as Box<rtio::RtioUdpSocket:Send>
752     }
753
754     fn set_timeout(&mut self, timeout: Option<u64>) {
755         self.set_read_timeout(timeout);
756         self.set_write_timeout(timeout);
757     }
758
759     fn set_read_timeout(&mut self, ms: Option<u64>) {
760         let _m = self.fire_homing_missile();
761         let loop_ = self.uv_loop();
762         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
763                                      self.handle as uint);
764
765         fn cancel_read(stream: uint) -> Option<BlockedTask> {
766             // This method is quite similar to StreamWatcher::cancel_read, see
767             // there for more information
768             let handle = stream as *uvll::uv_udp_t;
769             assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
770             let data = unsafe {
771                 let data = uvll::get_data_for_uv_handle(handle);
772                 if data.is_null() { return None }
773                 uvll::set_data_for_uv_handle(handle, 0 as *int);
774                 &mut *(data as *mut UdpRecvCtx)
775             };
776             data.result = Some((uvll::ECANCELED as ssize_t, None));
777             data.task.take()
778         }
779     }
780
781     fn set_write_timeout(&mut self, ms: Option<u64>) {
782         let _m = self.fire_homing_missile();
783         let loop_ = self.uv_loop();
784         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
785                                       self as *mut _ as uint);
786
787         fn cancel_write(stream: uint) -> Option<BlockedTask> {
788             let stream: &mut UdpWatcher = unsafe { mem::transmute(stream) };
789             stream.blocked_sender.take()
790         }
791     }
792 }
793
794 impl Drop for UdpWatcher {
795     fn drop(&mut self) {
796         // Send ourselves home to close this handle (blocking while doing so).
797         let _m = self.fire_homing_missile();
798         if self.refcount.decrement() {
799             self.close();
800         }
801     }
802 }
803
804 ////////////////////////////////////////////////////////////////////////////////
805 // Shutdown helper
806 ////////////////////////////////////////////////////////////////////////////////
807
808 pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
809     struct Ctx {
810         slot: Option<BlockedTask>,
811         status: c_int,
812     }
813     let mut req = Request::new(uvll::UV_SHUTDOWN);
814
815     return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
816         0 => {
817             req.defuse(); // uv callback now owns this request
818             let mut cx = Ctx { slot: None, status: 0 };
819
820             wait_until_woken_after(&mut cx.slot, loop_, || {
821                 req.set_data(&cx);
822             });
823
824             status_to_io_result(cx.status)
825         }
826         n => Err(uv_error_to_io_error(UvError(n)))
827     };
828
829     extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
830         let req = Request::wrap(req);
831         assert!(status != uvll::ECANCELED);
832         let cx: &mut Ctx = unsafe { req.get_data() };
833         cx.status = status;
834         wakeup(&mut cx.slot);
835     }
836 }
837
838 #[cfg(test)]
839 mod test {
840     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
841                         RtioUdpSocket};
842
843     use super::{UdpWatcher, TcpWatcher, TcpListener};
844     use super::super::local_loop;
845
846     #[test]
847     fn connect_close_ip4() {
848         match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) {
849             Ok(..) => fail!(),
850             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
851         }
852     }
853
854     #[test]
855     fn connect_close_ip6() {
856         match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) {
857             Ok(..) => fail!(),
858             Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
859         }
860     }
861
862     #[test]
863     fn udp_bind_close_ip4() {
864         match UdpWatcher::bind(local_loop(), ::next_test_ip4()) {
865             Ok(..) => {}
866             Err(..) => fail!()
867         }
868     }
869
870     #[test]
871     fn udp_bind_close_ip6() {
872         match UdpWatcher::bind(local_loop(), ::next_test_ip6()) {
873             Ok(..) => {}
874             Err(..) => fail!()
875         }
876     }
877
878     #[test]
879     fn listen_ip4() {
880         let (tx, rx) = channel();
881         let addr = ::next_test_ip4();
882
883         spawn(proc() {
884             let w = match TcpListener::bind(local_loop(), addr) {
885                 Ok(w) => w, Err(e) => fail!("{:?}", e)
886             };
887             let mut w = match w.listen() {
888                 Ok(w) => w, Err(e) => fail!("{:?}", e),
889             };
890             tx.send(());
891             match w.accept() {
892                 Ok(mut stream) => {
893                     let mut buf = [0u8, ..10];
894                     match stream.read(buf) {
895                         Ok(10) => {} e => fail!("{:?}", e),
896                     }
897                     for i in range(0, 10u8) {
898                         assert_eq!(buf[i as uint], i + 1);
899                     }
900                 }
901                 Err(e) => fail!("{:?}", e)
902             }
903         });
904
905         rx.recv();
906         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
907             Ok(w) => w, Err(e) => fail!("{:?}", e)
908         };
909         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
910             Ok(()) => {}, Err(e) => fail!("{:?}", e)
911         }
912     }
913
914     #[test]
915     fn listen_ip6() {
916         let (tx, rx) = channel();
917         let addr = ::next_test_ip6();
918
919         spawn(proc() {
920             let w = match TcpListener::bind(local_loop(), addr) {
921                 Ok(w) => w, Err(e) => fail!("{:?}", e)
922             };
923             let mut w = match w.listen() {
924                 Ok(w) => w, Err(e) => fail!("{:?}", e),
925             };
926             tx.send(());
927             match w.accept() {
928                 Ok(mut stream) => {
929                     let mut buf = [0u8, ..10];
930                     match stream.read(buf) {
931                         Ok(10) => {} e => fail!("{:?}", e),
932                     }
933                     for i in range(0, 10u8) {
934                         assert_eq!(buf[i as uint], i + 1);
935                     }
936                 }
937                 Err(e) => fail!("{:?}", e)
938             }
939         });
940
941         rx.recv();
942         let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
943             Ok(w) => w, Err(e) => fail!("{:?}", e)
944         };
945         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
946             Ok(()) => {}, Err(e) => fail!("{:?}", e)
947         }
948     }
949
950     #[test]
951     fn udp_recv_ip4() {
952         let (tx, rx) = channel();
953         let client = ::next_test_ip4();
954         let server = ::next_test_ip4();
955
956         spawn(proc() {
957             match UdpWatcher::bind(local_loop(), server) {
958                 Ok(mut w) => {
959                     tx.send(());
960                     let mut buf = [0u8, ..10];
961                     match w.recvfrom(buf) {
962                         Ok((10, addr)) => assert!(addr == client),
963                         e => fail!("{:?}", e),
964                     }
965                     for i in range(0, 10u8) {
966                         assert_eq!(buf[i as uint], i + 1);
967                     }
968                 }
969                 Err(e) => fail!("{:?}", e)
970             }
971         });
972
973         rx.recv();
974         let mut w = match UdpWatcher::bind(local_loop(), client) {
975             Ok(w) => w, Err(e) => fail!("{:?}", e)
976         };
977         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
978             Ok(()) => {}, Err(e) => fail!("{:?}", e)
979         }
980     }
981
982     #[test]
983     fn udp_recv_ip6() {
984         let (tx, rx) = channel();
985         let client = ::next_test_ip6();
986         let server = ::next_test_ip6();
987
988         spawn(proc() {
989             match UdpWatcher::bind(local_loop(), server) {
990                 Ok(mut w) => {
991                     tx.send(());
992                     let mut buf = [0u8, ..10];
993                     match w.recvfrom(buf) {
994                         Ok((10, addr)) => assert!(addr == client),
995                         e => fail!("{:?}", e),
996                     }
997                     for i in range(0, 10u8) {
998                         assert_eq!(buf[i as uint], i + 1);
999                     }
1000                 }
1001                 Err(e) => fail!("{:?}", e)
1002             }
1003         });
1004
1005         rx.recv();
1006         let mut w = match UdpWatcher::bind(local_loop(), client) {
1007             Ok(w) => w, Err(e) => fail!("{:?}", e)
1008         };
1009         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1010             Ok(()) => {}, Err(e) => fail!("{:?}", e)
1011         }
1012     }
1013
1014     #[test]
1015     fn test_read_read_read() {
1016         let addr = ::next_test_ip4();
1017         static MAX: uint = 5000;
1018         let (tx, rx) = channel();
1019
1020         spawn(proc() {
1021             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1022             let mut acceptor = listener.listen().ok().unwrap();
1023             tx.send(());
1024             let mut stream = acceptor.accept().ok().unwrap();
1025             let buf = [1, .. 2048];
1026             let mut total_bytes_written = 0;
1027             while total_bytes_written < MAX {
1028                 assert!(stream.write(buf).is_ok());
1029                 uvdebug!("wrote bytes");
1030                 total_bytes_written += buf.len();
1031             }
1032         });
1033
1034         rx.recv();
1035         let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1036         let mut buf = [0, .. 2048];
1037         let mut total_bytes_read = 0;
1038         while total_bytes_read < MAX {
1039             let nread = stream.read(buf).ok().unwrap();
1040             total_bytes_read += nread;
1041             for i in range(0u, nread) {
1042                 assert_eq!(buf[i], 1);
1043             }
1044         }
1045         uvdebug!("read {} bytes total", total_bytes_read);
1046     }
1047
1048     #[test]
1049     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1050     fn test_udp_twice() {
1051         let server_addr = ::next_test_ip4();
1052         let client_addr = ::next_test_ip4();
1053         let (tx, rx) = channel();
1054
1055         spawn(proc() {
1056             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1057             rx.recv();
1058             assert!(client.sendto([1], server_addr).is_ok());
1059             assert!(client.sendto([2], server_addr).is_ok());
1060         });
1061
1062         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1063         tx.send(());
1064         let mut buf1 = [0];
1065         let mut buf2 = [0];
1066         let (nread1, src1) = server.recvfrom(buf1).ok().unwrap();
1067         let (nread2, src2) = server.recvfrom(buf2).ok().unwrap();
1068         assert_eq!(nread1, 1);
1069         assert_eq!(nread2, 1);
1070         assert!(src1 == client_addr);
1071         assert!(src2 == client_addr);
1072         assert_eq!(buf1[0], 1);
1073         assert_eq!(buf2[0], 2);
1074     }
1075
1076     #[test]
1077     fn test_udp_many_read() {
1078         let server_out_addr = ::next_test_ip4();
1079         let server_in_addr = ::next_test_ip4();
1080         let client_out_addr = ::next_test_ip4();
1081         let client_in_addr = ::next_test_ip4();
1082         static MAX: uint = 500_000;
1083
1084         let (tx1, rx1) = channel::<()>();
1085         let (tx2, rx2) = channel::<()>();
1086
1087         spawn(proc() {
1088             let l = local_loop();
1089             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1090             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1091             let (tx, rx) = (tx2, rx1);
1092             tx.send(());
1093             rx.recv();
1094             let msg = [1, .. 2048];
1095             let mut total_bytes_sent = 0;
1096             let mut buf = [1];
1097             while buf[0] == 1 {
1098                 // send more data
1099                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1100                 total_bytes_sent += msg.len();
1101                 // check if the client has received enough
1102                 let res = server_in.recvfrom(buf);
1103                 assert!(res.is_ok());
1104                 let (nread, src) = res.ok().unwrap();
1105                 assert_eq!(nread, 1);
1106                 assert!(src == client_out_addr);
1107             }
1108             assert!(total_bytes_sent >= MAX);
1109         });
1110
1111         let l = local_loop();
1112         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1113         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1114         let (tx, rx) = (tx1, rx2);
1115         rx.recv();
1116         tx.send(());
1117         let mut total_bytes_recv = 0;
1118         let mut buf = [0, .. 2048];
1119         while total_bytes_recv < MAX {
1120             // ask for more
1121             assert!(client_out.sendto([1], server_in_addr).is_ok());
1122             // wait for data
1123             let res = client_in.recvfrom(buf);
1124             assert!(res.is_ok());
1125             let (nread, src) = res.ok().unwrap();
1126             assert!(src == server_out_addr);
1127             total_bytes_recv += nread;
1128             for i in range(0u, nread) {
1129                 assert_eq!(buf[i], 1);
1130             }
1131         }
1132         // tell the server we're done
1133         assert!(client_out.sendto([0], server_in_addr).is_ok());
1134     }
1135
1136     #[test]
1137     fn test_read_and_block() {
1138         let addr = ::next_test_ip4();
1139         let (tx, rx) = channel::<Receiver<()>>();
1140
1141         spawn(proc() {
1142             let rx = rx.recv();
1143             let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1144             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1145             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1146             rx.recv();
1147             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1148             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1149             rx.recv();
1150         });
1151
1152         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1153         let mut acceptor = listener.listen().ok().unwrap();
1154         let (tx2, rx2) = channel();
1155         tx.send(rx2);
1156         let mut stream = acceptor.accept().ok().unwrap();
1157         let mut buf = [0, .. 2048];
1158
1159         let expected = 32;
1160         let mut current = 0;
1161         let mut reads = 0;
1162
1163         while current < expected {
1164             let nread = stream.read(buf).ok().unwrap();
1165             for i in range(0u, nread) {
1166                 let val = buf[i] as uint;
1167                 assert_eq!(val, current % 8);
1168                 current += 1;
1169             }
1170             reads += 1;
1171
1172             let _ = tx2.send_opt(());
1173         }
1174
1175         // Make sure we had multiple reads
1176         assert!(reads > 1);
1177     }
1178
1179     #[test]
1180     fn test_simple_tcp_server_and_client_on_diff_threads() {
1181         let addr = ::next_test_ip4();
1182
1183         spawn(proc() {
1184             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1185             let mut acceptor = listener.listen().ok().unwrap();
1186             let mut stream = acceptor.accept().ok().unwrap();
1187             let mut buf = [0, .. 2048];
1188             let nread = stream.read(buf).ok().unwrap();
1189             assert_eq!(nread, 8);
1190             for i in range(0u, nread) {
1191                 assert_eq!(buf[i], i as u8);
1192             }
1193         });
1194
1195         let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1196         while stream.is_err() {
1197             stream = TcpWatcher::connect(local_loop(), addr, None);
1198         }
1199         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
1200     }
1201
1202     #[should_fail] #[test]
1203     fn tcp_listener_fail_cleanup() {
1204         let addr = ::next_test_ip4();
1205         let w = TcpListener::bind(local_loop(), addr).unwrap();
1206         let _w = w.listen().ok().unwrap();
1207         fail!();
1208     }
1209
1210     #[should_fail] #[test]
1211     fn tcp_stream_fail_cleanup() {
1212         let (tx, rx) = channel();
1213         let addr = ::next_test_ip4();
1214
1215         spawn(proc() {
1216             let w = TcpListener::bind(local_loop(), addr).unwrap();
1217             let mut w = w.listen().ok().unwrap();
1218             tx.send(());
1219             drop(w.accept().ok().unwrap());
1220         });
1221         rx.recv();
1222         let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1223         fail!();
1224     }
1225
1226     #[should_fail] #[test]
1227     fn udp_listener_fail_cleanup() {
1228         let addr = ::next_test_ip4();
1229         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1230         fail!();
1231     }
1232
1233     #[should_fail] #[test]
1234     fn udp_fail_other_task() {
1235         let addr = ::next_test_ip4();
1236         let (tx, rx) = channel();
1237
1238         // force the handle to be created on a different scheduler, failure in
1239         // the original task will force a homing operation back to this
1240         // scheduler.
1241         spawn(proc() {
1242             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1243             tx.send(w);
1244         });
1245
1246         let _w = rx.recv();
1247         fail!();
1248     }
1249 }