]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Migrate uv net bindings away from ~fn()
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
13 use std::ptr;
14 use std::rt::BlockedTask;
15 use std::rt::io::IoError;
16 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr};
17 use std::rt::local::Local;
18 use std::rt::io::net::ip::{SocketAddr, IpAddr};
19 use std::rt::rtio;
20 use std::rt::sched::{Scheduler, SchedHandle};
21 use std::rt::tube::Tube;
22 use std::str;
23 use std::vec;
24
25 use uvll;
26 use uvll::*;
27 use super::{
28             Loop, Request, UvError, Buf, NativeHandle,
29             status_to_io_result,
30             uv_error_to_io_error, UvHandle, slice_to_uv_buf};
31 use uvio::HomingIO;
32 use stream::StreamWatcher;
33
34 ////////////////////////////////////////////////////////////////////////////////
35 /// Generic functions related to dealing with sockaddr things
36 ////////////////////////////////////////////////////////////////////////////////
37
38 pub enum UvSocketAddr {
39     UvIpv4SocketAddr(*sockaddr_in),
40     UvIpv6SocketAddr(*sockaddr_in6),
41 }
42
43 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
44     unsafe {
45         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
46         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
47         match addr {
48             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
49             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
50             _ => fail!(),
51         }
52     }
53 }
54
55 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
56     let malloc = match addr.ip {
57         Ipv4Addr(*) => malloc_ip4_addr,
58         Ipv6Addr(*) => malloc_ip6_addr,
59     };
60     let wrap = match addr.ip {
61         Ipv4Addr(*) => UvIpv4SocketAddr,
62         Ipv6Addr(*) => UvIpv6SocketAddr,
63     };
64     let free = match addr.ip {
65         Ipv4Addr(*) => free_ip4_addr,
66         Ipv6Addr(*) => free_ip6_addr,
67     };
68
69     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
70     do (|| {
71         f(wrap(addr))
72     }).finally {
73         unsafe { free(addr) };
74     }
75 }
76
77 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
78     let ip_size = match addr {
79         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
80         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
81     };
82     let ip_name = {
83         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
84         unsafe {
85             let buf_ptr = vec::raw::to_ptr(buf);
86             match addr {
87                 UvIpv4SocketAddr(addr) =>
88                     uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t),
89                 UvIpv6SocketAddr(addr) =>
90                     uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t),
91             }
92         };
93         buf
94     };
95     let ip_port = unsafe {
96         let port = match addr {
97             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
98             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
99         };
100         port as u16
101     };
102     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
103     let ip_addr = FromStr::from_str(ip_str).unwrap();
104
105     // finally run the closure
106     f(SocketAddr { ip: ip_addr, port: ip_port })
107 }
108
109 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
110     use std::util;
111     uv_socket_addr_as_socket_addr(addr, util::id)
112 }
113
114 #[cfg(test)]
115 #[test]
116 fn test_ip4_conversion() {
117     use std::rt;
118     let ip4 = rt::test::next_test_ip4();
119     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
120 }
121
122 #[cfg(test)]
123 #[test]
124 fn test_ip6_conversion() {
125     use std::rt;
126     let ip6 = rt::test::next_test_ip6();
127     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
128 }
129
130 enum SocketNameKind {
131     TcpPeer,
132     Tcp,
133     Udp
134 }
135
136 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
137     let getsockname = match sk {
138         TcpPeer => uvll::tcp_getpeername,
139         Tcp     => uvll::tcp_getsockname,
140         Udp     => uvll::udp_getsockname,
141     };
142
143     // Allocate a sockaddr_storage
144     // since we don't know if it's ipv4 or ipv6
145     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
146
147     let r = unsafe {
148         getsockname(handle, r_addr as *uvll::sockaddr_storage)
149     };
150
151     if r != 0 {
152         return Err(uv_error_to_io_error(UvError(r)));
153     }
154
155     let addr = unsafe {
156         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
157             uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
158         } else {
159             uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
160         }
161     };
162
163     unsafe { uvll::free_sockaddr_storage(r_addr); }
164
165     Ok(addr)
166
167 }
168
169 ////////////////////////////////////////////////////////////////////////////////
170 /// TCP implementation
171 ////////////////////////////////////////////////////////////////////////////////
172
173 pub struct TcpWatcher {
174     handle: *uvll::uv_tcp_t,
175     stream: StreamWatcher,
176     home: SchedHandle,
177 }
178
179 pub struct TcpListener {
180     home: SchedHandle,
181     handle: *uvll::uv_pipe_t,
182     priv closing_task: Option<BlockedTask>,
183     priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
184 }
185
186 pub struct TcpAcceptor {
187     listener: ~TcpListener,
188     priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
189 }
190
191 // TCP watchers (clients/streams)
192
193 impl TcpWatcher {
194     pub fn new(loop_: &Loop) -> TcpWatcher {
195         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
196         assert_eq!(unsafe {
197             uvll::uv_tcp_init(loop_.native_handle(), handle)
198         }, 0);
199         TcpWatcher {
200             home: get_handle_to_current_scheduler!(),
201             handle: handle,
202             stream: StreamWatcher::new(handle),
203         }
204     }
205
206     pub fn connect(loop_: &mut Loop, address: SocketAddr)
207         -> Result<TcpWatcher, UvError>
208     {
209         struct Ctx { status: c_int, task: Option<BlockedTask> }
210
211         let tcp = TcpWatcher::new(loop_);
212         let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
213             let req = Request::new(uvll::UV_CONNECT);
214             let result = match addr {
215                 UvIpv4SocketAddr(addr) => unsafe {
216                     uvll::tcp_connect(req.handle, tcp.handle, addr,
217                                       connect_cb)
218                 },
219                 UvIpv6SocketAddr(addr) => unsafe {
220                     uvll::tcp_connect6(req.handle, tcp.handle, addr,
221                                        connect_cb)
222                 },
223             };
224             match result {
225                 0 => {
226                     req.defuse();
227                     let mut cx = Ctx { status: 0, task: None };
228                     let scheduler: ~Scheduler = Local::take();
229                     do scheduler.deschedule_running_task_and_then |_, task| {
230                         cx.task = Some(task);
231                     }
232                     match cx.status {
233                         0 => Ok(()),
234                         n => Err(UvError(n)),
235                     }
236                 }
237                 n => Err(UvError(n))
238             }
239         };
240
241         return match ret {
242             Ok(()) => Ok(tcp),
243             Err(e) => Err(e),
244         };
245
246         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
247             let _req = Request::wrap(req);
248             if status == uvll::ECANCELED { return }
249             let cx: &mut Ctx = unsafe {
250                 cast::transmute(uvll::get_data_for_req(req))
251             };
252             cx.status = status;
253             let scheduler: ~Scheduler = Local::take();
254             scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
255         }
256     }
257 }
258
259 impl HomingIO for TcpWatcher {
260     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
261 }
262
263 impl rtio::RtioSocket for TcpWatcher {
264     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
265         let _m = self.fire_missiles();
266         socket_name(Tcp, self.handle)
267     }
268 }
269
270 impl rtio::RtioTcpStream for TcpWatcher {
271     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
272         let _m = self.fire_missiles();
273         self.stream.read(buf).map_err(uv_error_to_io_error)
274     }
275
276     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
277         let _m = self.fire_missiles();
278         self.stream.write(buf).map_err(uv_error_to_io_error)
279     }
280
281     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
282         let _m = self.fire_missiles();
283         socket_name(TcpPeer, self.handle)
284     }
285
286     fn control_congestion(&mut self) -> Result<(), IoError> {
287         let _m = self.fire_missiles();
288         status_to_io_result(unsafe {
289             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
290         })
291     }
292
293     fn nodelay(&mut self) -> Result<(), IoError> {
294         let _m = self.fire_missiles();
295         status_to_io_result(unsafe {
296             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
297         })
298     }
299
300     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
301         let _m = self.fire_missiles();
302         status_to_io_result(unsafe {
303             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
304                                    delay_in_seconds as c_uint)
305         })
306     }
307
308     fn letdie(&mut self) -> Result<(), IoError> {
309         let _m = self.fire_missiles();
310         status_to_io_result(unsafe {
311             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
312         })
313     }
314 }
315
316 impl Drop for TcpWatcher {
317     fn drop(&mut self) {
318         let _m = self.fire_missiles();
319         self.stream.close(true);
320     }
321 }
322
323 // TCP listeners (unbound servers)
324
325 impl TcpListener {
326     pub fn bind(loop_: &mut Loop, address: SocketAddr)
327         -> Result<~TcpListener, UvError>
328     {
329         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
330         assert_eq!(unsafe {
331             uvll::uv_tcp_init(loop_.native_handle(), handle)
332         }, 0);
333         let l = ~TcpListener {
334             home: get_handle_to_current_scheduler!(),
335             handle: handle,
336             closing_task: None,
337             outgoing: Tube::new(),
338         };
339         let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
340             match addr {
341                 UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
342                 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
343             }
344         });
345         match res {
346             0 => Ok(l.install()),
347             n => Err(UvError(n))
348         }
349     }
350 }
351
352 impl HomingIO for TcpListener {
353     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
354 }
355
356 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
357     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
358 }
359
360 impl rtio::RtioSocket for TcpListener {
361     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
362         let _m = self.fire_missiles();
363         socket_name(Tcp, self.handle)
364     }
365 }
366
367 impl rtio::RtioTcpListener for TcpListener {
368     fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
369         // create the acceptor object from ourselves
370         let incoming = self.outgoing.clone();
371         let mut acceptor = ~TcpAcceptor {
372             listener: self,
373             incoming: incoming,
374         };
375
376         let _m = acceptor.fire_missiles();
377         // XXX: the 128 backlog should be configurable
378         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
379             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
380             n => Err(uv_error_to_io_error(UvError(n))),
381         }
382     }
383 }
384
385 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
386     let msg = match status {
387         0 => {
388             let loop_ = NativeHandle::from_native_handle(unsafe {
389                 uvll::get_loop_for_uv_handle(server)
390             });
391             let client = TcpWatcher::new(&loop_);
392             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
393             Ok(~client as ~rtio::RtioTcpStream)
394         }
395         uvll::ECANCELED => return,
396         n => Err(uv_error_to_io_error(UvError(n)))
397     };
398
399     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
400     tcp.outgoing.send(msg);
401 }
402
403 impl Drop for TcpListener {
404     fn drop(&mut self) {
405         let (_m, sched) = self.fire_missiles_sched();
406
407         do sched.deschedule_running_task_and_then |_, task| {
408             self.closing_task = Some(task);
409             unsafe { uvll::uv_close(self.handle, listener_close_cb) }
410         }
411     }
412 }
413
414 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
415     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
416     unsafe { uvll::free_handle(handle) }
417
418     let sched: ~Scheduler = Local::take();
419     sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
420 }
421
422 // TCP acceptors (bound servers)
423
424 impl HomingIO for TcpAcceptor {
425     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
426 }
427
428 impl rtio::RtioSocket for TcpAcceptor {
429     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
430         let _m = self.fire_missiles();
431         socket_name(Tcp, self.listener.handle)
432     }
433 }
434
435 impl rtio::RtioTcpAcceptor for TcpAcceptor {
436     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
437         let _m = self.fire_missiles();
438         self.incoming.recv()
439     }
440
441     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
442         let _m = self.fire_missiles();
443         status_to_io_result(unsafe {
444             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
445         })
446     }
447
448     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
449         let _m = self.fire_missiles();
450         status_to_io_result(unsafe {
451             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
452         })
453     }
454 }
455
456 ////////////////////////////////////////////////////////////////////////////////
457 /// UDP implementation
458 ////////////////////////////////////////////////////////////////////////////////
459
460 pub struct UdpWatcher {
461     handle: *uvll::uv_udp_t,
462     home: SchedHandle,
463 }
464
465 impl UdpWatcher {
466     pub fn bind(loop_: &Loop, address: SocketAddr)
467         -> Result<UdpWatcher, UvError>
468     {
469         let udp = UdpWatcher {
470             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
471             home: get_handle_to_current_scheduler!(),
472         };
473         assert_eq!(unsafe {
474             uvll::uv_udp_init(loop_.native_handle(), udp.handle)
475         }, 0);
476         let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
477             match addr {
478                 UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
479                 UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
480             }
481         });
482         match result {
483             0 => Ok(udp),
484             n => Err(UvError(n)),
485         }
486     }
487 }
488
489 impl HomingIO for UdpWatcher {
490     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
491 }
492
493 impl rtio::RtioSocket for UdpWatcher {
494     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
495         let _m = self.fire_missiles();
496         socket_name(Udp, self.handle)
497     }
498 }
499
500 impl rtio::RtioUdpSocket for UdpWatcher {
501     fn recvfrom(&mut self, buf: &mut [u8])
502         -> Result<(uint, SocketAddr), IoError>
503     {
504         struct Ctx {
505             task: Option<BlockedTask>,
506             buf: Option<Buf>,
507             result: Option<(ssize_t, SocketAddr)>,
508         }
509         let _m = self.fire_missiles();
510
511         return match unsafe {
512             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
513         } {
514             0 => {
515                 let mut cx = Ctx {
516                     task: None,
517                     buf: Some(slice_to_uv_buf(buf)),
518                     result: None,
519                 };
520                 unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
521                 let scheduler: ~Scheduler = Local::take();
522                 do scheduler.deschedule_running_task_and_then |_, task| {
523                     cx.task = Some(task);
524                 }
525                 match cx.result.take_unwrap() {
526                     (n, _) if n < 0 =>
527                         Err(uv_error_to_io_error(UvError(n as c_int))),
528                     (n, addr) => Ok((n as uint, addr))
529                 }
530             }
531             n => Err(uv_error_to_io_error(UvError(n)))
532         };
533
534         extern fn alloc_cb(handle: *uvll::uv_udp_t,
535                            _suggested_size: size_t) -> Buf {
536             let cx: &mut Ctx = unsafe {
537                 cast::transmute(uvll::get_data_for_uv_handle(handle))
538             };
539             cx.buf.take().expect("alloc_cb called more than once")
540         }
541
542         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
543                           addr: *uvll::sockaddr, _flags: c_uint) {
544
545             // When there's no data to read the recv callback can be a no-op.
546             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
547             // this we just drop back to kqueue and wait for the next callback.
548             if nread == 0 { return }
549             if nread == uvll::ECANCELED as ssize_t { return }
550
551             unsafe {
552                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
553             }
554
555             let cx: &mut Ctx = unsafe {
556                 cast::transmute(uvll::get_data_for_uv_handle(handle))
557             };
558             let addr = sockaddr_to_UvSocketAddr(addr);
559             let addr = uv_socket_addr_to_socket_addr(addr);
560             cx.result = Some((nread, addr));
561
562             let sched: ~Scheduler = Local::take();
563             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
564         }
565     }
566
567     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
568         struct Ctx { task: Option<BlockedTask>, result: c_int }
569
570         let _m = self.fire_missiles();
571
572         let req = Request::new(uvll::UV_UDP_SEND);
573         let buf = slice_to_uv_buf(buf);
574         let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
575             match dst {
576                 UvIpv4SocketAddr(dst) =>
577                     uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
578                 UvIpv6SocketAddr(dst) =>
579                     uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
580             }
581         });
582
583         return match result {
584             0 => {
585                 let mut cx = Ctx { task: None, result: 0 };
586                 req.set_data(&cx);
587                 req.defuse();
588
589                 let sched: ~Scheduler = Local::take();
590                 do sched.deschedule_running_task_and_then |_, task| {
591                     cx.task = Some(task);
592                 }
593
594                 match cx.result {
595                     0 => Ok(()),
596                     n => Err(uv_error_to_io_error(UvError(n)))
597                 }
598             }
599             n => Err(uv_error_to_io_error(UvError(n)))
600         };
601
602         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
603             let req = Request::wrap(req);
604             let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
605             cx.result = status;
606
607             let sched: ~Scheduler = Local::take();
608             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
609         }
610     }
611
612     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
613         let _m = self.fire_missiles();
614         status_to_io_result(unsafe {
615             do multi.to_str().with_c_str |m_addr| {
616                 uvll::uv_udp_set_membership(self.handle,
617                                             m_addr, ptr::null(),
618                                             uvll::UV_JOIN_GROUP)
619             }
620         })
621     }
622
623     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
624         let _m = self.fire_missiles();
625         status_to_io_result(unsafe {
626             do multi.to_str().with_c_str |m_addr| {
627                 uvll::uv_udp_set_membership(self.handle,
628                                             m_addr, ptr::null(),
629                                             uvll::UV_LEAVE_GROUP)
630             }
631         })
632     }
633
634     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
635         let _m = self.fire_missiles();
636         status_to_io_result(unsafe {
637             uvll::uv_udp_set_multicast_loop(self.handle,
638                                             1 as c_int)
639         })
640     }
641
642     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
643         let _m = self.fire_missiles();
644         status_to_io_result(unsafe {
645             uvll::uv_udp_set_multicast_loop(self.handle,
646                                             0 as c_int)
647         })
648     }
649
650     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
651         let _m = self.fire_missiles();
652         status_to_io_result(unsafe {
653             uvll::uv_udp_set_multicast_ttl(self.handle,
654                                            ttl as c_int)
655         })
656     }
657
658     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
659         let _m = self.fire_missiles();
660         status_to_io_result(unsafe {
661             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
662         })
663     }
664
665     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
666         let _m = self.fire_missiles();
667         status_to_io_result(unsafe {
668             uvll::uv_udp_set_broadcast(self.handle,
669                                        1 as c_int)
670         })
671     }
672
673     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
674         let _m = self.fire_missiles();
675         status_to_io_result(unsafe {
676             uvll::uv_udp_set_broadcast(self.handle,
677                                        0 as c_int)
678         })
679     }
680 }
681
682 impl Drop for UdpWatcher {
683     fn drop(&mut self) {
684         // Send ourselves home to close this handle (blocking while doing so).
685         let (_m, sched) = self.fire_missiles_sched();
686         let mut slot = None;
687         unsafe {
688             uvll::set_data_for_uv_handle(self.handle, &slot);
689             uvll::uv_close(self.handle, close_cb);
690         }
691         do sched.deschedule_running_task_and_then |_, task| {
692             slot = Some(task);
693         }
694
695         extern fn close_cb(handle: *uvll::uv_handle_t) {
696             let slot: &mut Option<BlockedTask> = unsafe {
697                 cast::transmute(uvll::get_data_for_uv_handle(handle))
698             };
699             let sched: ~Scheduler = Local::take();
700             sched.resume_blocked_task_immediately(slot.take_unwrap());
701         }
702     }
703 }
704
705 ////////////////////////////////////////////////////////////////////////////////
706 /// UV request support
707 ////////////////////////////////////////////////////////////////////////////////
708
709 #[cfg(test)]
710 mod test {
711     use super::*;
712     use std::util::ignore;
713     use std::cell::Cell;
714     use std::vec;
715     use std::unstable::run_in_bare_thread;
716     use std::rt::thread::Thread;
717     use std::rt::test::*;
718     use super::super::{Loop, AllocCallback};
719     use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
720
721     #[test]
722     fn connect_close_ip4() {
723         do run_in_bare_thread() {
724             let mut loop_ = Loop::new();
725             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
726             // Connect to a port where nobody is listening
727             let addr = next_test_ip4();
728             do tcp_watcher.connect(addr) |stream_watcher, status| {
729                 uvdebug!("tcp_watcher.connect!");
730                 assert!(status.is_some());
731                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
732                 stream_watcher.close(||());
733             }
734             loop_.run();
735             loop_.close();
736         }
737     }
738
739     #[test]
740     fn connect_close_ip6() {
741         do run_in_bare_thread() {
742             let mut loop_ = Loop::new();
743             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
744             // Connect to a port where nobody is listening
745             let addr = next_test_ip6();
746             do tcp_watcher.connect(addr) |stream_watcher, status| {
747                 uvdebug!("tcp_watcher.connect!");
748                 assert!(status.is_some());
749                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
750                 stream_watcher.close(||());
751             }
752             loop_.run();
753             loop_.close();
754         }
755     }
756
757     #[test]
758     fn udp_bind_close_ip4() {
759         do run_in_bare_thread() {
760             let mut loop_ = Loop::new();
761             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
762             let addr = next_test_ip4();
763             udp_watcher.bind(addr);
764             udp_watcher.close(||());
765             loop_.run();
766             loop_.close();
767         }
768     }
769
770     #[test]
771     fn udp_bind_close_ip6() {
772         do run_in_bare_thread() {
773             let mut loop_ = Loop::new();
774             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
775             let addr = next_test_ip6();
776             udp_watcher.bind(addr);
777             udp_watcher.close(||());
778             loop_.run();
779             loop_.close();
780         }
781     }
782
783     #[test]
784     fn listen_ip4() {
785         do run_in_bare_thread() {
786             static MAX: int = 10;
787             let mut loop_ = Loop::new();
788             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
789             let addr = next_test_ip4();
790             server_tcp_watcher.bind(addr);
791             let loop_ = loop_;
792             uvdebug!("listening");
793             let mut stream = server_tcp_watcher.as_stream();
794             let res = do stream.listen |mut server_stream_watcher, status| {
795                 uvdebug!("listened!");
796                 assert!(status.is_none());
797                 let mut loop_ = loop_;
798                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
799                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
800                 server_stream_watcher.accept(client_tcp_watcher);
801                 let count_cell = Cell::new(0);
802                 let server_stream_watcher = server_stream_watcher;
803                 uvdebug!("starting read");
804                 let alloc: AllocCallback = |size| {
805                     vec_to_uv_buf(vec::from_elem(size, 0u8))
806                 };
807                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
808
809                     uvdebug!("i'm reading!");
810                     let buf = vec_from_uv_buf(buf);
811                     let mut count = count_cell.take();
812                     if status.is_none() {
813                         uvdebug!("got {} bytes", nread);
814                         let buf = buf.unwrap();
815                         for byte in buf.slice(0, nread as uint).iter() {
816                             assert!(*byte == count as u8);
817                             uvdebug!("{}", *byte as uint);
818                             count += 1;
819                         }
820                     } else {
821                         assert_eq!(count, MAX);
822                         do stream_watcher.close {
823                             server_stream_watcher.close(||());
824                         }
825                     }
826                     count_cell.put_back(count);
827                 }
828             };
829
830             assert!(res.is_ok());
831
832             let client_thread = do Thread::start {
833                 uvdebug!("starting client thread");
834                 let mut loop_ = Loop::new();
835                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
836                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
837                     uvdebug!("connecting");
838                     assert!(status.is_none());
839                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
840                     let buf = slice_to_uv_buf(msg);
841                     let msg_cell = Cell::new(msg);
842                     do stream_watcher.write(buf) |stream_watcher, status| {
843                         uvdebug!("writing");
844                         assert!(status.is_none());
845                         let msg_cell = Cell::new(msg_cell.take());
846                         stream_watcher.close(||ignore(msg_cell.take()));
847                     }
848                 }
849                 loop_.run();
850                 loop_.close();
851             };
852
853             let mut loop_ = loop_;
854             loop_.run();
855             loop_.close();
856             client_thread.join();
857         };
858     }
859
860     #[test]
861     fn listen_ip6() {
862         do run_in_bare_thread() {
863             static MAX: int = 10;
864             let mut loop_ = Loop::new();
865             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
866             let addr = next_test_ip6();
867             server_tcp_watcher.bind(addr);
868             let loop_ = loop_;
869             uvdebug!("listening");
870             let mut stream = server_tcp_watcher.as_stream();
871             let res = do stream.listen |mut server_stream_watcher, status| {
872                 uvdebug!("listened!");
873                 assert!(status.is_none());
874                 let mut loop_ = loop_;
875                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
876                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
877                 server_stream_watcher.accept(client_tcp_watcher);
878                 let count_cell = Cell::new(0);
879                 let server_stream_watcher = server_stream_watcher;
880                 uvdebug!("starting read");
881                 let alloc: AllocCallback = |size| {
882                     vec_to_uv_buf(vec::from_elem(size, 0u8))
883                 };
884                 do client_tcp_watcher.read_start(alloc)
885                     |stream_watcher, nread, buf, status| {
886
887                     uvdebug!("i'm reading!");
888                     let buf = vec_from_uv_buf(buf);
889                     let mut count = count_cell.take();
890                     if status.is_none() {
891                         uvdebug!("got {} bytes", nread);
892                         let buf = buf.unwrap();
893                         let r = buf.slice(0, nread as uint);
894                         for byte in r.iter() {
895                             assert!(*byte == count as u8);
896                             uvdebug!("{}", *byte as uint);
897                             count += 1;
898                         }
899                     } else {
900                         assert_eq!(count, MAX);
901                         do stream_watcher.close {
902                             server_stream_watcher.close(||());
903                         }
904                     }
905                     count_cell.put_back(count);
906                 }
907             };
908             assert!(res.is_ok());
909
910             let client_thread = do Thread::start {
911                 uvdebug!("starting client thread");
912                 let mut loop_ = Loop::new();
913                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
914                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
915                     uvdebug!("connecting");
916                     assert!(status.is_none());
917                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
918                     let buf = slice_to_uv_buf(msg);
919                     let msg_cell = Cell::new(msg);
920                     do stream_watcher.write(buf) |stream_watcher, status| {
921                         uvdebug!("writing");
922                         assert!(status.is_none());
923                         let msg_cell = Cell::new(msg_cell.take());
924                         stream_watcher.close(||ignore(msg_cell.take()));
925                     }
926                 }
927                 loop_.run();
928                 loop_.close();
929             };
930
931             let mut loop_ = loop_;
932             loop_.run();
933             loop_.close();
934             client_thread.join();
935         }
936     }
937
938     #[test]
939     fn udp_recv_ip4() {
940         do run_in_bare_thread() {
941             static MAX: int = 10;
942             let mut loop_ = Loop::new();
943             let server_addr = next_test_ip4();
944             let client_addr = next_test_ip4();
945
946             let mut server = UdpWatcher::new(&loop_);
947             assert!(server.bind(server_addr).is_ok());
948
949             uvdebug!("starting read");
950             let alloc: AllocCallback = |size| {
951                 vec_to_uv_buf(vec::from_elem(size, 0u8))
952             };
953
954             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
955                 server.recv_stop();
956                 uvdebug!("i'm reading!");
957                 assert!(status.is_none());
958                 assert_eq!(flags, 0);
959                 assert_eq!(src, client_addr);
960
961                 let buf = vec_from_uv_buf(buf);
962                 let mut count = 0;
963                 uvdebug!("got {} bytes", nread);
964
965                 let buf = buf.unwrap();
966                 for &byte in buf.slice(0, nread as uint).iter() {
967                     assert!(byte == count as u8);
968                     uvdebug!("{}", byte as uint);
969                     count += 1;
970                 }
971                 assert_eq!(count, MAX);
972
973                 server.close(||{});
974             }
975
976             let thread = do Thread::start {
977                 let mut loop_ = Loop::new();
978                 let mut client = UdpWatcher::new(&loop_);
979                 assert!(client.bind(client_addr).is_ok());
980                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
981                 let buf = slice_to_uv_buf(msg);
982                 do client.send(buf, server_addr) |client, status| {
983                     uvdebug!("writing");
984                     assert!(status.is_none());
985                     client.close(||{});
986                 }
987
988                 loop_.run();
989                 loop_.close();
990             };
991
992             loop_.run();
993             loop_.close();
994             thread.join();
995         }
996     }
997
998     #[test]
999     fn udp_recv_ip6() {
1000         do run_in_bare_thread() {
1001             static MAX: int = 10;
1002             let mut loop_ = Loop::new();
1003             let server_addr = next_test_ip6();
1004             let client_addr = next_test_ip6();
1005
1006             let mut server = UdpWatcher::new(&loop_);
1007             assert!(server.bind(server_addr).is_ok());
1008
1009             uvdebug!("starting read");
1010             let alloc: AllocCallback = |size| {
1011                 vec_to_uv_buf(vec::from_elem(size, 0u8))
1012             };
1013
1014             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
1015                 server.recv_stop();
1016                 uvdebug!("i'm reading!");
1017                 assert!(status.is_none());
1018                 assert_eq!(flags, 0);
1019                 assert_eq!(src, client_addr);
1020
1021                 let buf = vec_from_uv_buf(buf);
1022                 let mut count = 0;
1023                 uvdebug!("got {} bytes", nread);
1024
1025                 let buf = buf.unwrap();
1026                 for &byte in buf.slice(0, nread as uint).iter() {
1027                     assert!(byte == count as u8);
1028                     uvdebug!("{}", byte as uint);
1029                     count += 1;
1030                 }
1031                 assert_eq!(count, MAX);
1032
1033                 server.close(||{});
1034             }
1035
1036             let thread = do Thread::start {
1037                 let mut loop_ = Loop::new();
1038                 let mut client = UdpWatcher::new(&loop_);
1039                 assert!(client.bind(client_addr).is_ok());
1040                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1041                 let buf = slice_to_uv_buf(msg);
1042                 do client.send(buf, server_addr) |client, status| {
1043                     uvdebug!("writing");
1044                     assert!(status.is_none());
1045                     client.close(||{});
1046                 }
1047
1048                 loop_.run();
1049                 loop_.close();
1050             };
1051
1052             loop_.run();
1053             loop_.close();
1054             thread.join();
1055         }
1056     }
1057 }