]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
auto merge of #12186 : alexcrichton/rust/no-sleep-2, r=brson
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::io::IoError;
13 use std::io::net::ip;
14 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use std::libc;
16 use std::mem;
17 use std::ptr;
18 use std::rt::rtio;
19 use std::rt::task::BlockedTask;
20
21 use access::Access;
22 use homing::{HomingIO, HomeHandle};
23 use rc::Refcount;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26             uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27             wait_until_woken_after, wakeup};
28 use uvio::UvIoFactory;
29 use uvll;
30
31 ////////////////////////////////////////////////////////////////////////////////
32 /// Generic functions related to dealing with sockaddr things
33 ////////////////////////////////////////////////////////////////////////////////
34
35 pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
36 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
37
38 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
39                         len: uint) -> ip::SocketAddr {
40     match storage.ss_family as c_int {
41         libc::AF_INET => {
42             assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
43             let storage: &libc::sockaddr_in = unsafe {
44                 cast::transmute(storage)
45             };
46             let addr = storage.sin_addr.s_addr as u32;
47             let a = (addr >>  0) as u8;
48             let b = (addr >>  8) as u8;
49             let c = (addr >> 16) as u8;
50             let d = (addr >> 24) as u8;
51             ip::SocketAddr {
52                 ip: ip::Ipv4Addr(a, b, c, d),
53                 port: ntohs(storage.sin_port),
54             }
55         }
56         libc::AF_INET6 => {
57             assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
58             let storage: &libc::sockaddr_in6 = unsafe {
59                 cast::transmute(storage)
60             };
61             let a = ntohs(storage.sin6_addr.s6_addr[0]);
62             let b = ntohs(storage.sin6_addr.s6_addr[1]);
63             let c = ntohs(storage.sin6_addr.s6_addr[2]);
64             let d = ntohs(storage.sin6_addr.s6_addr[3]);
65             let e = ntohs(storage.sin6_addr.s6_addr[4]);
66             let f = ntohs(storage.sin6_addr.s6_addr[5]);
67             let g = ntohs(storage.sin6_addr.s6_addr[6]);
68             let h = ntohs(storage.sin6_addr.s6_addr[7]);
69             ip::SocketAddr {
70                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
71                 port: ntohs(storage.sin6_port),
72             }
73         }
74         n => {
75             fail!("unknown family {}", n);
76         }
77     }
78 }
79
80 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
81     unsafe {
82         let mut storage: libc::sockaddr_storage = mem::init();
83         let len = match addr.ip {
84             ip::Ipv4Addr(a, b, c, d) => {
85                 let storage: &mut libc::sockaddr_in =
86                     cast::transmute(&mut storage);
87                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
88                 (*storage).sin_port = htons(addr.port);
89                 (*storage).sin_addr = libc::in_addr {
90                     s_addr: (d as u32 << 24) |
91                             (c as u32 << 16) |
92                             (b as u32 <<  8) |
93                             (a as u32 <<  0)
94                 };
95                 mem::size_of::<libc::sockaddr_in>()
96             }
97             ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
98                 let storage: &mut libc::sockaddr_in6 =
99                     cast::transmute(&mut storage);
100                 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
101                 storage.sin6_port = htons(addr.port);
102                 storage.sin6_addr = libc::in6_addr {
103                     s6_addr: [
104                         htons(a),
105                         htons(b),
106                         htons(c),
107                         htons(d),
108                         htons(e),
109                         htons(f),
110                         htons(g),
111                         htons(h),
112                     ]
113                 };
114                 mem::size_of::<libc::sockaddr_in6>()
115             }
116         };
117         return (storage, len);
118     }
119 }
120
121 enum SocketNameKind {
122     TcpPeer,
123     Tcp,
124     Udp
125 }
126
127 fn socket_name(sk: SocketNameKind,
128                handle: *c_void) -> Result<ip::SocketAddr, IoError> {
129     let getsockname = match sk {
130         TcpPeer => uvll::uv_tcp_getpeername,
131         Tcp     => uvll::uv_tcp_getsockname,
132         Udp     => uvll::uv_udp_getsockname,
133     };
134
135     // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136     let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
137     let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
138
139     let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
140     match unsafe {
141         getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
142     } {
143         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
144         n => Err(uv_error_to_io_error(UvError(n)))
145     }
146 }
147
148 ////////////////////////////////////////////////////////////////////////////////
149 /// TCP implementation
150 ////////////////////////////////////////////////////////////////////////////////
151
152 pub struct TcpWatcher {
153     handle: *uvll::uv_tcp_t,
154     stream: StreamWatcher,
155     home: HomeHandle,
156     priv refcount: Refcount,
157
158     // libuv can't support concurrent reads and concurrent writes of the same
159     // stream object, so we use these access guards in order to arbitrate among
160     // multiple concurrent reads and writes. Note that libuv *can* read and
161     // write simultaneously, it just can't read and read simultaneously.
162     priv read_access: Access,
163     priv write_access: Access,
164 }
165
166 pub struct TcpListener {
167     home: HomeHandle,
168     handle: *uvll::uv_pipe_t,
169     priv closing_task: Option<BlockedTask>,
170     priv outgoing: Chan<Result<~rtio::RtioTcpStream, IoError>>,
171     priv incoming: Port<Result<~rtio::RtioTcpStream, IoError>>,
172 }
173
174 pub struct TcpAcceptor {
175     listener: ~TcpListener,
176 }
177
178 // TCP watchers (clients/streams)
179
180 impl TcpWatcher {
181     pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
182         let handle = io.make_handle();
183         TcpWatcher::new_home(&io.loop_, handle)
184     }
185
186     fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
187         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
188         assert_eq!(unsafe {
189             uvll::uv_tcp_init(loop_.handle, handle)
190         }, 0);
191         TcpWatcher {
192             home: home,
193             handle: handle,
194             stream: StreamWatcher::new(handle),
195             refcount: Refcount::new(),
196             read_access: Access::new(),
197             write_access: Access::new(),
198         }
199     }
200
201     pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
202         -> Result<TcpWatcher, UvError>
203     {
204         struct Ctx { status: c_int, task: Option<BlockedTask> }
205
206         let tcp = TcpWatcher::new(io);
207         let (addr, _len) = addr_to_sockaddr(address);
208         let mut req = Request::new(uvll::UV_CONNECT);
209         let result = unsafe {
210             let addr_p = &addr as *libc::sockaddr_storage;
211             uvll::uv_tcp_connect(req.handle, tcp.handle,
212                                  addr_p as *libc::sockaddr,
213                                  connect_cb)
214         };
215         return match result {
216             0 => {
217                 req.defuse(); // uv callback now owns this request
218                 let mut cx = Ctx { status: 0, task: None };
219                 wait_until_woken_after(&mut cx.task, &io.loop_, || {
220                     req.set_data(&cx);
221                 });
222                 match cx.status {
223                     0 => Ok(tcp),
224                     n => Err(UvError(n)),
225                 }
226             }
227             n => Err(UvError(n))
228         };
229
230         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
231             let req = Request::wrap(req);
232             assert!(status != uvll::ECANCELED);
233             let cx: &mut Ctx = unsafe { req.get_data() };
234             cx.status = status;
235             wakeup(&mut cx.task);
236         }
237     }
238 }
239
240 impl HomingIO for TcpWatcher {
241     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
242 }
243
244 impl rtio::RtioSocket for TcpWatcher {
245     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
246         let _m = self.fire_homing_missile();
247         socket_name(Tcp, self.handle)
248     }
249 }
250
251 impl rtio::RtioTcpStream for TcpWatcher {
252     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
253         let m = self.fire_homing_missile();
254         let _g = self.read_access.grant(m);
255         self.stream.read(buf).map_err(uv_error_to_io_error)
256     }
257
258     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
259         let m = self.fire_homing_missile();
260         let _g = self.write_access.grant(m);
261         self.stream.write(buf).map_err(uv_error_to_io_error)
262     }
263
264     fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
265         let _m = self.fire_homing_missile();
266         socket_name(TcpPeer, self.handle)
267     }
268
269     fn control_congestion(&mut self) -> Result<(), IoError> {
270         let _m = self.fire_homing_missile();
271         status_to_io_result(unsafe {
272             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
273         })
274     }
275
276     fn nodelay(&mut self) -> Result<(), IoError> {
277         let _m = self.fire_homing_missile();
278         status_to_io_result(unsafe {
279             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
280         })
281     }
282
283     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
284         let _m = self.fire_homing_missile();
285         status_to_io_result(unsafe {
286             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
287                                    delay_in_seconds as c_uint)
288         })
289     }
290
291     fn letdie(&mut self) -> Result<(), IoError> {
292         let _m = self.fire_homing_missile();
293         status_to_io_result(unsafe {
294             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
295         })
296     }
297
298     fn clone(&self) -> ~rtio::RtioTcpStream {
299         ~TcpWatcher {
300             handle: self.handle,
301             stream: StreamWatcher::new(self.handle),
302             home: self.home.clone(),
303             refcount: self.refcount.clone(),
304             write_access: self.write_access.clone(),
305             read_access: self.read_access.clone(),
306         } as ~rtio::RtioTcpStream
307     }
308 }
309
310 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
311     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
312 }
313
314 impl Drop for TcpWatcher {
315     fn drop(&mut self) {
316         let _m = self.fire_homing_missile();
317         if self.refcount.decrement() {
318             self.close();
319         }
320     }
321 }
322
323 // TCP listeners (unbound servers)
324
325 impl TcpListener {
326     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
327                 -> Result<~TcpListener, UvError> {
328         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
329         assert_eq!(unsafe {
330             uvll::uv_tcp_init(io.uv_loop(), handle)
331         }, 0);
332         let (port, chan) = Chan::new();
333         let l = ~TcpListener {
334             home: io.make_handle(),
335             handle: handle,
336             closing_task: None,
337             outgoing: chan,
338             incoming: port,
339         };
340         let (addr, _len) = addr_to_sockaddr(address);
341         let res = unsafe {
342             let addr_p = &addr as *libc::sockaddr_storage;
343             uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
344         };
345         return match res {
346             0 => Ok(l.install()),
347             n => Err(UvError(n))
348         };
349     }
350 }
351
352 impl HomingIO for TcpListener {
353     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
354 }
355
356 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
357     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
358 }
359
360 impl rtio::RtioSocket for TcpListener {
361     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
362         let _m = self.fire_homing_missile();
363         socket_name(Tcp, self.handle)
364     }
365 }
366
367 impl rtio::RtioTcpListener for TcpListener {
368     fn listen(~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
369         // create the acceptor object from ourselves
370         let mut acceptor = ~TcpAcceptor { listener: self };
371
372         let _m = acceptor.fire_homing_missile();
373         // FIXME: the 128 backlog should be configurable
374         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
375             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
376             n => Err(uv_error_to_io_error(UvError(n))),
377         }
378     }
379 }
380
381 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
382     assert!(status != uvll::ECANCELED);
383     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
384     let msg = match status {
385         0 => {
386             let loop_ = Loop::wrap(unsafe {
387                 uvll::get_loop_for_uv_handle(server)
388             });
389             let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
390             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
391             Ok(~client as ~rtio::RtioTcpStream)
392         }
393         n => Err(uv_error_to_io_error(UvError(n)))
394     };
395     tcp.outgoing.send(msg);
396 }
397
398 impl Drop for TcpListener {
399     fn drop(&mut self) {
400         let _m = self.fire_homing_missile();
401         self.close();
402     }
403 }
404
405 // TCP acceptors (bound servers)
406
407 impl HomingIO for TcpAcceptor {
408     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
409 }
410
411 impl rtio::RtioSocket for TcpAcceptor {
412     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
413         let _m = self.fire_homing_missile();
414         socket_name(Tcp, self.listener.handle)
415     }
416 }
417
418 impl rtio::RtioTcpAcceptor for TcpAcceptor {
419     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
420         self.listener.incoming.recv()
421     }
422
423     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
424         let _m = self.fire_homing_missile();
425         status_to_io_result(unsafe {
426             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
427         })
428     }
429
430     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
431         let _m = self.fire_homing_missile();
432         status_to_io_result(unsafe {
433             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
434         })
435     }
436 }
437
438 ////////////////////////////////////////////////////////////////////////////////
439 /// UDP implementation
440 ////////////////////////////////////////////////////////////////////////////////
441
442 pub struct UdpWatcher {
443     handle: *uvll::uv_udp_t,
444     home: HomeHandle,
445
446     // See above for what these fields are
447     priv refcount: Refcount,
448     priv read_access: Access,
449     priv write_access: Access,
450 }
451
452 impl UdpWatcher {
453     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
454                 -> Result<UdpWatcher, UvError> {
455         let udp = UdpWatcher {
456             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
457             home: io.make_handle(),
458             refcount: Refcount::new(),
459             read_access: Access::new(),
460             write_access: Access::new(),
461         };
462         assert_eq!(unsafe {
463             uvll::uv_udp_init(io.uv_loop(), udp.handle)
464         }, 0);
465         let (addr, _len) = addr_to_sockaddr(address);
466         let result = unsafe {
467             let addr_p = &addr as *libc::sockaddr_storage;
468             uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
469         };
470         return match result {
471             0 => Ok(udp),
472             n => Err(UvError(n)),
473         };
474     }
475 }
476
477 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
478     fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
479 }
480
481 impl HomingIO for UdpWatcher {
482     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
483 }
484
485 impl rtio::RtioSocket for UdpWatcher {
486     fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
487         let _m = self.fire_homing_missile();
488         socket_name(Udp, self.handle)
489     }
490 }
491
492 impl rtio::RtioUdpSocket for UdpWatcher {
493     fn recvfrom(&mut self, buf: &mut [u8])
494         -> Result<(uint, ip::SocketAddr), IoError>
495     {
496         struct Ctx {
497             task: Option<BlockedTask>,
498             buf: Option<Buf>,
499             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
500         }
501         let loop_ = self.uv_loop();
502         let m = self.fire_homing_missile();
503         let _g = self.read_access.grant(m);
504
505         let a = match unsafe {
506             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
507         } {
508             0 => {
509                 let mut cx = Ctx {
510                     task: None,
511                     buf: Some(slice_to_uv_buf(buf)),
512                     result: None,
513                 };
514                 let handle = self.handle;
515                 wait_until_woken_after(&mut cx.task, &loop_, || {
516                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
517                 });
518                 match cx.result.take_unwrap() {
519                     (n, _) if n < 0 =>
520                         Err(uv_error_to_io_error(UvError(n as c_int))),
521                     (n, addr) => Ok((n as uint, addr.unwrap()))
522                 }
523             }
524             n => Err(uv_error_to_io_error(UvError(n)))
525         };
526         return a;
527
528         extern fn alloc_cb(handle: *uvll::uv_udp_t,
529                            _suggested_size: size_t,
530                            buf: *mut Buf) {
531             unsafe {
532                 let cx: &mut Ctx =
533                     cast::transmute(uvll::get_data_for_uv_handle(handle));
534                 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
535             }
536         }
537
538         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
539                           addr: *libc::sockaddr, _flags: c_uint) {
540             assert!(nread != uvll::ECANCELED as ssize_t);
541             let cx: &mut Ctx = unsafe {
542                 cast::transmute(uvll::get_data_for_uv_handle(handle))
543             };
544
545             // When there's no data to read the recv callback can be a no-op.
546             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
547             // this we just drop back to kqueue and wait for the next callback.
548             if nread == 0 {
549                 cx.buf = Some(unsafe { *buf });
550                 return
551             }
552
553             unsafe {
554                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
555             }
556
557             let cx: &mut Ctx = unsafe {
558                 cast::transmute(uvll::get_data_for_uv_handle(handle))
559             };
560             let addr = if addr == ptr::null() {
561                 None
562             } else {
563                 let len = mem::size_of::<libc::sockaddr_storage>();
564                 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
565             };
566             cx.result = Some((nread, addr));
567             wakeup(&mut cx.task);
568         }
569     }
570
571     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
572         struct Ctx { task: Option<BlockedTask>, result: c_int }
573
574         let m = self.fire_homing_missile();
575         let loop_ = self.uv_loop();
576         let _g = self.write_access.grant(m);
577
578         let mut req = Request::new(uvll::UV_UDP_SEND);
579         let buf = slice_to_uv_buf(buf);
580         let (addr, _len) = addr_to_sockaddr(dst);
581         let result = unsafe {
582             let addr_p = &addr as *libc::sockaddr_storage;
583             uvll::uv_udp_send(req.handle, self.handle, [buf],
584                               addr_p as *libc::sockaddr, send_cb)
585         };
586
587         return match result {
588             0 => {
589                 req.defuse(); // uv callback now owns this request
590                 let mut cx = Ctx { task: None, result: 0 };
591                 wait_until_woken_after(&mut cx.task, &loop_, || {
592                     req.set_data(&cx);
593                 });
594                 match cx.result {
595                     0 => Ok(()),
596                     n => Err(uv_error_to_io_error(UvError(n)))
597                 }
598             }
599             n => Err(uv_error_to_io_error(UvError(n)))
600         };
601
602         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
603             let req = Request::wrap(req);
604             assert!(status != uvll::ECANCELED);
605             let cx: &mut Ctx = unsafe { req.get_data() };
606             cx.result = status;
607             wakeup(&mut cx.task);
608         }
609     }
610
611     fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
612         let _m = self.fire_homing_missile();
613         status_to_io_result(unsafe {
614             multi.to_str().with_c_str(|m_addr| {
615                 uvll::uv_udp_set_membership(self.handle,
616                                             m_addr, ptr::null(),
617                                             uvll::UV_JOIN_GROUP)
618             })
619         })
620     }
621
622     fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
623         let _m = self.fire_homing_missile();
624         status_to_io_result(unsafe {
625             multi.to_str().with_c_str(|m_addr| {
626                 uvll::uv_udp_set_membership(self.handle,
627                                             m_addr, ptr::null(),
628                                             uvll::UV_LEAVE_GROUP)
629             })
630         })
631     }
632
633     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
634         let _m = self.fire_homing_missile();
635         status_to_io_result(unsafe {
636             uvll::uv_udp_set_multicast_loop(self.handle,
637                                             1 as c_int)
638         })
639     }
640
641     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
642         let _m = self.fire_homing_missile();
643         status_to_io_result(unsafe {
644             uvll::uv_udp_set_multicast_loop(self.handle,
645                                             0 as c_int)
646         })
647     }
648
649     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
650         let _m = self.fire_homing_missile();
651         status_to_io_result(unsafe {
652             uvll::uv_udp_set_multicast_ttl(self.handle,
653                                            ttl as c_int)
654         })
655     }
656
657     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
658         let _m = self.fire_homing_missile();
659         status_to_io_result(unsafe {
660             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
661         })
662     }
663
664     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
665         let _m = self.fire_homing_missile();
666         status_to_io_result(unsafe {
667             uvll::uv_udp_set_broadcast(self.handle,
668                                        1 as c_int)
669         })
670     }
671
672     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
673         let _m = self.fire_homing_missile();
674         status_to_io_result(unsafe {
675             uvll::uv_udp_set_broadcast(self.handle,
676                                        0 as c_int)
677         })
678     }
679
680     fn clone(&self) -> ~rtio::RtioUdpSocket {
681         ~UdpWatcher {
682             handle: self.handle,
683             home: self.home.clone(),
684             refcount: self.refcount.clone(),
685             write_access: self.write_access.clone(),
686             read_access: self.read_access.clone(),
687         } as ~rtio::RtioUdpSocket
688     }
689 }
690
691 impl Drop for UdpWatcher {
692     fn drop(&mut self) {
693         // Send ourselves home to close this handle (blocking while doing so).
694         let _m = self.fire_homing_missile();
695         if self.refcount.decrement() {
696             self.close();
697         }
698     }
699 }
700
701 #[cfg(test)]
702 mod test {
703     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
704                         RtioUdpSocket};
705     use std::io::test::{next_test_ip4, next_test_ip6};
706
707     use super::{UdpWatcher, TcpWatcher, TcpListener};
708     use super::super::local_loop;
709
710     #[test]
711     fn connect_close_ip4() {
712         match TcpWatcher::connect(local_loop(), next_test_ip4()) {
713             Ok(..) => fail!(),
714             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
715         }
716     }
717
718     #[test]
719     fn connect_close_ip6() {
720         match TcpWatcher::connect(local_loop(), next_test_ip6()) {
721             Ok(..) => fail!(),
722             Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
723         }
724     }
725
726     #[test]
727     fn udp_bind_close_ip4() {
728         match UdpWatcher::bind(local_loop(), next_test_ip4()) {
729             Ok(..) => {}
730             Err(..) => fail!()
731         }
732     }
733
734     #[test]
735     fn udp_bind_close_ip6() {
736         match UdpWatcher::bind(local_loop(), next_test_ip6()) {
737             Ok(..) => {}
738             Err(..) => fail!()
739         }
740     }
741
742     #[test]
743     fn listen_ip4() {
744         let (port, chan) = Chan::new();
745         let addr = next_test_ip4();
746
747         spawn(proc() {
748             let w = match TcpListener::bind(local_loop(), addr) {
749                 Ok(w) => w, Err(e) => fail!("{:?}", e)
750             };
751             let mut w = match w.listen() {
752                 Ok(w) => w, Err(e) => fail!("{:?}", e),
753             };
754             chan.send(());
755             match w.accept() {
756                 Ok(mut stream) => {
757                     let mut buf = [0u8, ..10];
758                     match stream.read(buf) {
759                         Ok(10) => {} e => fail!("{:?}", e),
760                     }
761                     for i in range(0, 10u8) {
762                         assert_eq!(buf[i], i + 1);
763                     }
764                 }
765                 Err(e) => fail!("{:?}", e)
766             }
767         });
768
769         port.recv();
770         let mut w = match TcpWatcher::connect(local_loop(), addr) {
771             Ok(w) => w, Err(e) => fail!("{:?}", e)
772         };
773         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
774             Ok(()) => {}, Err(e) => fail!("{:?}", e)
775         }
776     }
777
778     #[test]
779     fn listen_ip6() {
780         let (port, chan) = Chan::new();
781         let addr = next_test_ip6();
782
783         spawn(proc() {
784             let w = match TcpListener::bind(local_loop(), addr) {
785                 Ok(w) => w, Err(e) => fail!("{:?}", e)
786             };
787             let mut w = match w.listen() {
788                 Ok(w) => w, Err(e) => fail!("{:?}", e),
789             };
790             chan.send(());
791             match w.accept() {
792                 Ok(mut stream) => {
793                     let mut buf = [0u8, ..10];
794                     match stream.read(buf) {
795                         Ok(10) => {} e => fail!("{:?}", e),
796                     }
797                     for i in range(0, 10u8) {
798                         assert_eq!(buf[i], i + 1);
799                     }
800                 }
801                 Err(e) => fail!("{:?}", e)
802             }
803         });
804
805         port.recv();
806         let mut w = match TcpWatcher::connect(local_loop(), addr) {
807             Ok(w) => w, Err(e) => fail!("{:?}", e)
808         };
809         match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
810             Ok(()) => {}, Err(e) => fail!("{:?}", e)
811         }
812     }
813
814     #[test]
815     fn udp_recv_ip4() {
816         let (port, chan) = Chan::new();
817         let client = next_test_ip4();
818         let server = next_test_ip4();
819
820         spawn(proc() {
821             match UdpWatcher::bind(local_loop(), server) {
822                 Ok(mut w) => {
823                     chan.send(());
824                     let mut buf = [0u8, ..10];
825                     match w.recvfrom(buf) {
826                         Ok((10, addr)) => assert_eq!(addr, client),
827                         e => fail!("{:?}", e),
828                     }
829                     for i in range(0, 10u8) {
830                         assert_eq!(buf[i], i + 1);
831                     }
832                 }
833                 Err(e) => fail!("{:?}", e)
834             }
835         });
836
837         port.recv();
838         let mut w = match UdpWatcher::bind(local_loop(), client) {
839             Ok(w) => w, Err(e) => fail!("{:?}", e)
840         };
841         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
842             Ok(()) => {}, Err(e) => fail!("{:?}", e)
843         }
844     }
845
846     #[test]
847     fn udp_recv_ip6() {
848         let (port, chan) = Chan::new();
849         let client = next_test_ip6();
850         let server = next_test_ip6();
851
852         spawn(proc() {
853             match UdpWatcher::bind(local_loop(), server) {
854                 Ok(mut w) => {
855                     chan.send(());
856                     let mut buf = [0u8, ..10];
857                     match w.recvfrom(buf) {
858                         Ok((10, addr)) => assert_eq!(addr, client),
859                         e => fail!("{:?}", e),
860                     }
861                     for i in range(0, 10u8) {
862                         assert_eq!(buf[i], i + 1);
863                     }
864                 }
865                 Err(e) => fail!("{:?}", e)
866             }
867         });
868
869         port.recv();
870         let mut w = match UdpWatcher::bind(local_loop(), client) {
871             Ok(w) => w, Err(e) => fail!("{:?}", e)
872         };
873         match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
874             Ok(()) => {}, Err(e) => fail!("{:?}", e)
875         }
876     }
877
878     #[test]
879     fn test_read_read_read() {
880         let addr = next_test_ip4();
881         static MAX: uint = 5000;
882         let (port, chan) = Chan::new();
883
884         spawn(proc() {
885             let listener = TcpListener::bind(local_loop(), addr).unwrap();
886             let mut acceptor = listener.listen().unwrap();
887             chan.send(());
888             let mut stream = acceptor.accept().unwrap();
889             let buf = [1, .. 2048];
890             let mut total_bytes_written = 0;
891             while total_bytes_written < MAX {
892                 assert!(stream.write(buf).is_ok());
893                 uvdebug!("wrote bytes");
894                 total_bytes_written += buf.len();
895             }
896         });
897
898         port.recv();
899         let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
900         let mut buf = [0, .. 2048];
901         let mut total_bytes_read = 0;
902         while total_bytes_read < MAX {
903             let nread = stream.read(buf).unwrap();
904             total_bytes_read += nread;
905             for i in range(0u, nread) {
906                 assert_eq!(buf[i], 1);
907             }
908         }
909         uvdebug!("read {} bytes total", total_bytes_read);
910     }
911
912     #[test]
913     #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
914     fn test_udp_twice() {
915         let server_addr = next_test_ip4();
916         let client_addr = next_test_ip4();
917         let (port, chan) = Chan::new();
918
919         spawn(proc() {
920             let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
921             port.recv();
922             assert!(client.sendto([1], server_addr).is_ok());
923             assert!(client.sendto([2], server_addr).is_ok());
924         });
925
926         let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
927         chan.send(());
928         let mut buf1 = [0];
929         let mut buf2 = [0];
930         let (nread1, src1) = server.recvfrom(buf1).unwrap();
931         let (nread2, src2) = server.recvfrom(buf2).unwrap();
932         assert_eq!(nread1, 1);
933         assert_eq!(nread2, 1);
934         assert_eq!(src1, client_addr);
935         assert_eq!(src2, client_addr);
936         assert_eq!(buf1[0], 1);
937         assert_eq!(buf2[0], 2);
938     }
939
940     #[test]
941     fn test_udp_many_read() {
942         let server_out_addr = next_test_ip4();
943         let server_in_addr = next_test_ip4();
944         let client_out_addr = next_test_ip4();
945         let client_in_addr = next_test_ip4();
946         static MAX: uint = 500_000;
947
948         let (p1, c1) = Chan::new();
949         let (p2, c2) = Chan::new();
950
951         spawn(proc() {
952             let l = local_loop();
953             let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
954             let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
955             let (port, chan) = (p1, c2);
956             chan.send(());
957             port.recv();
958             let msg = [1, .. 2048];
959             let mut total_bytes_sent = 0;
960             let mut buf = [1];
961             while buf[0] == 1 {
962                 // send more data
963                 assert!(server_out.sendto(msg, client_in_addr).is_ok());
964                 total_bytes_sent += msg.len();
965                 // check if the client has received enough
966                 let res = server_in.recvfrom(buf);
967                 assert!(res.is_ok());
968                 let (nread, src) = res.unwrap();
969                 assert_eq!(nread, 1);
970                 assert_eq!(src, client_out_addr);
971             }
972             assert!(total_bytes_sent >= MAX);
973         });
974
975         let l = local_loop();
976         let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
977         let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
978         let (port, chan) = (p2, c1);
979         port.recv();
980         chan.send(());
981         let mut total_bytes_recv = 0;
982         let mut buf = [0, .. 2048];
983         while total_bytes_recv < MAX {
984             // ask for more
985             assert!(client_out.sendto([1], server_in_addr).is_ok());
986             // wait for data
987             let res = client_in.recvfrom(buf);
988             assert!(res.is_ok());
989             let (nread, src) = res.unwrap();
990             assert_eq!(src, server_out_addr);
991             total_bytes_recv += nread;
992             for i in range(0u, nread) {
993                 assert_eq!(buf[i], 1);
994             }
995         }
996         // tell the server we're done
997         assert!(client_out.sendto([0], server_in_addr).is_ok());
998     }
999
1000     #[test]
1001     fn test_read_and_block() {
1002         let addr = next_test_ip4();
1003         let (port, chan) = Chan::<Port<()>>::new();
1004
1005         spawn(proc() {
1006             let port2 = port.recv();
1007             let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
1008             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1009             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1010             port2.recv();
1011             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1012             stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1013             port2.recv();
1014         });
1015
1016         let listener = TcpListener::bind(local_loop(), addr).unwrap();
1017         let mut acceptor = listener.listen().unwrap();
1018         let (port2, chan2) = Chan::new();
1019         chan.send(port2);
1020         let mut stream = acceptor.accept().unwrap();
1021         let mut buf = [0, .. 2048];
1022
1023         let expected = 32;
1024         let mut current = 0;
1025         let mut reads = 0;
1026
1027         while current < expected {
1028             let nread = stream.read(buf).unwrap();
1029             for i in range(0u, nread) {
1030                 let val = buf[i] as uint;
1031                 assert_eq!(val, current % 8);
1032                 current += 1;
1033             }
1034             reads += 1;
1035
1036             chan2.try_send(());
1037         }
1038
1039         // Make sure we had multiple reads
1040         assert!(reads > 1);
1041     }
1042
1043     #[test]
1044     fn test_simple_tcp_server_and_client_on_diff_threads() {
1045         let addr = next_test_ip4();
1046
1047         spawn(proc() {
1048             let listener = TcpListener::bind(local_loop(), addr).unwrap();
1049             let mut acceptor = listener.listen().unwrap();
1050             let mut stream = acceptor.accept().unwrap();
1051             let mut buf = [0, .. 2048];
1052             let nread = stream.read(buf).unwrap();
1053             assert_eq!(nread, 8);
1054             for i in range(0u, nread) {
1055                 assert_eq!(buf[i], i as u8);
1056             }
1057         });
1058
1059         let mut stream = TcpWatcher::connect(local_loop(), addr);
1060         while stream.is_err() {
1061             stream = TcpWatcher::connect(local_loop(), addr);
1062         }
1063         stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1064     }
1065
1066     #[should_fail] #[test]
1067     fn tcp_listener_fail_cleanup() {
1068         let addr = next_test_ip4();
1069         let w = TcpListener::bind(local_loop(), addr).unwrap();
1070         let _w = w.listen().unwrap();
1071         fail!();
1072     }
1073
1074     #[should_fail] #[test]
1075     fn tcp_stream_fail_cleanup() {
1076         let (port, chan) = Chan::new();
1077         let addr = next_test_ip4();
1078
1079         spawn(proc() {
1080             let w = TcpListener::bind(local_loop(), addr).unwrap();
1081             let mut w = w.listen().unwrap();
1082             chan.send(());
1083             drop(w.accept().unwrap());
1084         });
1085         port.recv();
1086         let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
1087         fail!();
1088     }
1089
1090     #[should_fail] #[test]
1091     fn udp_listener_fail_cleanup() {
1092         let addr = next_test_ip4();
1093         let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1094         fail!();
1095     }
1096
1097     #[should_fail] #[test]
1098     fn udp_fail_other_task() {
1099         let addr = next_test_ip4();
1100         let (port, chan) = Chan::new();
1101
1102         // force the handle to be created on a different scheduler, failure in
1103         // the original task will force a homing operation back to this
1104         // scheduler.
1105         spawn(proc() {
1106             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1107             chan.send(w);
1108         });
1109
1110         let _w = port.recv();
1111         fail!();
1112     }
1113 }