]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
Update all uv tests to pass again
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
13 use std::ptr;
14 use std::rt::BlockedTask;
15 use std::rt::io::IoError;
16 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr, SocketAddr, IpAddr};
17 use std::rt::local::Local;
18 use std::rt::rtio;
19 use std::rt::sched::{Scheduler, SchedHandle};
20 use std::rt::tube::Tube;
21 use std::str;
22 use std::vec;
23
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26             uv_error_to_io_error, UvHandle, slice_to_uv_buf};
27 use uvio::HomingIO;
28 use uvll;
29
30 ////////////////////////////////////////////////////////////////////////////////
31 /// Generic functions related to dealing with sockaddr things
32 ////////////////////////////////////////////////////////////////////////////////
33
34 pub enum UvSocketAddr {
35     UvIpv4SocketAddr(*uvll::sockaddr_in),
36     UvIpv6SocketAddr(*uvll::sockaddr_in6),
37 }
38
39 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
40     unsafe {
41         assert!((uvll::is_ip4_addr(addr) || uvll::is_ip6_addr(addr)));
42         assert!(!(uvll::is_ip4_addr(addr) && uvll::is_ip6_addr(addr)));
43         match addr {
44             _ if uvll::is_ip4_addr(addr) =>
45                 UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
46             _ if uvll::is_ip6_addr(addr) =>
47                 UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
48             _ => fail!(),
49         }
50     }
51 }
52
53 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
54     let malloc = match addr.ip {
55         Ipv4Addr(*) => uvll::malloc_ip4_addr,
56         Ipv6Addr(*) => uvll::malloc_ip6_addr,
57     };
58     let wrap = match addr.ip {
59         Ipv4Addr(*) => UvIpv4SocketAddr,
60         Ipv6Addr(*) => UvIpv6SocketAddr,
61     };
62     let free = match addr.ip {
63         Ipv4Addr(*) => uvll::free_ip4_addr,
64         Ipv6Addr(*) => uvll::free_ip6_addr,
65     };
66
67     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
68     do (|| {
69         f(wrap(addr))
70     }).finally {
71         unsafe { free(addr) };
72     }
73 }
74
75 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
76     let ip_size = match addr {
77         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
78         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
79     };
80     let ip_name = {
81         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
82         unsafe {
83             let buf_ptr = vec::raw::to_ptr(buf);
84             match addr {
85                 UvIpv4SocketAddr(addr) =>
86                     uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t),
87                 UvIpv6SocketAddr(addr) =>
88                     uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t),
89             }
90         };
91         buf
92     };
93     let ip_port = unsafe {
94         let port = match addr {
95             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
96             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
97         };
98         port as u16
99     };
100     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
101     let ip_addr = FromStr::from_str(ip_str).unwrap();
102
103     // finally run the closure
104     f(SocketAddr { ip: ip_addr, port: ip_port })
105 }
106
107 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
108     use std::util;
109     uv_socket_addr_as_socket_addr(addr, util::id)
110 }
111
112 #[cfg(test)]
113 #[test]
114 fn test_ip4_conversion() {
115     use std::rt;
116     let ip4 = rt::test::next_test_ip4();
117     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
118 }
119
120 #[cfg(test)]
121 #[test]
122 fn test_ip6_conversion() {
123     use std::rt;
124     let ip6 = rt::test::next_test_ip6();
125     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
126 }
127
128 enum SocketNameKind {
129     TcpPeer,
130     Tcp,
131     Udp
132 }
133
134 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
135     let getsockname = match sk {
136         TcpPeer => uvll::tcp_getpeername,
137         Tcp     => uvll::tcp_getsockname,
138         Udp     => uvll::udp_getsockname,
139     };
140
141     // Allocate a sockaddr_storage
142     // since we don't know if it's ipv4 or ipv6
143     let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
144
145     let r = unsafe {
146         getsockname(handle, r_addr as *uvll::sockaddr_storage)
147     };
148
149     if r != 0 {
150         return Err(uv_error_to_io_error(UvError(r)));
151     }
152
153     let addr = unsafe {
154         if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
155             uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
156         } else {
157             uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
158         }
159     };
160
161     unsafe { uvll::free_sockaddr_storage(r_addr); }
162
163     Ok(addr)
164
165 }
166
167 ////////////////////////////////////////////////////////////////////////////////
168 /// TCP implementation
169 ////////////////////////////////////////////////////////////////////////////////
170
171 pub struct TcpWatcher {
172     handle: *uvll::uv_tcp_t,
173     stream: StreamWatcher,
174     home: SchedHandle,
175 }
176
177 pub struct TcpListener {
178     home: SchedHandle,
179     handle: *uvll::uv_pipe_t,
180     priv closing_task: Option<BlockedTask>,
181     priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
182 }
183
184 pub struct TcpAcceptor {
185     listener: ~TcpListener,
186     priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
187 }
188
189 // TCP watchers (clients/streams)
190
191 impl TcpWatcher {
192     pub fn new(loop_: &Loop) -> TcpWatcher {
193         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
194         assert_eq!(unsafe {
195             uvll::uv_tcp_init(loop_.handle, handle)
196         }, 0);
197         TcpWatcher {
198             home: get_handle_to_current_scheduler!(),
199             handle: handle,
200             stream: StreamWatcher::new(handle),
201         }
202     }
203
204     pub fn connect(loop_: &mut Loop, address: SocketAddr)
205         -> Result<TcpWatcher, UvError>
206     {
207         struct Ctx { status: c_int, task: Option<BlockedTask> }
208
209         let tcp = TcpWatcher::new(loop_);
210         let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
211             let req = Request::new(uvll::UV_CONNECT);
212             let result = match addr {
213                 UvIpv4SocketAddr(addr) => unsafe {
214                     uvll::tcp_connect(req.handle, tcp.handle, addr,
215                                       connect_cb)
216                 },
217                 UvIpv6SocketAddr(addr) => unsafe {
218                     uvll::tcp_connect6(req.handle, tcp.handle, addr,
219                                        connect_cb)
220                 },
221             };
222             match result {
223                 0 => {
224                     let mut cx = Ctx { status: 0, task: None };
225                     req.set_data(&cx);
226                     req.defuse();
227                     let scheduler: ~Scheduler = Local::take();
228                     do scheduler.deschedule_running_task_and_then |_, task| {
229                         cx.task = Some(task);
230                     }
231                     match cx.status {
232                         0 => Ok(()),
233                         n => Err(UvError(n)),
234                     }
235                 }
236                 n => Err(UvError(n))
237             }
238         };
239
240         return match ret {
241             Ok(()) => Ok(tcp),
242             Err(e) => Err(e),
243         };
244
245         extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
246             let req = Request::wrap(req);
247             if status == uvll::ECANCELED { return }
248             let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
249             cx.status = status;
250             let scheduler: ~Scheduler = Local::take();
251             scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
252         }
253     }
254 }
255
256 impl HomingIO for TcpWatcher {
257     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
258 }
259
260 impl rtio::RtioSocket for TcpWatcher {
261     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
262         let _m = self.fire_missiles();
263         socket_name(Tcp, self.handle)
264     }
265 }
266
267 impl rtio::RtioTcpStream for TcpWatcher {
268     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
269         let _m = self.fire_missiles();
270         self.stream.read(buf).map_err(uv_error_to_io_error)
271     }
272
273     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
274         let _m = self.fire_missiles();
275         self.stream.write(buf).map_err(uv_error_to_io_error)
276     }
277
278     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
279         let _m = self.fire_missiles();
280         socket_name(TcpPeer, self.handle)
281     }
282
283     fn control_congestion(&mut self) -> Result<(), IoError> {
284         let _m = self.fire_missiles();
285         status_to_io_result(unsafe {
286             uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
287         })
288     }
289
290     fn nodelay(&mut self) -> Result<(), IoError> {
291         let _m = self.fire_missiles();
292         status_to_io_result(unsafe {
293             uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
294         })
295     }
296
297     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
298         let _m = self.fire_missiles();
299         status_to_io_result(unsafe {
300             uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
301                                    delay_in_seconds as c_uint)
302         })
303     }
304
305     fn letdie(&mut self) -> Result<(), IoError> {
306         let _m = self.fire_missiles();
307         status_to_io_result(unsafe {
308             uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
309         })
310     }
311 }
312
313 impl Drop for TcpWatcher {
314     fn drop(&mut self) {
315         let _m = self.fire_missiles();
316         self.stream.close();
317     }
318 }
319
320 // TCP listeners (unbound servers)
321
322 impl TcpListener {
323     pub fn bind(loop_: &mut Loop, address: SocketAddr)
324         -> Result<~TcpListener, UvError>
325     {
326         let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
327         assert_eq!(unsafe {
328             uvll::uv_tcp_init(loop_.handle, handle)
329         }, 0);
330         let l = ~TcpListener {
331             home: get_handle_to_current_scheduler!(),
332             handle: handle,
333             closing_task: None,
334             outgoing: Tube::new(),
335         };
336         let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
337             match addr {
338                 UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
339                 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
340             }
341         });
342         match res {
343             0 => Ok(l.install()),
344             n => Err(UvError(n))
345         }
346     }
347 }
348
349 impl HomingIO for TcpListener {
350     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
351 }
352
353 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
354     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
355 }
356
357 impl rtio::RtioSocket for TcpListener {
358     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
359         let _m = self.fire_missiles();
360         socket_name(Tcp, self.handle)
361     }
362 }
363
364 impl rtio::RtioTcpListener for TcpListener {
365     fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
366         // create the acceptor object from ourselves
367         let incoming = self.outgoing.clone();
368         let mut acceptor = ~TcpAcceptor {
369             listener: self,
370             incoming: incoming,
371         };
372
373         let _m = acceptor.fire_missiles();
374         // XXX: the 128 backlog should be configurable
375         match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
376             0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
377             n => Err(uv_error_to_io_error(UvError(n))),
378         }
379     }
380 }
381
382 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
383     let msg = match status {
384         0 => {
385             let loop_ = Loop::wrap(unsafe {
386                 uvll::get_loop_for_uv_handle(server)
387             });
388             let client = TcpWatcher::new(&loop_);
389             assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
390             Ok(~client as ~rtio::RtioTcpStream)
391         }
392         uvll::ECANCELED => return,
393         n => Err(uv_error_to_io_error(UvError(n)))
394     };
395
396     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
397     tcp.outgoing.send(msg);
398 }
399
400 impl Drop for TcpListener {
401     fn drop(&mut self) {
402         let (_m, sched) = self.fire_missiles_sched();
403
404         do sched.deschedule_running_task_and_then |_, task| {
405             self.closing_task = Some(task);
406             unsafe { uvll::uv_close(self.handle, listener_close_cb) }
407         }
408     }
409 }
410
411 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
412     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
413     unsafe { uvll::free_handle(handle) }
414
415     let sched: ~Scheduler = Local::take();
416     sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
417 }
418
419 // TCP acceptors (bound servers)
420
421 impl HomingIO for TcpAcceptor {
422     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
423 }
424
425 impl rtio::RtioSocket for TcpAcceptor {
426     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
427         let _m = self.fire_missiles();
428         socket_name(Tcp, self.listener.handle)
429     }
430 }
431
432 impl rtio::RtioTcpAcceptor for TcpAcceptor {
433     fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
434         let _m = self.fire_missiles();
435         self.incoming.recv()
436     }
437
438     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
439         let _m = self.fire_missiles();
440         status_to_io_result(unsafe {
441             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
442         })
443     }
444
445     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
446         let _m = self.fire_missiles();
447         status_to_io_result(unsafe {
448             uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
449         })
450     }
451 }
452
453 ////////////////////////////////////////////////////////////////////////////////
454 /// UDP implementation
455 ////////////////////////////////////////////////////////////////////////////////
456
457 pub struct UdpWatcher {
458     handle: *uvll::uv_udp_t,
459     home: SchedHandle,
460 }
461
462 impl UdpWatcher {
463     pub fn bind(loop_: &Loop, address: SocketAddr)
464         -> Result<UdpWatcher, UvError>
465     {
466         let udp = UdpWatcher {
467             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
468             home: get_handle_to_current_scheduler!(),
469         };
470         assert_eq!(unsafe {
471             uvll::uv_udp_init(loop_.handle, udp.handle)
472         }, 0);
473         let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
474             match addr {
475                 UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
476                 UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
477             }
478         });
479         match result {
480             0 => Ok(udp),
481             n => Err(UvError(n)),
482         }
483     }
484 }
485
486 impl HomingIO for UdpWatcher {
487     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
488 }
489
490 impl rtio::RtioSocket for UdpWatcher {
491     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
492         let _m = self.fire_missiles();
493         socket_name(Udp, self.handle)
494     }
495 }
496
497 impl rtio::RtioUdpSocket for UdpWatcher {
498     fn recvfrom(&mut self, buf: &mut [u8])
499         -> Result<(uint, SocketAddr), IoError>
500     {
501         struct Ctx {
502             task: Option<BlockedTask>,
503             buf: Option<Buf>,
504             result: Option<(ssize_t, SocketAddr)>,
505         }
506         let _m = self.fire_missiles();
507
508         return match unsafe {
509             uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
510         } {
511             0 => {
512                 let mut cx = Ctx {
513                     task: None,
514                     buf: Some(slice_to_uv_buf(buf)),
515                     result: None,
516                 };
517                 unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
518                 let scheduler: ~Scheduler = Local::take();
519                 do scheduler.deschedule_running_task_and_then |_, task| {
520                     cx.task = Some(task);
521                 }
522                 match cx.result.take_unwrap() {
523                     (n, _) if n < 0 =>
524                         Err(uv_error_to_io_error(UvError(n as c_int))),
525                     (n, addr) => Ok((n as uint, addr))
526                 }
527             }
528             n => Err(uv_error_to_io_error(UvError(n)))
529         };
530
531         extern fn alloc_cb(handle: *uvll::uv_udp_t,
532                            _suggested_size: size_t) -> Buf {
533             let cx: &mut Ctx = unsafe {
534                 cast::transmute(uvll::get_data_for_uv_handle(handle))
535             };
536             cx.buf.take().expect("alloc_cb called more than once")
537         }
538
539         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
540                           addr: *uvll::sockaddr, _flags: c_uint) {
541
542             // When there's no data to read the recv callback can be a no-op.
543             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
544             // this we just drop back to kqueue and wait for the next callback.
545             if nread == 0 { return }
546             if nread == uvll::ECANCELED as ssize_t { return }
547
548             unsafe {
549                 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
550             }
551
552             let cx: &mut Ctx = unsafe {
553                 cast::transmute(uvll::get_data_for_uv_handle(handle))
554             };
555             let addr = sockaddr_to_UvSocketAddr(addr);
556             let addr = uv_socket_addr_to_socket_addr(addr);
557             cx.result = Some((nread, addr));
558
559             let sched: ~Scheduler = Local::take();
560             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
561         }
562     }
563
564     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
565         struct Ctx { task: Option<BlockedTask>, result: c_int }
566
567         let _m = self.fire_missiles();
568
569         let req = Request::new(uvll::UV_UDP_SEND);
570         let buf = slice_to_uv_buf(buf);
571         let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
572             match dst {
573                 UvIpv4SocketAddr(dst) =>
574                     uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
575                 UvIpv6SocketAddr(dst) =>
576                     uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
577             }
578         });
579
580         return match result {
581             0 => {
582                 let mut cx = Ctx { task: None, result: 0 };
583                 req.set_data(&cx);
584                 req.defuse();
585
586                 let sched: ~Scheduler = Local::take();
587                 do sched.deschedule_running_task_and_then |_, task| {
588                     cx.task = Some(task);
589                 }
590
591                 match cx.result {
592                     0 => Ok(()),
593                     n => Err(uv_error_to_io_error(UvError(n)))
594                 }
595             }
596             n => Err(uv_error_to_io_error(UvError(n)))
597         };
598
599         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
600             let req = Request::wrap(req);
601             let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
602             cx.result = status;
603
604             let sched: ~Scheduler = Local::take();
605             sched.resume_blocked_task_immediately(cx.task.take_unwrap());
606         }
607     }
608
609     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
610         let _m = self.fire_missiles();
611         status_to_io_result(unsafe {
612             do multi.to_str().with_c_str |m_addr| {
613                 uvll::uv_udp_set_membership(self.handle,
614                                             m_addr, ptr::null(),
615                                             uvll::UV_JOIN_GROUP)
616             }
617         })
618     }
619
620     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
621         let _m = self.fire_missiles();
622         status_to_io_result(unsafe {
623             do multi.to_str().with_c_str |m_addr| {
624                 uvll::uv_udp_set_membership(self.handle,
625                                             m_addr, ptr::null(),
626                                             uvll::UV_LEAVE_GROUP)
627             }
628         })
629     }
630
631     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
632         let _m = self.fire_missiles();
633         status_to_io_result(unsafe {
634             uvll::uv_udp_set_multicast_loop(self.handle,
635                                             1 as c_int)
636         })
637     }
638
639     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
640         let _m = self.fire_missiles();
641         status_to_io_result(unsafe {
642             uvll::uv_udp_set_multicast_loop(self.handle,
643                                             0 as c_int)
644         })
645     }
646
647     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
648         let _m = self.fire_missiles();
649         status_to_io_result(unsafe {
650             uvll::uv_udp_set_multicast_ttl(self.handle,
651                                            ttl as c_int)
652         })
653     }
654
655     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
656         let _m = self.fire_missiles();
657         status_to_io_result(unsafe {
658             uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
659         })
660     }
661
662     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
663         let _m = self.fire_missiles();
664         status_to_io_result(unsafe {
665             uvll::uv_udp_set_broadcast(self.handle,
666                                        1 as c_int)
667         })
668     }
669
670     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
671         let _m = self.fire_missiles();
672         status_to_io_result(unsafe {
673             uvll::uv_udp_set_broadcast(self.handle,
674                                        0 as c_int)
675         })
676     }
677 }
678
679 impl Drop for UdpWatcher {
680     fn drop(&mut self) {
681         // Send ourselves home to close this handle (blocking while doing so).
682         let (_m, sched) = self.fire_missiles_sched();
683         let mut slot = None;
684         unsafe {
685             uvll::set_data_for_uv_handle(self.handle, &slot);
686             uvll::uv_close(self.handle, close_cb);
687         }
688         do sched.deschedule_running_task_and_then |_, task| {
689             slot = Some(task);
690         }
691
692         extern fn close_cb(handle: *uvll::uv_handle_t) {
693             let slot: &mut Option<BlockedTask> = unsafe {
694                 cast::transmute(uvll::get_data_for_uv_handle(handle))
695             };
696             let sched: ~Scheduler = Local::take();
697             sched.resume_blocked_task_immediately(slot.take_unwrap());
698         }
699     }
700 }
701
702 ////////////////////////////////////////////////////////////////////////////////
703 /// UV request support
704 ////////////////////////////////////////////////////////////////////////////////
705
706 #[cfg(test)]
707 mod test {
708     use std::cell::Cell;
709     use std::comm::oneshot;
710     use std::rt::test::*;
711     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
712                         RtioUdpSocket};
713     use std::task;
714
715     use super::*;
716     use super::super::{Loop, run_uv_loop};
717
718     #[test]
719     fn connect_close_ip4() {
720         do run_uv_loop |l| {
721             match TcpWatcher::connect(l, next_test_ip4()) {
722                 Ok(*) => fail!(),
723                 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
724             }
725         }
726     }
727
728     #[test]
729     fn connect_close_ip6() {
730         do run_uv_loop |l| {
731             match TcpWatcher::connect(l, next_test_ip6()) {
732                 Ok(*) => fail!(),
733                 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
734             }
735         }
736     }
737
738     #[test]
739     fn udp_bind_close_ip4() {
740         do run_uv_loop |l| {
741             match UdpWatcher::bind(l, next_test_ip4()) {
742                 Ok(*) => {}
743                 Err(*) => fail!()
744             }
745         }
746     }
747
748     #[test]
749     fn udp_bind_close_ip6() {
750         do run_uv_loop |l| {
751             match UdpWatcher::bind(l, next_test_ip6()) {
752                 Ok(*) => {}
753                 Err(*) => fail!()
754             }
755         }
756     }
757
758     #[test]
759     fn listen_ip4() {
760         do run_uv_loop |l| {
761             let (port, chan) = oneshot();
762             let chan = Cell::new(chan);
763             let addr = next_test_ip4();
764
765             let handle = l.handle;
766             do spawn {
767                 let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
768                     Ok(w) => w, Err(e) => fail!("{:?}", e)
769                 };
770                 let mut w = match w.listen() {
771                     Ok(w) => w, Err(e) => fail!("{:?}", e),
772                 };
773                 chan.take().send(());
774                 match w.accept() {
775                     Ok(mut stream) => {
776                         let mut buf = [0u8, ..10];
777                         match stream.read(buf) {
778                             Ok(10) => {} e => fail!("{:?}", e),
779                         }
780                         for i in range(0, 10u8) {
781                             assert_eq!(buf[i], i + 1);
782                         }
783                     }
784                     Err(e) => fail!("{:?}", e)
785                 }
786             }
787
788             port.recv();
789             let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
790                 Ok(w) => w, Err(e) => fail!("{:?}", e)
791             };
792             match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
793                 Ok(()) => {}, Err(e) => fail!("{:?}", e)
794             }
795         }
796     }
797
798     #[test]
799     fn listen_ip6() {
800         do run_uv_loop |l| {
801             let (port, chan) = oneshot();
802             let chan = Cell::new(chan);
803             let addr = next_test_ip6();
804
805             let handle = l.handle;
806             do spawn {
807                 let w = match TcpListener::bind(&mut Loop::wrap(handle), addr) {
808                     Ok(w) => w, Err(e) => fail!("{:?}", e)
809                 };
810                 let mut w = match w.listen() {
811                     Ok(w) => w, Err(e) => fail!("{:?}", e),
812                 };
813                 chan.take().send(());
814                 match w.accept() {
815                     Ok(mut stream) => {
816                         let mut buf = [0u8, ..10];
817                         match stream.read(buf) {
818                             Ok(10) => {} e => fail!("{:?}", e),
819                         }
820                         for i in range(0, 10u8) {
821                             assert_eq!(buf[i], i + 1);
822                         }
823                     }
824                     Err(e) => fail!("{:?}", e)
825                 }
826             }
827
828             port.recv();
829             let mut w = match TcpWatcher::connect(&mut Loop::wrap(handle), addr) {
830                 Ok(w) => w, Err(e) => fail!("{:?}", e)
831             };
832             match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
833                 Ok(()) => {}, Err(e) => fail!("{:?}", e)
834             }
835         }
836     }
837
838     #[test]
839     fn udp_recv_ip4() {
840         do run_uv_loop |l| {
841             let (port, chan) = oneshot();
842             let chan = Cell::new(chan);
843             let client = next_test_ip4();
844             let server = next_test_ip4();
845
846             let handle = l.handle;
847             do spawn {
848                 match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
849                     Ok(mut w) => {
850                         chan.take().send(());
851                         let mut buf = [0u8, ..10];
852                         match w.recvfrom(buf) {
853                             Ok((10, addr)) => assert_eq!(addr, client),
854                             e => fail!("{:?}", e),
855                         }
856                         for i in range(0, 10u8) {
857                             assert_eq!(buf[i], i + 1);
858                         }
859                     }
860                     Err(e) => fail!("{:?}", e)
861                 }
862             }
863
864             port.recv();
865             let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
866                 Ok(w) => w, Err(e) => fail!("{:?}", e)
867             };
868             match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
869                 Ok(()) => {}, Err(e) => fail!("{:?}", e)
870             }
871         }
872     }
873
874     #[test]
875     fn udp_recv_ip6() {
876         do run_uv_loop |l| {
877             let (port, chan) = oneshot();
878             let chan = Cell::new(chan);
879             let client = next_test_ip6();
880             let server = next_test_ip6();
881
882             let handle = l.handle;
883             do spawn {
884                 match UdpWatcher::bind(&mut Loop::wrap(handle), server) {
885                     Ok(mut w) => {
886                         chan.take().send(());
887                         let mut buf = [0u8, ..10];
888                         match w.recvfrom(buf) {
889                             Ok((10, addr)) => assert_eq!(addr, client),
890                             e => fail!("{:?}", e),
891                         }
892                         for i in range(0, 10u8) {
893                             assert_eq!(buf[i], i + 1);
894                         }
895                     }
896                     Err(e) => fail!("{:?}", e)
897                 }
898             }
899
900             port.recv();
901             let mut w = match UdpWatcher::bind(&mut Loop::wrap(handle), client) {
902                 Ok(w) => w, Err(e) => fail!("{:?}", e)
903             };
904             match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
905                 Ok(()) => {}, Err(e) => fail!("{:?}", e)
906             }
907         }
908     }
909
910     #[test]
911     fn test_read_read_read() {
912         do run_uv_loop |l| {
913             let addr = next_test_ip4();
914             static MAX: uint = 500000;
915             let (port, chan) = oneshot();
916             let port = Cell::new(port);
917             let chan = Cell::new(chan);
918
919             let handle = l.handle;
920             do spawntask {
921                 let l = &mut Loop::wrap(handle);
922                 let listener = TcpListener::bind(l, addr).unwrap();
923                 let mut acceptor = listener.listen().unwrap();
924                 chan.take().send(());
925                 let mut stream = acceptor.accept().unwrap();
926                 let buf = [1, .. 2048];
927                 let mut total_bytes_written = 0;
928                 while total_bytes_written < MAX {
929                     stream.write(buf);
930                     total_bytes_written += buf.len();
931                 }
932             }
933
934             do spawntask {
935                 let l = &mut Loop::wrap(handle);
936                 port.take().recv();
937                 let mut stream = TcpWatcher::connect(l, addr).unwrap();
938                 let mut buf = [0, .. 2048];
939                 let mut total_bytes_read = 0;
940                 while total_bytes_read < MAX {
941                     let nread = stream.read(buf).unwrap();
942                     uvdebug!("read {} bytes", nread);
943                     total_bytes_read += nread;
944                     for i in range(0u, nread) {
945                         assert_eq!(buf[i], 1);
946                     }
947                 }
948                 uvdebug!("read {} bytes total", total_bytes_read);
949             }
950         }
951     }
952
953     #[test]
954     #[ignore(cfg(windows))] // FIXME(#10102) the server never sees the second send
955     fn test_udp_twice() {
956         do run_uv_loop |l| {
957             let server_addr = next_test_ip4();
958             let client_addr = next_test_ip4();
959             let (port, chan) = oneshot();
960             let port = Cell::new(port);
961             let chan = Cell::new(chan);
962
963             let handle = l.handle;
964             do spawntask {
965                 let l = &mut Loop::wrap(handle);
966                 let mut client = UdpWatcher::bind(l, client_addr).unwrap();
967                 port.take().recv();
968                 assert!(client.sendto([1], server_addr).is_ok());
969                 assert!(client.sendto([2], server_addr).is_ok());
970             }
971
972             do spawntask {
973                 let l = &mut Loop::wrap(handle);
974                 let mut server = UdpWatcher::bind(l, server_addr).unwrap();
975                 chan.take().send(());
976                 let mut buf1 = [0];
977                 let mut buf2 = [0];
978                 let (nread1, src1) = server.recvfrom(buf1).unwrap();
979                 let (nread2, src2) = server.recvfrom(buf2).unwrap();
980                 assert_eq!(nread1, 1);
981                 assert_eq!(nread2, 1);
982                 assert_eq!(src1, client_addr);
983                 assert_eq!(src2, client_addr);
984                 assert_eq!(buf1[0], 1);
985                 assert_eq!(buf2[0], 2);
986             }
987         }
988     }
989
990     #[test]
991     fn test_udp_many_read() {
992         do run_uv_loop |l| {
993             let server_out_addr = next_test_ip4();
994             let server_in_addr = next_test_ip4();
995             let client_out_addr = next_test_ip4();
996             let client_in_addr = next_test_ip4();
997             static MAX: uint = 500_000;
998
999             let (p1, c1) = oneshot();
1000             let (p2, c2) = oneshot();
1001
1002             let first = Cell::new((p1, c2));
1003             let second = Cell::new((p2, c1));
1004
1005             let handle = l.handle;
1006             do spawntask {
1007                 let l = &mut Loop::wrap(handle);
1008                 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1009                 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1010                 let (port, chan) = first.take();
1011                 chan.send(());
1012                 port.recv();
1013                 let msg = [1, .. 2048];
1014                 let mut total_bytes_sent = 0;
1015                 let mut buf = [1];
1016                 while buf[0] == 1 {
1017                     // send more data
1018                     assert!(server_out.sendto(msg, client_in_addr).is_ok());
1019                     total_bytes_sent += msg.len();
1020                     // check if the client has received enough
1021                     let res = server_in.recvfrom(buf);
1022                     assert!(res.is_ok());
1023                     let (nread, src) = res.unwrap();
1024                     assert_eq!(nread, 1);
1025                     assert_eq!(src, client_out_addr);
1026                 }
1027                 assert!(total_bytes_sent >= MAX);
1028             }
1029
1030             do spawntask {
1031                 let l = &mut Loop::wrap(handle);
1032                 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1033                 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1034                 let (port, chan) = second.take();
1035                 port.recv();
1036                 chan.send(());
1037                 let mut total_bytes_recv = 0;
1038                 let mut buf = [0, .. 2048];
1039                 while total_bytes_recv < MAX {
1040                     // ask for more
1041                     assert!(client_out.sendto([1], server_in_addr).is_ok());
1042                     // wait for data
1043                     let res = client_in.recvfrom(buf);
1044                     assert!(res.is_ok());
1045                     let (nread, src) = res.unwrap();
1046                     assert_eq!(src, server_out_addr);
1047                     total_bytes_recv += nread;
1048                     for i in range(0u, nread) {
1049                         assert_eq!(buf[i], 1);
1050                     }
1051                 }
1052                 // tell the server we're done
1053                 assert!(client_out.sendto([0], server_in_addr).is_ok());
1054             }
1055         }
1056     }
1057
1058     #[test]
1059     fn test_read_and_block() {
1060         do run_uv_loop |l| {
1061             let addr = next_test_ip4();
1062             let (port, chan) = oneshot();
1063             let port = Cell::new(port);
1064             let chan = Cell::new(chan);
1065
1066             let handle = l.handle;
1067             do spawntask {
1068                 let l = &mut Loop::wrap(handle);
1069                 let listener = TcpListener::bind(l, addr).unwrap();
1070                 let mut acceptor = listener.listen().unwrap();
1071                 let (port2, chan2) = stream();
1072                 chan.take().send(port2);
1073                 let mut stream = acceptor.accept().unwrap();
1074                 let mut buf = [0, .. 2048];
1075
1076                 let expected = 32;
1077                 let mut current = 0;
1078                 let mut reads = 0;
1079
1080                 while current < expected {
1081                     let nread = stream.read(buf).unwrap();
1082                     for i in range(0u, nread) {
1083                         let val = buf[i] as uint;
1084                         assert_eq!(val, current % 8);
1085                         current += 1;
1086                     }
1087                     reads += 1;
1088
1089                     chan2.send(());
1090                 }
1091
1092                 // Make sure we had multiple reads
1093                 assert!(reads > 1);
1094             }
1095
1096             do spawntask {
1097                 let l = &mut Loop::wrap(handle);
1098                 let port2 = port.take().recv();
1099                 let mut stream = TcpWatcher::connect(l, addr).unwrap();
1100                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1101                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1102                 port2.recv();
1103                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1104                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
1105                 port2.recv();
1106             }
1107         }
1108     }
1109
1110     #[test]
1111     fn test_simple_tcp_server_and_client_on_diff_threads() {
1112         let addr = next_test_ip4();
1113
1114         do task::spawn_sched(task::SingleThreaded) {
1115             do run_uv_loop |l| {
1116                 let listener = TcpListener::bind(l, addr).unwrap();
1117                 let mut acceptor = listener.listen().unwrap();
1118                 let mut stream = acceptor.accept().unwrap();
1119                 let mut buf = [0, .. 2048];
1120                 let nread = stream.read(buf).unwrap();
1121                 assert_eq!(nread, 8);
1122                 for i in range(0u, nread) {
1123                     assert_eq!(buf[i], i as u8);
1124                 }
1125             }
1126         }
1127
1128         do task::spawn_sched(task::SingleThreaded) {
1129             do run_uv_loop |l| {
1130                 let mut stream = TcpWatcher::connect(l, addr);
1131                 while stream.is_err() {
1132                     stream = TcpWatcher::connect(l, addr);
1133                 }
1134                 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
1135             }
1136         }
1137     }
1138
1139     // On one thread, create a udp socket. Then send that socket to another
1140     // thread and destroy the socket on the remote thread. This should make sure
1141     // that homing kicks in for the socket to go back home to the original
1142     // thread, close itself, and then come back to the last thread.
1143     #[test]
1144     fn test_homing_closes_correctly() {
1145         let (port, chan) = oneshot();
1146         let port = Cell::new(port);
1147         let chan = Cell::new(chan);
1148
1149         do task::spawn_sched(task::SingleThreaded) {
1150             let chan = Cell::new(chan.take());
1151             do run_uv_loop |l| {
1152                 let listener = UdpWatcher::bind(l, next_test_ip4()).unwrap();
1153                 chan.take().send(listener);
1154             }
1155         }
1156
1157         do task::spawn_sched(task::SingleThreaded) {
1158             let port = Cell::new(port.take());
1159             do run_uv_loop |_l| {
1160                 port.take().recv();
1161             }
1162         }
1163     }
1164
1165     // This is a bit of a crufty old test, but it has its uses.
1166     #[test]
1167     fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
1168         use std::cast;
1169         use std::rt::local::Local;
1170         use std::rt::rtio::{EventLoop, IoFactory};
1171         use std::rt::sched::Scheduler;
1172         use std::rt::sched::{Shutdown, TaskFromFriend};
1173         use std::rt::sleeper_list::SleeperList;
1174         use std::rt::task::Task;
1175         use std::rt::task::UnwindResult;
1176         use std::rt::thread::Thread;
1177         use std::rt::work_queue::WorkQueue;
1178         use std::unstable::run_in_bare_thread;
1179         use uvio::UvEventLoop;
1180
1181         do run_in_bare_thread {
1182             let sleepers = SleeperList::new();
1183             let work_queue1 = WorkQueue::new();
1184             let work_queue2 = WorkQueue::new();
1185             let queues = ~[work_queue1.clone(), work_queue2.clone()];
1186
1187             let loop1 = ~UvEventLoop::new() as ~EventLoop;
1188             let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
1189                                              sleepers.clone());
1190             let loop2 = ~UvEventLoop::new() as ~EventLoop;
1191             let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
1192                                              sleepers.clone());
1193
1194             let handle1 = Cell::new(sched1.make_handle());
1195             let handle2 = Cell::new(sched2.make_handle());
1196             let tasksFriendHandle = Cell::new(sched2.make_handle());
1197
1198             let on_exit: ~fn(UnwindResult) = |exit_status| {
1199                 handle1.take().send(Shutdown);
1200                 handle2.take().send(Shutdown);
1201                 assert!(exit_status.is_success());
1202             };
1203
1204             unsafe fn local_io() -> &'static mut IoFactory {
1205                 do Local::borrow |sched: &mut Scheduler| {
1206                     let mut io = None;
1207                     sched.event_loop.io(|i| io = Some(i));
1208                     cast::transmute(io.unwrap())
1209                 }
1210             }
1211
1212             let test_function: ~fn() = || {
1213                 let io = unsafe { local_io() };
1214                 let addr = next_test_ip4();
1215                 let maybe_socket = io.udp_bind(addr);
1216                 // this socket is bound to this event loop
1217                 assert!(maybe_socket.is_ok());
1218
1219                 // block self on sched1
1220                 do task::unkillable { // FIXME(#8674)
1221                     let scheduler: ~Scheduler = Local::take();
1222                     do scheduler.deschedule_running_task_and_then |_, task| {
1223                         // unblock task
1224                         do task.wake().map |task| {
1225                             // send self to sched2
1226                             tasksFriendHandle.take().send(TaskFromFriend(task));
1227                         };
1228                         // sched1 should now sleep since it has nothing else to do
1229                     }
1230                 }
1231                 // sched2 will wake up and get the task as we do nothing else,
1232                 // the function ends and the socket goes out of scope sched2
1233                 // will start to run the destructor the destructor will first
1234                 // block the task, set it's home as sched1, then enqueue it
1235                 // sched2 will dequeue the task, see that it has a home, and
1236                 // send it to sched1 sched1 will wake up, exec the close
1237                 // function on the correct loop, and then we're done
1238             };
1239
1240             let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
1241                                                 test_function);
1242             main_task.death.on_exit = Some(on_exit);
1243             let main_task = Cell::new(main_task);
1244
1245             let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool,
1246                                                          None) || {});
1247
1248             let sched1 = Cell::new(sched1);
1249             let sched2 = Cell::new(sched2);
1250
1251             let thread1 = do Thread::start {
1252                 sched1.take().bootstrap(main_task.take());
1253             };
1254             let thread2 = do Thread::start {
1255                 sched2.take().bootstrap(null_task.take());
1256             };
1257
1258             thread1.join();
1259             thread2.join();
1260         }
1261     }
1262
1263 }