]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/net.rs
Upgrade libuv to the current master (again)
[rust.git] / src / libstd / rt / uv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::*;
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
13 use rt::uv::uvll;
14 use rt::uv::uvll::*;
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17              status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use vec;
20 use str;
21 use from_str::{FromStr};
22
23 pub struct UvAddrInfo(*uvll::addrinfo);
24
25 pub enum UvSocketAddr {
26     UvIpv4SocketAddr(*sockaddr_in),
27     UvIpv6SocketAddr(*sockaddr_in6),
28 }
29
30 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
31     unsafe {
32         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
33         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
34         match addr {
35             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
36             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
37             _ => fail!(),
38         }
39     }
40 }
41
42 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
43     let malloc = match addr.ip {
44         Ipv4Addr(*) => malloc_ip4_addr,
45         Ipv6Addr(*) => malloc_ip6_addr,
46     };
47     let wrap = match addr.ip {
48         Ipv4Addr(*) => UvIpv4SocketAddr,
49         Ipv6Addr(*) => UvIpv6SocketAddr,
50     };
51     let free = match addr.ip {
52         Ipv4Addr(*) => free_ip4_addr,
53         Ipv6Addr(*) => free_ip6_addr,
54     };
55
56     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
57     do (|| {
58         f(wrap(addr))
59     }).finally {
60         unsafe { free(addr) };
61     }
62 }
63
64 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
65     let ip_size = match addr {
66         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
67         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
68     };
69     let ip_name = {
70         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
71         unsafe {
72             let buf_ptr = vec::raw::to_ptr(buf);
73             match addr {
74                 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
75                 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
76             }
77         };
78         buf
79     };
80     let ip_port = unsafe {
81         let port = match addr {
82             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
83             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
84         };
85         port as u16
86     };
87     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
88     let ip_addr = FromStr::from_str(ip_str).unwrap();
89
90     // finally run the closure
91     f(SocketAddr { ip: ip_addr, port: ip_port })
92 }
93
94 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
95     use util;
96     uv_socket_addr_as_socket_addr(addr, util::id)
97 }
98
99 // Traverse the addrinfo linked list, producing a vector of Rust socket addresses
100 pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
101     unsafe {
102         let &UvAddrInfo(addr) = addr;
103         let mut addr = addr;
104
105         let mut addrs = ~[];
106         loop {
107             let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
108             let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
109             addrs.push(rustaddr);
110             if (*addr).ai_next.is_not_null() {
111                 addr = (*addr).ai_next;
112             } else {
113                 break;
114             }
115         }
116
117         return addrs;
118     }
119 }
120
121 #[cfg(test)]
122 #[test]
123 fn test_ip4_conversion() {
124     use rt;
125     let ip4 = rt::test::next_test_ip4();
126     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
127 }
128
129 #[cfg(test)]
130 #[test]
131 fn test_ip6_conversion() {
132     use rt;
133     let ip6 = rt::test::next_test_ip6();
134     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
135 }
136
137 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
138 // and uv_file_t
139 pub struct StreamWatcher(*uvll::uv_stream_t);
140 impl Watcher for StreamWatcher { }
141
142 impl StreamWatcher {
143     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
144         {
145             let data = self.get_watcher_data();
146             data.alloc_cb = Some(alloc);
147             data.read_cb = Some(cb);
148         }
149
150         unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
151
152         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
153             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
154             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
155             return (*alloc_cb)(suggested_size as uint);
156         }
157
158         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
159             rtdebug!("buf addr: %x", buf.base as uint);
160             rtdebug!("buf len: %d", buf.len as int);
161             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
162             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
163             let status = status_to_maybe_uv_error(nread as c_int);
164             (*cb)(stream_watcher, nread as int, buf, status);
165         }
166     }
167
168     pub fn read_stop(&mut self) {
169         // It would be nice to drop the alloc and read callbacks here,
170         // but read_stop may be called from inside one of them and we
171         // would end up freeing the in-use environment
172         let handle = self.native_handle();
173         unsafe { uvll::read_stop(handle); }
174     }
175
176     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
177         {
178             let data = self.get_watcher_data();
179             assert!(data.write_cb.is_none());
180             data.write_cb = Some(cb);
181         }
182
183         let req = WriteRequest::new();
184         unsafe {
185         assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
186         }
187
188         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
189             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
190             let mut stream_watcher = write_request.stream();
191             write_request.delete();
192             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
193             let status = status_to_maybe_uv_error(status);
194             cb(stream_watcher, status);
195         }
196     }
197
198     pub fn accept(&mut self, stream: StreamWatcher) {
199         let self_handle = self.native_handle() as *c_void;
200         let stream_handle = stream.native_handle() as *c_void;
201         assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
202     }
203
204     pub fn close(self, cb: NullCallback) {
205         {
206             let mut this = self;
207             let data = this.get_watcher_data();
208             assert!(data.close_cb.is_none());
209             data.close_cb = Some(cb);
210         }
211
212         unsafe { uvll::close(self.native_handle(), close_cb); }
213
214         extern fn close_cb(handle: *uvll::uv_stream_t) {
215             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
216             let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
217             stream_watcher.drop_watcher_data();
218             unsafe { free_handle(handle as *c_void) }
219             cb();
220         }
221     }
222 }
223
224 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
225     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
226         StreamWatcher(handle)
227     }
228     fn native_handle(&self) -> *uvll::uv_stream_t {
229         match self { &StreamWatcher(ptr) => ptr }
230     }
231 }
232
233 pub struct TcpWatcher(*uvll::uv_tcp_t);
234 impl Watcher for TcpWatcher { }
235
236 impl TcpWatcher {
237     pub fn new(loop_: &Loop) -> TcpWatcher {
238         unsafe {
239             let handle = malloc_handle(UV_TCP);
240             assert!(handle.is_not_null());
241             assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
242             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
243             watcher.install_watcher_data();
244             return watcher;
245         }
246     }
247
248     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
249         do socket_addr_as_uv_socket_addr(address) |addr| {
250             let result = unsafe {
251                 match addr {
252                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
253                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
254                 }
255             };
256             match result {
257                 0 => Ok(()),
258                 _ => Err(UvError(result)),
259             }
260         }
261     }
262
263     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
264         unsafe {
265             assert!(self.get_watcher_data().connect_cb.is_none());
266             self.get_watcher_data().connect_cb = Some(cb);
267
268             let connect_handle = ConnectRequest::new().native_handle();
269             rtdebug!("connect_t: %x", connect_handle as uint);
270             do socket_addr_as_uv_socket_addr(address) |addr| {
271                 let result = match addr {
272                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
273                                                       self.native_handle(), addr, connect_cb),
274                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
275                                                        self.native_handle(), addr, connect_cb),
276                 };
277                 assert_eq!(0, result);
278             }
279
280             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
281                 rtdebug!("connect_t: %x", req as uint);
282                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
283                 let mut stream_watcher = connect_request.stream();
284                 connect_request.delete();
285                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
286                 let status = status_to_maybe_uv_error(status);
287                 cb(stream_watcher, status);
288             }
289         }
290     }
291
292     pub fn listen(&mut self, cb: ConnectionCallback) {
293         {
294             let data = self.get_watcher_data();
295             assert!(data.connect_cb.is_none());
296             data.connect_cb = Some(cb);
297         }
298
299         unsafe {
300             static BACKLOG: c_int = 128; // XXX should be configurable
301             // XXX: This can probably fail
302             assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
303         }
304
305         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
306             rtdebug!("connection_cb");
307             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
308             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
309             let status = status_to_maybe_uv_error(status);
310             (*cb)(stream_watcher, status);
311         }
312     }
313
314     pub fn as_stream(&self) -> StreamWatcher {
315         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
316     }
317 }
318
319 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
320     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
321         TcpWatcher(handle)
322     }
323     fn native_handle(&self) -> *uvll::uv_tcp_t {
324         match self { &TcpWatcher(ptr) => ptr }
325     }
326 }
327
328 pub struct UdpWatcher(*uvll::uv_udp_t);
329 impl Watcher for UdpWatcher { }
330
331 impl UdpWatcher {
332     pub fn new(loop_: &Loop) -> UdpWatcher {
333         unsafe {
334             let handle = malloc_handle(UV_UDP);
335             assert!(handle.is_not_null());
336             assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
337             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
338             watcher.install_watcher_data();
339             return watcher;
340         }
341     }
342
343     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
344         do socket_addr_as_uv_socket_addr(address) |addr| {
345             let result = unsafe {
346                 match addr {
347                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
348                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
349                 }
350             };
351             match result {
352                 0 => Ok(()),
353                 _ => Err(UvError(result)),
354             }
355         }
356     }
357
358     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
359         {
360             let data = self.get_watcher_data();
361             data.alloc_cb = Some(alloc);
362             data.udp_recv_cb = Some(cb);
363         }
364
365         unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
366
367         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
368             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
369             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
370             return (*alloc_cb)(suggested_size as uint);
371         }
372
373         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
374                           addr: *uvll::sockaddr, flags: c_uint) {
375             // When there's no data to read the recv callback can be a no-op.
376             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
377             // this we just drop back to kqueue and wait for the next callback.
378             if nread == 0 {
379                 return;
380             }
381
382             rtdebug!("buf addr: %x", buf.base as uint);
383             rtdebug!("buf len: %d", buf.len as int);
384             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
385             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
386             let status = status_to_maybe_uv_error(nread as c_int);
387             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
388             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
389         }
390     }
391
392     pub fn recv_stop(&mut self) {
393         unsafe { uvll::udp_recv_stop(self.native_handle()); }
394     }
395
396     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
397         {
398             let data = self.get_watcher_data();
399             assert!(data.udp_send_cb.is_none());
400             data.udp_send_cb = Some(cb);
401         }
402
403         let req = UdpSendRequest::new();
404         do socket_addr_as_uv_socket_addr(address) |addr| {
405             let result = unsafe {
406                 match addr {
407                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
408                                                    self.native_handle(), [buf], addr, send_cb),
409                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
410                                                     self.native_handle(), [buf], addr, send_cb),
411                 }
412             };
413             assert_eq!(0, result);
414         }
415
416         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
417             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
418             let mut udp_watcher = send_request.handle();
419             send_request.delete();
420             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
421             let status = status_to_maybe_uv_error(status);
422             cb(udp_watcher, status);
423         }
424     }
425
426     pub fn close(self, cb: NullCallback) {
427         {
428             let mut this = self;
429             let data = this.get_watcher_data();
430             assert!(data.close_cb.is_none());
431             data.close_cb = Some(cb);
432         }
433
434         unsafe { uvll::close(self.native_handle(), close_cb); }
435
436         extern fn close_cb(handle: *uvll::uv_udp_t) {
437             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
438             let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
439             udp_watcher.drop_watcher_data();
440             unsafe { free_handle(handle as *c_void) }
441             cb();
442         }
443     }
444 }
445
446 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
447     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
448         UdpWatcher(handle)
449     }
450     fn native_handle(&self) -> *uvll::uv_udp_t {
451         match self { &UdpWatcher(ptr) => ptr }
452     }
453 }
454
455 // uv_connect_t is a subclass of uv_req_t
456 struct ConnectRequest(*uvll::uv_connect_t);
457 impl Request for ConnectRequest { }
458
459 impl ConnectRequest {
460
461     fn new() -> ConnectRequest {
462         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
463         assert!(connect_handle.is_not_null());
464         ConnectRequest(connect_handle as *uvll::uv_connect_t)
465     }
466
467     fn stream(&self) -> StreamWatcher {
468         unsafe {
469             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
470             NativeHandle::from_native_handle(stream_handle)
471         }
472     }
473
474     fn delete(self) {
475         unsafe { free_req(self.native_handle() as *c_void) }
476     }
477 }
478
479 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
480     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
481         ConnectRequest(handle)
482     }
483     fn native_handle(&self) -> *uvll::uv_connect_t {
484         match self { &ConnectRequest(ptr) => ptr }
485     }
486 }
487
488 pub struct WriteRequest(*uvll::uv_write_t);
489
490 impl Request for WriteRequest { }
491
492 impl WriteRequest {
493     pub fn new() -> WriteRequest {
494         let write_handle = unsafe { malloc_req(UV_WRITE) };
495         assert!(write_handle.is_not_null());
496         WriteRequest(write_handle as *uvll::uv_write_t)
497     }
498
499     pub fn stream(&self) -> StreamWatcher {
500         unsafe {
501             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
502             NativeHandle::from_native_handle(stream_handle)
503         }
504     }
505
506     pub fn delete(self) {
507         unsafe { free_req(self.native_handle() as *c_void) }
508     }
509 }
510
511 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
512     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
513         WriteRequest(handle)
514     }
515     fn native_handle(&self) -> *uvll::uv_write_t {
516         match self { &WriteRequest(ptr) => ptr }
517     }
518 }
519
520 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
521 impl Request for UdpSendRequest { }
522
523 impl UdpSendRequest {
524     pub fn new() -> UdpSendRequest {
525         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
526         assert!(send_handle.is_not_null());
527         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
528     }
529
530     pub fn handle(&self) -> UdpWatcher {
531         let send_request_handle = unsafe {
532             uvll::get_udp_handle_from_send_req(self.native_handle())
533         };
534         NativeHandle::from_native_handle(send_request_handle)
535     }
536
537     pub fn delete(self) {
538         unsafe { free_req(self.native_handle() as *c_void) }
539     }
540 }
541
542 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
543     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
544         UdpSendRequest(handle)
545     }
546     fn native_handle(&self) -> *uvll::uv_udp_send_t {
547         match self { &UdpSendRequest(ptr) => ptr }
548     }
549 }
550
551 #[cfg(test)]
552 mod test {
553     use super::*;
554     use util::ignore;
555     use cell::Cell;
556     use vec;
557     use unstable::run_in_bare_thread;
558     use rt::thread::Thread;
559     use rt::test::*;
560     use rt::uv::{Loop, AllocCallback};
561     use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
562     use prelude::*;
563
564     #[test]
565     fn connect_close_ip4() {
566         do run_in_bare_thread() {
567             let mut loop_ = Loop::new();
568             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
569             // Connect to a port where nobody is listening
570             let addr = next_test_ip4();
571             do tcp_watcher.connect(addr) |stream_watcher, status| {
572                 rtdebug!("tcp_watcher.connect!");
573                 assert!(status.is_some());
574                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
575                 stream_watcher.close(||());
576             }
577             loop_.run();
578             loop_.close();
579         }
580     }
581
582     #[test]
583     fn connect_close_ip6() {
584         do run_in_bare_thread() {
585             let mut loop_ = Loop::new();
586             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
587             // Connect to a port where nobody is listening
588             let addr = next_test_ip6();
589             do tcp_watcher.connect(addr) |stream_watcher, status| {
590                 rtdebug!("tcp_watcher.connect!");
591                 assert!(status.is_some());
592                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
593                 stream_watcher.close(||());
594             }
595             loop_.run();
596             loop_.close();
597         }
598     }
599
600     #[test]
601     fn udp_bind_close_ip4() {
602         do run_in_bare_thread() {
603             let mut loop_ = Loop::new();
604             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
605             let addr = next_test_ip4();
606             udp_watcher.bind(addr);
607             udp_watcher.close(||());
608             loop_.run();
609             loop_.close();
610         }
611     }
612
613     #[test]
614     fn udp_bind_close_ip6() {
615         do run_in_bare_thread() {
616             let mut loop_ = Loop::new();
617             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
618             let addr = next_test_ip6();
619             udp_watcher.bind(addr);
620             udp_watcher.close(||());
621             loop_.run();
622             loop_.close();
623         }
624     }
625
626     #[test]
627     #[ignore(cfg(windows))] // FIXME #8815
628     fn listen_ip4() {
629         do run_in_bare_thread() {
630             static MAX: int = 10;
631             let mut loop_ = Loop::new();
632             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
633             let addr = next_test_ip4();
634             server_tcp_watcher.bind(addr);
635             let loop_ = loop_;
636             rtdebug!("listening");
637             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
638                 rtdebug!("listened!");
639                 assert!(status.is_none());
640                 let mut loop_ = loop_;
641                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
642                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
643                 server_stream_watcher.accept(client_tcp_watcher);
644                 let count_cell = Cell::new(0);
645                 let server_stream_watcher = server_stream_watcher;
646                 rtdebug!("starting read");
647                 let alloc: AllocCallback = |size| {
648                     vec_to_uv_buf(vec::from_elem(size, 0u8))
649                 };
650                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
651
652                     rtdebug!("i'm reading!");
653                     let buf = vec_from_uv_buf(buf);
654                     let mut count = count_cell.take();
655                     if status.is_none() {
656                         rtdebug!("got %d bytes", nread);
657                         let buf = buf.unwrap();
658                         for byte in buf.slice(0, nread as uint).iter() {
659                             assert!(*byte == count as u8);
660                             rtdebug!("%u", *byte as uint);
661                             count += 1;
662                         }
663                     } else {
664                         assert_eq!(count, MAX);
665                         do stream_watcher.close {
666                             server_stream_watcher.close(||());
667                         }
668                     }
669                     count_cell.put_back(count);
670                 }
671             }
672
673             let client_thread = do Thread::start {
674                 rtdebug!("starting client thread");
675                 let mut loop_ = Loop::new();
676                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
677                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
678                     rtdebug!("connecting");
679                     assert!(status.is_none());
680                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
681                     let buf = slice_to_uv_buf(msg);
682                     let msg_cell = Cell::new(msg);
683                     do stream_watcher.write(buf) |stream_watcher, status| {
684                         rtdebug!("writing");
685                         assert!(status.is_none());
686                         let msg_cell = Cell::new(msg_cell.take());
687                         stream_watcher.close(||ignore(msg_cell.take()));
688                     }
689                 }
690                 loop_.run();
691                 loop_.close();
692             };
693
694             let mut loop_ = loop_;
695             loop_.run();
696             loop_.close();
697             client_thread.join();
698         }
699     }
700
701     #[test]
702     #[ignore(cfg(windows))] // FIXME #8815
703     fn listen_ip6() {
704         do run_in_bare_thread() {
705             static MAX: int = 10;
706             let mut loop_ = Loop::new();
707             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
708             let addr = next_test_ip6();
709             server_tcp_watcher.bind(addr);
710             let loop_ = loop_;
711             rtdebug!("listening");
712             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
713                 rtdebug!("listened!");
714                 assert!(status.is_none());
715                 let mut loop_ = loop_;
716                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
717                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
718                 server_stream_watcher.accept(client_tcp_watcher);
719                 let count_cell = Cell::new(0);
720                 let server_stream_watcher = server_stream_watcher;
721                 rtdebug!("starting read");
722                 let alloc: AllocCallback = |size| {
723                     vec_to_uv_buf(vec::from_elem(size, 0u8))
724                 };
725                 do client_tcp_watcher.read_start(alloc)
726                     |stream_watcher, nread, buf, status| {
727
728                     rtdebug!("i'm reading!");
729                     let buf = vec_from_uv_buf(buf);
730                     let mut count = count_cell.take();
731                     if status.is_none() {
732                         rtdebug!("got %d bytes", nread);
733                         let buf = buf.unwrap();
734                         let r = buf.slice(0, nread as uint);
735                         for byte in r.iter() {
736                             assert!(*byte == count as u8);
737                             rtdebug!("%u", *byte as uint);
738                             count += 1;
739                         }
740                     } else {
741                         assert_eq!(count, MAX);
742                         do stream_watcher.close {
743                             server_stream_watcher.close(||());
744                         }
745                     }
746                     count_cell.put_back(count);
747                 }
748             }
749
750             let client_thread = do Thread::start {
751                 rtdebug!("starting client thread");
752                 let mut loop_ = Loop::new();
753                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
754                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
755                     rtdebug!("connecting");
756                     assert!(status.is_none());
757                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
758                     let buf = slice_to_uv_buf(msg);
759                     let msg_cell = Cell::new(msg);
760                     do stream_watcher.write(buf) |stream_watcher, status| {
761                         rtdebug!("writing");
762                         assert!(status.is_none());
763                         let msg_cell = Cell::new(msg_cell.take());
764                         stream_watcher.close(||ignore(msg_cell.take()));
765                     }
766                 }
767                 loop_.run();
768                 loop_.close();
769             };
770
771             let mut loop_ = loop_;
772             loop_.run();
773             loop_.close();
774             client_thread.join();
775         }
776     }
777
778     #[test]
779     #[ignore(cfg(windows))] // FIXME #8815
780     fn udp_recv_ip4() {
781         do run_in_bare_thread() {
782             static MAX: int = 10;
783             let mut loop_ = Loop::new();
784             let server_addr = next_test_ip4();
785             let client_addr = next_test_ip4();
786
787             let mut server = UdpWatcher::new(&loop_);
788             assert!(server.bind(server_addr).is_ok());
789
790             rtdebug!("starting read");
791             let alloc: AllocCallback = |size| {
792                 vec_to_uv_buf(vec::from_elem(size, 0u8))
793             };
794
795             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
796                 server.recv_stop();
797                 rtdebug!("i'm reading!");
798                 assert!(status.is_none());
799                 assert_eq!(flags, 0);
800                 assert_eq!(src, client_addr);
801
802                 let buf = vec_from_uv_buf(buf);
803                 let mut count = 0;
804                 rtdebug!("got %d bytes", nread);
805
806                 let buf = buf.unwrap();
807                 for &byte in buf.slice(0, nread as uint).iter() {
808                     assert!(byte == count as u8);
809                     rtdebug!("%u", byte as uint);
810                     count += 1;
811                 }
812                 assert_eq!(count, MAX);
813
814                 server.close(||{});
815             }
816
817             let thread = do Thread::start {
818                 let mut loop_ = Loop::new();
819                 let mut client = UdpWatcher::new(&loop_);
820                 assert!(client.bind(client_addr).is_ok());
821                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
822                 let buf = slice_to_uv_buf(msg);
823                 do client.send(buf, server_addr) |client, status| {
824                     rtdebug!("writing");
825                     assert!(status.is_none());
826                     client.close(||{});
827                 }
828
829                 loop_.run();
830                 loop_.close();
831             };
832
833             loop_.run();
834             loop_.close();
835             thread.join();
836         }
837     }
838
839     #[test]
840     #[ignore(cfg(windows))] // FIXME #8815
841     fn udp_recv_ip6() {
842         do run_in_bare_thread() {
843             static MAX: int = 10;
844             let mut loop_ = Loop::new();
845             let server_addr = next_test_ip6();
846             let client_addr = next_test_ip6();
847
848             let mut server = UdpWatcher::new(&loop_);
849             assert!(server.bind(server_addr).is_ok());
850
851             rtdebug!("starting read");
852             let alloc: AllocCallback = |size| {
853                 vec_to_uv_buf(vec::from_elem(size, 0u8))
854             };
855
856             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
857                 server.recv_stop();
858                 rtdebug!("i'm reading!");
859                 assert!(status.is_none());
860                 assert_eq!(flags, 0);
861                 assert_eq!(src, client_addr);
862
863                 let buf = vec_from_uv_buf(buf);
864                 let mut count = 0;
865                 rtdebug!("got %d bytes", nread);
866
867                 let buf = buf.unwrap();
868                 for &byte in buf.slice(0, nread as uint).iter() {
869                     assert!(byte == count as u8);
870                     rtdebug!("%u", byte as uint);
871                     count += 1;
872                 }
873                 assert_eq!(count, MAX);
874
875                 server.close(||{});
876             }
877
878             let thread = do Thread::start {
879                 let mut loop_ = Loop::new();
880                 let mut client = UdpWatcher::new(&loop_);
881                 assert!(client.bind(client_addr).is_ok());
882                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
883                 let buf = slice_to_uv_buf(msg);
884                 do client.send(buf, server_addr) |client, status| {
885                     rtdebug!("writing");
886                     assert!(status.is_none());
887                     client.close(||{});
888                 }
889
890                 loop_.run();
891                 loop_.close();
892             };
893
894             loop_.run();
895             loop_.close();
896             thread.join();
897         }
898     }
899 }