]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Fix fallout of requiring uint indices
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::io::IoError;
13 use std::io::net::ip;
14 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use std::libc;
16 use std::mem;
17 use std::ptr;
18 use std::rt::rtio;
19 use std::rt::task::BlockedTask;
20
21 use access::Access;
22 use homing::{HomingIO, HomeHandle};
23 use rc::Refcount;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27             wait_until_woken_after, wakeup};
28 use uvio::UvIoFactory;
29 use uvll;
30
31 ////////////////////////////////////////////////////////////////////////////////
32 /// Generic functions related to dealing with sockaddr things
33 ////////////////////////////////////////////////////////////////////////////////
34
35 pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
36 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
37
38 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
39                         len: uint) -> ip::SocketAddr {
40     match storage.ss_family as c_int {
41         libc::AF_INET => {
42             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
43             let storage: &libc::sockaddr_in = unsafe {
44                 cast::transmute(storage)
45             };
46             let addr = storage.sin_addr.s_addr as u32;
47             let a = (addr >>  0) as u8;
48             let b = (addr >>  8) as u8;
49             let c = (addr >> 16) as u8;
50             let d = (addr >> 24) as u8;
51             ip::SocketAddr {
52                 ip: ip::Ipv4Addr(a, b, c, d),
53                 port: ntohs(storage.sin_port),
54             }
55         }
56         libc::AF_INET6 => {
57             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
58             let storage: &libc::sockaddr_in6 = unsafe {
59                 cast::transmute(storage)
60             };
61             let a = ntohs(storage.sin6_addr.s6_addr[0]);
62             let b = ntohs(storage.sin6_addr.s6_addr[1]);
63             let c = ntohs(storage.sin6_addr.s6_addr[2]);
64             let d = ntohs(storage.sin6_addr.s6_addr[3]);
65             let e = ntohs(storage.sin6_addr.s6_addr[4]);
66             let f = ntohs(storage.sin6_addr.s6_addr[5]);
67             let g = ntohs(storage.sin6_addr.s6_addr[6]);
68             let h = ntohs(storage.sin6_addr.s6_addr[7]);
69             ip::SocketAddr {
70                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
71                 port: ntohs(storage.sin6_port),
72             }
73         }
74         n => {
75             fail!("unknown family {}", n);
76         }
77     }
78 }
79
80 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
81     unsafe {
82         let mut storage: libc::sockaddr_storage = mem::init();
83         let len = match addr.ip {
84             ip::Ipv4Addr(a, b, c, d) => {
85                 let storage: &mut libc::sockaddr_in =
86                     cast::transmute(&mut storage);
87                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
88                 (*storage).sin_port = htons(addr.port);
89                 (*storage).sin_addr = libc::in_addr {
90                     s_addr: (d as u32 << 24) |
91                             (c as u32 << 16) |
92                             (b as u32 <<  8) |
93                             (a as u32 <<  0)
94                 };
95                 mem::size_of::<libc::sockaddr_in>()
96             }
97             ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
98                 let storage: &mut libc::sockaddr_in6 =
99                     cast::transmute(&mut storage);
100                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
101                 storage.sin6_port = htons(addr.port);
102                 storage.sin6_addr = libc::in6_addr {
103                     s6_addr: [
104                         htons(a),
105                         htons(b),
106                         htons(c),
107                         htons(d),
108                         htons(e),
109                         htons(f),
110                         htons(g),
111                         htons(h),
112                     ]
113                 };
114                 mem::size_of::<libc::sockaddr_in6>()
115             }
116         };
117         return (storage, len);
118     }
119 }
120
121 enum SocketNameKind {
122     TcpPeer,
123     Tcp,
124     Udp
125 }
126
127 fn socket_name(sk: SocketNameKind,
128                handle: *c_void) -> Result<ip::SocketAddr, IoError> {
129     let getsockname = match sk {
130         TcpPeer => uvll::uv_tcp_getpeername,
131         Tcp     => uvll::uv_tcp_getsockname,
132         Udp     => uvll::uv_udp_getsockname,
133     };
134
135     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
137     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
138
139     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
140     match unsafe {
141         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
142     } {
143         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
144         n => Err(uv_error_to_io_error(UvError(n)))
145     }
146 }
147
148 ////////////////////////////////////////////////////////////////////////////////
149 /// TCP implementation
150 ////////////////////////////////////////////////////////////////////////////////
151
152 pub struct TcpWatcher {
153     handle: *uvll::uv_tcp_t,
154     stream: StreamWatcher,
155     home: HomeHandle,
156     refcount: Refcount,
157
158     // libuv can't support concurrent reads and concurrent writes of the same
159     // stream object, so we use these access guards in order to arbitrate among
160     // multiple concurrent reads and writes. Note that libuv *can* read and
161     // write simultaneously, it just can't read and read simultaneously.
162     read_access: Access,
163     write_access: Access,
164 }
165
166 pub struct TcpListener {
167     home: HomeHandle,
168     handle: *uvll::uv_pipe_t,
169     closing_task: Option<BlockedTask>,
170     outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
171     incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
172 }
173
174 pub struct TcpAcceptor {
175     listener: ~TcpListener,
176 }
177
178 // TCP watchers (clients/streams)
179
180 impl TcpWatcher {
181     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
182         let handle = io.make_handle();
183         TcpWatcher::new_home(&io.loop_, handle)
184     }
185
186     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
187         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
188         assert_eq!(unsafe {
189             uvll::uv_tcp_init(loop_.handle, handle)
190         }, 0);
191         TcpWatcher {
192             home: home,
193             handle: handle,
194             stream: StreamWatcher::new(handle),
195             refcount: Refcount::new(),
196             read_access: Access::new(),
197             write_access: Access::new(),
198         }
199     }
200
201     pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
202         -> Result<TcpWatcher, UvError>
203     {
204         struct Ctx { status: c_int, task: Option<BlockedTask> }
205
206         let tcp = TcpWatcher::new(io);
207         let (addr, _len) = addr_to_sockaddr(address);
208         let mut req = Request::new(uvll::UV_CONNECT);
209         let result = unsafe {
210             let addr_p = &addr as *libc::sockaddr_storage;
211             uvll::uv_tcp_connect(req.handle, tcp.handle,
212                                  addr_p as *libc::sockaddr,
213                                  connect_cb)
214         };
215         return match result {
216             0 => {
217                 req.defuse(); // uv callback now owns this request
218                 let mut cx = Ctx { status: 0, task: None };
219                 wait_until_woken_after(&mut cx.task, &io.loop_, || {
220                     req.set_data(&cx);
221                 });
222                 match cx.status {
223                     0 => Ok(tcp),
224                     n => Err(UvError(n)),
225                 }
226             }
227             n => Err(UvError(n))
228         };
229
230         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
231             let req = Request::wrap(req);
232             assert!(status != uvll::ECANCELED);
233             let cx: &mut Ctx = unsafe { req.get_data() };
234             cx.status = status;
235             wakeup(&mut cx.task);
236         }
237     }
238 }
239
240 impl HomingIO for TcpWatcher {
241     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
242 }
243
244 impl rtio::RtioSocket for TcpWatcher {
245     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
246         let _m = self.fire_homing_missile();
247         socket_name(Tcp, self.handle)
248     }
249 }
250
251 impl rtio::RtioTcpStream for TcpWatcher {
252     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
253         let m = self.fire_homing_missile();
254         let _g = self.read_access.grant(m);
255         self.stream.read(buf).map_err(uv_error_to_io_error)
256     }
257
258     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
259         let m = self.fire_homing_missile();
260         let _g = self.write_access.grant(m);
261         self.stream.write(buf).map_err(uv_error_to_io_error)
262     }
263
264     fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
265         let _m = self.fire_homing_missile();
266         socket_name(TcpPeer, self.handle)
267     }
268
269     fn control_congestion(&mut self) -> Result<(), IoError> {
270         let _m = self.fire_homing_missile();
271         status_to_io_result(unsafe {
272             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
273         })
274     }
275
276     fn nodelay(&mut self) -> Result<(), IoError> {
277         let _m = self.fire_homing_missile();
278         status_to_io_result(unsafe {
279             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
280         })
281     }
282
283     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
284         let _m = self.fire_homing_missile();
285         status_to_io_result(unsafe {
286             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
287                                    delay_in_seconds as c_uint)
288         })
289     }
290
291     fn letdie(&mut self) -> Result<(), IoError> {
292         let _m = self.fire_homing_missile();
293         status_to_io_result(unsafe {
294             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
295         })
296     }
297
298     fn clone(&self) -> ~rtio::RtioTcpStream:Send {
299         ~TcpWatcher {
300             handle: self.handle,
301             stream: StreamWatcher::new(self.handle),
302             home: self.home.clone(),
303             refcount: self.refcount.clone(),
304             write_access: self.write_access.clone(),
305             read_access: self.read_access.clone(),
306         } as ~rtio::RtioTcpStream:Send
307     }
308
309     fn close_write(&mut self) -> Result<(), IoError> {
310         struct Ctx {
311             slot: Option<BlockedTask>,
312             status: c_int,
313         }
314         let mut req = Request::new(uvll::UV_SHUTDOWN);
315
316         return match unsafe {
317             uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
318         } {
319             0 => {
320                 req.defuse(); // uv callback now owns this request
321                 let mut cx = Ctx { slot: None, status: 0 };
322
323                 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
324                     req.set_data(&cx);
325                 });
326
327                 status_to_io_result(cx.status)
328             }
329             n => Err(uv_error_to_io_error(UvError(n)))
330         };
331
332         extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
333             let req = Request::wrap(req);
334             assert!(status != uvll::ECANCELED);
335             let cx: &mut Ctx = unsafe { req.get_data() };
336             cx.status = status;
337             wakeup(&mut cx.slot);
338         }
339     }
340 }
341
342 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
343     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
344 }
345
346 impl Drop for TcpWatcher {
347     fn drop(&mut self) {
348         let _m = self.fire_homing_missile();
349         if self.refcount.decrement() {
350             self.close();
351         }
352     }
353 }
354
355 // TCP listeners (unbound servers)
356
357 impl TcpListener {
358     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
359                 -> Result<~TcpListener, UvError> {
360         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
361         assert_eq!(unsafe {
362             uvll::uv_tcp_init(io.uv_loop(), handle)
363         }, 0);
364         let (tx, rx) = channel();
365         let l = ~TcpListener {
366             home: io.make_handle(),
367             handle: handle,
368             closing_task: None,
369             outgoing: tx,
370             incoming: rx,
371         };
372         let (addr, _len) = addr_to_sockaddr(address);
373         let res = unsafe {
374             let addr_p = &addr as *libc::sockaddr_storage;
375             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
376         };
377         return match res {
378             0 => Ok(l.install()),
379             n => Err(UvError(n))
380         };
381     }
382 }
383
384 impl HomingIO for TcpListener {
385     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
386 }
387
388 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
389     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
390 }
391
392 impl rtio::RtioSocket for TcpListener {
393     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
394         let _m = self.fire_homing_missile();
395         socket_name(Tcp, self.handle)
396     }
397 }
398
399 impl rtio::RtioTcpListener for TcpListener {
400     fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
401         // create the acceptor object from ourselves
402         let mut acceptor = ~TcpAcceptor { listener: self };
403
404         let _m = acceptor.fire_homing_missile();
405         // FIXME: the 128 backlog should be configurable
406         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
407             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
408             n => Err(uv_error_to_io_error(UvError(n))),
409         }
410     }
411 }
412
413 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
414     assert!(status != uvll::ECANCELED);
415     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
416     let msg = match status {
417         0 => {
418             let loop_ = Loop::wrap(unsafe {
419                 uvll::get_loop_for_uv_handle(server)
420             });
421             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
422             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
423             Ok(~client as ~rtio::RtioTcpStream:Send)
424         }
425         n => Err(uv_error_to_io_error(UvError(n)))
426     };
427     tcp.outgoing.send(msg);
428 }
429
430 impl Drop for TcpListener {
431     fn drop(&mut self) {
432         let _m = self.fire_homing_missile();
433         self.close();
434     }
435 }
436
437 // TCP acceptors (bound servers)
438
439 impl HomingIO for TcpAcceptor {
440     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
441 }
442
443 impl rtio::RtioSocket for TcpAcceptor {
444     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
445         let _m = self.fire_homing_missile();
446         socket_name(Tcp, self.listener.handle)
447     }
448 }
449
450 impl rtio::RtioTcpAcceptor for TcpAcceptor {
451     fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
452         self.listener.incoming.recv()
453     }
454
455     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
456         let _m = self.fire_homing_missile();
457         status_to_io_result(unsafe {
458             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
459         })
460     }
461
462     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
463         let _m = self.fire_homing_missile();
464         status_to_io_result(unsafe {
465             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
466         })
467     }
468 }
469
470 ////////////////////////////////////////////////////////////////////////////////
471 /// UDP implementation
472 ////////////////////////////////////////////////////////////////////////////////
473
474 pub struct UdpWatcher {
475     handle: *uvll::uv_udp_t,
476     home: HomeHandle,
477
478     // See above for what these fields are
479     refcount: Refcount,
480     read_access: Access,
481     write_access: Access,
482 }
483
484 impl UdpWatcher {
485     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
486                 -> Result<UdpWatcher, UvError> {
487         let udp = UdpWatcher {
488             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
489             home: io.make_handle(),
490             refcount: Refcount::new(),
491             read_access: Access::new(),
492             write_access: Access::new(),
493         };
494         assert_eq!(unsafe {
495             uvll::uv_udp_init(io.uv_loop(), udp.handle)
496         }, 0);
497         let (addr, _len) = addr_to_sockaddr(address);
498         let result = unsafe {
499             let addr_p = &addr as *libc::sockaddr_storage;
500             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
501         };
502         return match result {
503             0 => Ok(udp),
504             n => Err(UvError(n)),
505         };
506     }
507 }
508
509 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
510     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
511 }
512
513 impl HomingIO for UdpWatcher {
514     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
515 }
516
517 impl rtio::RtioSocket for UdpWatcher {
518     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
519         let _m = self.fire_homing_missile();
520         socket_name(Udp, self.handle)
521     }
522 }
523
524 impl rtio::RtioUdpSocket for UdpWatcher {
525     fn recvfrom(&mut self, buf: &mut [u8])
526         -> Result<(uint, ip::SocketAddr), IoError>
527     {
528         struct Ctx {
529             task: Option<BlockedTask>,
530             buf: Option<Buf>,
531             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
532         }
533         let loop_ = self.uv_loop();
534         let m = self.fire_homing_missile();
535         let _g = self.read_access.grant(m);
536
537         let a = match unsafe {
538             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
539         } {
540             0 => {
541                 let mut cx = Ctx {
542                     task: None,
543                     buf: Some(slice_to_uv_buf(buf)),
544                     result: None,
545                 };
546                 let handle = self.handle;
547                 wait_until_woken_after(&mut cx.task, &loop_, || {
548                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
549                 });
550                 match cx.result.take_unwrap() {
551                     (n, _) if n < 0 =>
552                         Err(uv_error_to_io_error(UvError(n as c_int))),
553                     (n, addr) => Ok((n as uint, addr.unwrap()))
554                 }
555             }
556             n => Err(uv_error_to_io_error(UvError(n)))
557         };
558         return a;
559
560         extern fn alloc_cb(handle: *uvll::uv_udp_t,
561                            _suggested_size: size_t,
562                            buf: *mut Buf) {
563             unsafe {
564                 let cx: &mut Ctx =
565                     cast::transmute(uvll::get_data_for_uv_handle(handle));
566                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
567             }
568         }
569
570         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
571                           addr: *libc::sockaddr, _flags: c_uint) {
572             assert!(nread != uvll::ECANCELED as ssize_t);
573             let cx: &mut Ctx = unsafe {
574                 cast::transmute(uvll::get_data_for_uv_handle(handle))
575             };
576
577             // When there's no data to read the recv callback can be a no-op.
578             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
579             // this we just drop back to kqueue and wait for the next callback.
580             if nread == 0 {
581                 cx.buf = Some(unsafe { *buf });
582                 return
583             }
584
585             unsafe {
586                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
587             }
588
589             let cx: &mut Ctx = unsafe {
590                 cast::transmute(uvll::get_data_for_uv_handle(handle))
591             };
592             let addr = if addr == ptr::null() {
593                 None
594             } else {
595                 let len = mem::size_of::<libc::sockaddr_storage>();
596                 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
597             };
598             cx.result = Some((nread, addr));
599             wakeup(&mut cx.task);
600         }
601     }
602
603     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
604         struct Ctx { task: Option<BlockedTask>, result: c_int }
605
606         let m = self.fire_homing_missile();
607         let loop_ = self.uv_loop();
608         let _g = self.write_access.grant(m);
609
610         let mut req = Request::new(uvll::UV_UDP_SEND);
611         let buf = slice_to_uv_buf(buf);
612         let (addr, _len) = addr_to_sockaddr(dst);
613         let result = unsafe {
614             let addr_p = &addr as *libc::sockaddr_storage;
615             uvll::uv_udp_send(req.handle, self.handle, [buf],
616                               addr_p as *libc::sockaddr, send_cb)
617         };
618
619         return match result {
620             0 => {
621                 req.defuse(); // uv callback now owns this request
622                 let mut cx = Ctx { task: None, result: 0 };
623                 wait_until_woken_after(&mut cx.task, &loop_, || {
624                     req.set_data(&cx);
625                 });
626                 match cx.result {
627                     0 => Ok(()),
628                     n => Err(uv_error_to_io_error(UvError(n)))
629                 }
630             }
631             n => Err(uv_error_to_io_error(UvError(n)))
632         };
633
634         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
635             let req = Request::wrap(req);
636             assert!(status != uvll::ECANCELED);
637             let cx: &mut Ctx = unsafe { req.get_data() };
638             cx.result = status;
639             wakeup(&mut cx.task);
640         }
641     }
642
643     fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
644         let _m = self.fire_homing_missile();
645         status_to_io_result(unsafe {
646             multi.to_str().with_c_str(|m_addr| {
647                 uvll::uv_udp_set_membership(self.handle,
648                                             m_addr, ptr::null(),
649                                             uvll::UV_JOIN_GROUP)
650             })
651         })
652     }
653
654     fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
655         let _m = self.fire_homing_missile();
656         status_to_io_result(unsafe {
657             multi.to_str().with_c_str(|m_addr| {
658                 uvll::uv_udp_set_membership(self.handle,
659                                             m_addr, ptr::null(),
660                                             uvll::UV_LEAVE_GROUP)
661             })
662         })
663     }
664
665     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
666         let _m = self.fire_homing_missile();
667         status_to_io_result(unsafe {
668             uvll::uv_udp_set_multicast_loop(self.handle,
669                                             1 as c_int)
670         })
671     }
672
673     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
674         let _m = self.fire_homing_missile();
675         status_to_io_result(unsafe {
676             uvll::uv_udp_set_multicast_loop(self.handle,
677                                             0 as c_int)
678         })
679     }
680
681     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
682         let _m = self.fire_homing_missile();
683         status_to_io_result(unsafe {
684             uvll::uv_udp_set_multicast_ttl(self.handle,
685                                            ttl as c_int)
686         })
687     }
688
689     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
690         let _m = self.fire_homing_missile();
691         status_to_io_result(unsafe {
692             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
693         })
694     }
695
696     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
697         let _m = self.fire_homing_missile();
698         status_to_io_result(unsafe {
699             uvll::uv_udp_set_broadcast(self.handle,
700                                        1 as c_int)
701         })
702     }
703
704     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
705         let _m = self.fire_homing_missile();
706         status_to_io_result(unsafe {
707             uvll::uv_udp_set_broadcast(self.handle,
708                                        0 as c_int)
709         })
710     }
711
712     fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
713         ~UdpWatcher {
714             handle: self.handle,
715             home: self.home.clone(),
716             refcount: self.refcount.clone(),
717             write_access: self.write_access.clone(),
718             read_access: self.read_access.clone(),
719         } as ~rtio::RtioUdpSocket:Send
720     }
721 }
722
723 impl Drop for UdpWatcher {
724     fn drop(&mut self) {
725         // Send ourselves home to close this handle (blocking while doing so).
726         let _m = self.fire_homing_missile();
727         if self.refcount.decrement() {
728             self.close();
729         }
730     }
731 }
732
733 #[cfg(test)]
734 mod test {
735     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
736                         RtioUdpSocket};
737     use std::io::test::{next_test_ip4, next_test_ip6};
738
739     use super::{UdpWatcher, TcpWatcher, TcpListener};
740     use super::super::local_loop;
741
742     #[test]
743     fn connect_close_ip4() {
744         match TcpWatcher::connect(local_loop(), next_test_ip4()) {
745             Ok(..) => fail!(),
746             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
747         }
748     }
749
750     #[test]
751     fn connect_close_ip6() {
752         match TcpWatcher::connect(local_loop(), next_test_ip6()) {
753             Ok(..) => fail!(),
754             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
755         }
756     }
757
758     #[test]
759     fn udp_bind_close_ip4() {
760         match UdpWatcher::bind(local_loop(), next_test_ip4()) {
761             Ok(..) => {}
762             Err(..) => fail!()
763         }
764     }
765
766     #[test]
767     fn udp_bind_close_ip6() {
768         match UdpWatcher::bind(local_loop(), next_test_ip6()) {
769             Ok(..) => {}
770             Err(..) => fail!()
771         }
772     }
773
774     #[test]
775     fn listen_ip4() {
776         let (tx, rx) = channel();
777         let addr = next_test_ip4();
778
779         spawn(proc() {
780             let w = match TcpListener::bind(local_loop(), addr) {
781                 Ok(w) => w, Err(e) => fail!("{:?}", e)
782             };
783             let mut w = match w.listen() {
784                 Ok(w) => w, Err(e) => fail!("{:?}", e),
785             };
786             tx.send(());
787             match w.accept() {
788                 Ok(mut stream) => {
789                     let mut buf = [0u8, ..10];
790                     match stream.read(buf) {
791                         Ok(10) => {} e => fail!("{:?}", e),
792                     }
793                     for i in range(0, 10u8) {
794                         assert_eq!(buf[i as uint], i + 1);
795                     }
796                 }
797                 Err(e) => fail!("{:?}", e)
798             }
799         });
800
801         rx.recv();
802         let mut w = match TcpWatcher::connect(local_loop(), addr) {
803             Ok(w) => w, Err(e) => fail!("{:?}", e)
804         };
805         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
806             Ok(()) => {}, Err(e) => fail!("{:?}", e)
807         }
808     }
809
810     #[test]
811     fn listen_ip6() {
812         let (tx, rx) = channel();
813         let addr = next_test_ip6();
814
815         spawn(proc() {
816             let w = match TcpListener::bind(local_loop(), addr) {
817                 Ok(w) => w, Err(e) => fail!("{:?}", e)
818             };
819             let mut w = match w.listen() {
820                 Ok(w) => w, Err(e) => fail!("{:?}", e),
821             };
822             tx.send(());
823             match w.accept() {
824                 Ok(mut stream) => {
825                     let mut buf = [0u8, ..10];
826                     match stream.read(buf) {
827                         Ok(10) => {} e => fail!("{:?}", e),
828                     }
829                     for i in range(0, 10u8) {
830                         assert_eq!(buf[i as uint], i + 1);
831                     }
832                 }
833                 Err(e) => fail!("{:?}", e)
834             }
835         });
836
837         rx.recv();
838         let mut w = match TcpWatcher::connect(local_loop(), addr) {
839             Ok(w) => w, Err(e) => fail!("{:?}", e)
840         };
841         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
842             Ok(()) => {}, Err(e) => fail!("{:?}", e)
843         }
844     }
845
846     #[test]
847     fn udp_recv_ip4() {
848         let (tx, rx) = channel();
849         let client = next_test_ip4();
850         let server = next_test_ip4();
851
852         spawn(proc() {
853             match UdpWatcher::bind(local_loop(), server) {
854                 Ok(mut w) => {
855                     tx.send(());
856                     let mut buf = [0u8, ..10];
857                     match w.recvfrom(buf) {
858                         Ok((10, addr)) => assert_eq!(addr, client),
859                         e => fail!("{:?}", e),
860                     }
861                     for i in range(0, 10u8) {
862                         assert_eq!(buf[i as uint], i + 1);
863                     }
864                 }
865                 Err(e) => fail!("{:?}", e)
866             }
867         });
868
869         rx.recv();
870         let mut w = match UdpWatcher::bind(local_loop(), client) {
871             Ok(w) => w, Err(e) => fail!("{:?}", e)
872         };
873         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
874             Ok(()) => {}, Err(e) => fail!("{:?}", e)
875         }
876     }
877
878     #[test]
879     fn udp_recv_ip6() {
880         let (tx, rx) = channel();
881         let client = next_test_ip6();
882         let server = next_test_ip6();
883
884         spawn(proc() {
885             match UdpWatcher::bind(local_loop(), server) {
886                 Ok(mut w) => {
887                     tx.send(());
888                     let mut buf = [0u8, ..10];
889                     match w.recvfrom(buf) {
890                         Ok((10, addr)) => assert_eq!(addr, client),
891                         e => fail!("{:?}", e),
892                     }
893                     for i in range(0, 10u8) {
894                         assert_eq!(buf[i as uint], i + 1);
895                     }
896                 }
897                 Err(e) => fail!("{:?}", e)
898             }
899         });
900
901         rx.recv();
902         let mut w = match UdpWatcher::bind(local_loop(), client) {
903             Ok(w) => w, Err(e) => fail!("{:?}", e)
904         };
905         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
906             Ok(()) => {}, Err(e) => fail!("{:?}", e)
907         }
908     }
909
910     #[test]
911     fn test_read_read_read() {
912         let addr = next_test_ip4();
913         static MAX: uint = 5000;
914         let (tx, rx) = channel();
915
916         spawn(proc() {
917             let listener = TcpListener::bind(local_loop(), addr).unwrap();
918             let mut acceptor = listener.listen().unwrap();
919             tx.send(());
920             let mut stream = acceptor.accept().unwrap();
921             let buf = [1, .. 2048];
922             let mut total_bytes_written = 0;
923             while total_bytes_written < MAX {
924                 assert!(stream.write(buf).is_ok());
925                 uvdebug!("wrote bytes");
926                 total_bytes_written += buf.len();
927             }
928         });
929
930         rx.recv();
931         let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
932         let mut buf = [0, .. 2048];
933         let mut total_bytes_read = 0;
934         while total_bytes_read < MAX {
935             let nread = stream.read(buf).unwrap();
936             total_bytes_read += nread;
937             for i in range(0u, nread) {
938                 assert_eq!(buf[i], 1);
939             }
940         }
941         uvdebug!("read {} bytes total", total_bytes_read);
942     }
943
944     #[test]
945     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
946     fn test_udp_twice() {
947         let server_addr = next_test_ip4();
948         let client_addr = next_test_ip4();
949         let (tx, rx) = channel();
950
951         spawn(proc() {
952             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
953             rx.recv();
954             assert!(client.sendto([1], server_addr).is_ok());
955             assert!(client.sendto([2], server_addr).is_ok());
956         });
957
958         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
959         tx.send(());
960         let mut buf1 = [0];
961         let mut buf2 = [0];
962         let (nread1, src1) = server.recvfrom(buf1).unwrap();
963         let (nread2, src2) = server.recvfrom(buf2).unwrap();
964         assert_eq!(nread1, 1);
965         assert_eq!(nread2, 1);
966         assert_eq!(src1, client_addr);
967         assert_eq!(src2, client_addr);
968         assert_eq!(buf1[0], 1);
969         assert_eq!(buf2[0], 2);
970     }
971
972     #[test]
973     fn test_udp_many_read() {
974         let server_out_addr = next_test_ip4();
975         let server_in_addr = next_test_ip4();
976         let client_out_addr = next_test_ip4();
977         let client_in_addr = next_test_ip4();
978         static MAX: uint = 500_000;
979
980         let (tx1, rx1) = channel::<()>();
981         let (tx2, rx2) = channel::<()>();
982
983         spawn(proc() {
984             let l = local_loop();
985             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
986             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
987             let (tx, rx) = (tx2, rx1);
988             tx.send(());
989             rx.recv();
990             let msg = [1, .. 2048];
991             let mut total_bytes_sent = 0;
992             let mut buf = [1];
993             while buf[0] == 1 {
994                 // send more data
995                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
996                 total_bytes_sent += msg.len();
997                 // check if the client has received enough
998                 let res = server_in.recvfrom(buf);
999                 assert!(res.is_ok());
1000                 let (nread, src) = res.unwrap();
1001                 assert_eq!(nread, 1);
1002                 assert_eq!(src, client_out_addr);
1003             }
1004             assert!(total_bytes_sent >= MAX);
1005         });
1006
1007         let l = local_loop();
1008         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1009         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1010         let (tx, rx) = (tx1, rx2);
1011         rx.recv();
1012         tx.send(());
1013         let mut total_bytes_recv = 0;
1014         let mut buf = [0, .. 2048];
1015         while total_bytes_recv < MAX {
1016             // ask for more
1017             assert!(client_out.sendto([1], server_in_addr).is_ok());
1018             // wait for data
1019             let res = client_in.recvfrom(buf);
1020             assert!(res.is_ok());
1021             let (nread, src) = res.unwrap();
1022             assert_eq!(src, server_out_addr);
1023             total_bytes_recv += nread;
1024             for i in range(0u, nread) {
1025                 assert_eq!(buf[i], 1);
1026             }
1027         }
1028         // tell the server we're done
1029         assert!(client_out.sendto([0], server_in_addr).is_ok());
1030     }
1031
1032     #[test]
1033     fn test_read_and_block() {
1034         let addr = next_test_ip4();
1035         let (tx, rx) = channel::<Receiver<()>>();
1036
1037         spawn(proc() {
1038             let rx = rx.recv();
1039             let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
1040             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1041             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1042             rx.recv();
1043             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1044             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1045             rx.recv();
1046         });
1047
1048         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1049         let mut acceptor = listener.listen().unwrap();
1050         let (tx2, rx2) = channel();
1051         tx.send(rx2);
1052         let mut stream = acceptor.accept().unwrap();
1053         let mut buf = [0, .. 2048];
1054
1055         let expected = 32;
1056         let mut current = 0;
1057         let mut reads = 0;
1058
1059         while current < expected {
1060             let nread = stream.read(buf).unwrap();
1061             for i in range(0u, nread) {
1062                 let val = buf[i] as uint;
1063                 assert_eq!(val, current % 8);
1064                 current += 1;
1065             }
1066             reads += 1;
1067
1068             tx2.try_send(());
1069         }
1070
1071         // Make sure we had multiple reads
1072         assert!(reads > 1);
1073     }
1074
1075     #[test]
1076     fn test_simple_tcp_server_and_client_on_diff_threads() {
1077         let addr = next_test_ip4();
1078
1079         spawn(proc() {
1080             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1081             let mut acceptor = listener.listen().unwrap();
1082             let mut stream = acceptor.accept().unwrap();
1083             let mut buf = [0, .. 2048];
1084             let nread = stream.read(buf).unwrap();
1085             assert_eq!(nread, 8);
1086             for i in range(0u, nread) {
1087                 assert_eq!(buf[i], i as u8);
1088             }
1089         });
1090
1091         let mut stream = TcpWatcher::connect(local_loop(), addr);
1092         while stream.is_err() {
1093             stream = TcpWatcher::connect(local_loop(), addr);
1094         }
1095         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1096     }
1097
1098     #[should_fail] #[test]
1099     fn tcp_listener_fail_cleanup() {
1100         let addr = next_test_ip4();
1101         let w = TcpListener::bind(local_loop(), addr).unwrap();
1102         let _w = w.listen().unwrap();
1103         fail!();
1104     }
1105
1106     #[should_fail] #[test]
1107     fn tcp_stream_fail_cleanup() {
1108         let (tx, rx) = channel();
1109         let addr = next_test_ip4();
1110
1111         spawn(proc() {
1112             let w = TcpListener::bind(local_loop(), addr).unwrap();
1113             let mut w = w.listen().unwrap();
1114             tx.send(());
1115             drop(w.accept().unwrap());
1116         });
1117         rx.recv();
1118         let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1119         fail!();
1120     }
1121
1122     #[should_fail] #[test]
1123     fn udp_listener_fail_cleanup() {
1124         let addr = next_test_ip4();
1125         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1126         fail!();
1127     }
1128
1129     #[should_fail] #[test]
1130     fn udp_fail_other_task() {
1131         let addr = next_test_ip4();
1132         let (tx, rx) = channel();
1133
1134         // force the handle to be created on a different scheduler, failure in
1135         // the original task will force a homing operation back to this
1136         // scheduler.
1137         spawn(proc() {
1138             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1139             tx.send(w);
1140         });
1141
1142         let _w = rx.recv();
1143         fail!();
1144     }
1145 }