]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/net.rs
Turned off libstd unit tests that currently fail on Windows.
[rust.git] / src / libstd / rt / uv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::*;
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
13 use rt::uv::uvll;
14 use rt::uv::uvll::*;
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17              status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use vec;
20 use str;
21 use from_str::{FromStr};
22
23 pub enum UvSocketAddr {
24     UvIpv4SocketAddr(*sockaddr_in),
25     UvIpv6SocketAddr(*sockaddr_in6),
26 }
27
28 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
29     unsafe {
30         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
31         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
32         match addr {
33             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
34             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
35             _ => fail!(),
36         }
37     }
38 }
39
40 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
41     let malloc = match addr.ip {
42         Ipv4Addr(*) => malloc_ip4_addr,
43         Ipv6Addr(*) => malloc_ip6_addr,
44     };
45     let wrap = match addr.ip {
46         Ipv4Addr(*) => UvIpv4SocketAddr,
47         Ipv6Addr(*) => UvIpv6SocketAddr,
48     };
49     let free = match addr.ip {
50         Ipv4Addr(*) => free_ip4_addr,
51         Ipv6Addr(*) => free_ip6_addr,
52     };
53
54     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
55     do (|| {
56         f(wrap(addr))
57     }).finally {
58         unsafe { free(addr) };
59     }
60 }
61
62 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
63     let ip_size = match addr {
64         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
65         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
66     };
67     let ip_name = {
68         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
69         unsafe {
70             let buf_ptr = vec::raw::to_ptr(buf);
71             match addr {
72                 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
73                 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
74             }
75         };
76         buf
77     };
78     let ip_port = unsafe {
79         let port = match addr {
80             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
81             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
82         };
83         port as u16
84     };
85     let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00');
86     let ip_addr = FromStr::from_str(ip_str).unwrap();
87
88     // finally run the closure
89     f(SocketAddr { ip: ip_addr, port: ip_port })
90 }
91
92 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
93     use util;
94     uv_socket_addr_as_socket_addr(addr, util::id)
95 }
96
97 #[cfg(test)]
98 #[test]
99 fn test_ip4_conversion() {
100     use rt;
101     let ip4 = rt::test::next_test_ip4();
102     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
103 }
104
105 #[cfg(test)]
106 #[test]
107 fn test_ip6_conversion() {
108     use rt;
109     let ip6 = rt::test::next_test_ip6();
110     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
111 }
112
113 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
114 // and uv_file_t
115 pub struct StreamWatcher(*uvll::uv_stream_t);
116 impl Watcher for StreamWatcher { }
117
118 impl StreamWatcher {
119     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
120         {
121             let data = self.get_watcher_data();
122             data.alloc_cb = Some(alloc);
123             data.read_cb = Some(cb);
124         }
125
126         unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
127
128         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
129             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
130             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
131             return (*alloc_cb)(suggested_size as uint);
132         }
133
134         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
135             rtdebug!("buf addr: %x", buf.base as uint);
136             rtdebug!("buf len: %d", buf.len as int);
137             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
138             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
139             let status = status_to_maybe_uv_error(nread as c_int);
140             (*cb)(stream_watcher, nread as int, buf, status);
141         }
142     }
143
144     pub fn read_stop(&mut self) {
145         // It would be nice to drop the alloc and read callbacks here,
146         // but read_stop may be called from inside one of them and we
147         // would end up freeing the in-use environment
148         let handle = self.native_handle();
149         unsafe { uvll::read_stop(handle); }
150     }
151
152     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
153         {
154             let data = self.get_watcher_data();
155             assert!(data.write_cb.is_none());
156             data.write_cb = Some(cb);
157         }
158
159         let req = WriteRequest::new();
160         unsafe {
161         assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
162         }
163
164         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
165             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
166             let mut stream_watcher = write_request.stream();
167             write_request.delete();
168             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
169             let status = status_to_maybe_uv_error(status);
170             cb(stream_watcher, status);
171         }
172     }
173
174     pub fn accept(&mut self, stream: StreamWatcher) {
175         let self_handle = self.native_handle() as *c_void;
176         let stream_handle = stream.native_handle() as *c_void;
177         assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
178     }
179
180     pub fn close(self, cb: NullCallback) {
181         {
182             let mut this = self;
183             let data = this.get_watcher_data();
184             assert!(data.close_cb.is_none());
185             data.close_cb = Some(cb);
186         }
187
188         unsafe { uvll::close(self.native_handle(), close_cb); }
189
190         extern fn close_cb(handle: *uvll::uv_stream_t) {
191             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
192             let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
193             stream_watcher.drop_watcher_data();
194             unsafe { free_handle(handle as *c_void) }
195             cb();
196         }
197     }
198 }
199
200 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
201     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
202         StreamWatcher(handle)
203     }
204     fn native_handle(&self) -> *uvll::uv_stream_t {
205         match self { &StreamWatcher(ptr) => ptr }
206     }
207 }
208
209 pub struct TcpWatcher(*uvll::uv_tcp_t);
210 impl Watcher for TcpWatcher { }
211
212 impl TcpWatcher {
213     pub fn new(loop_: &Loop) -> TcpWatcher {
214         unsafe {
215             let handle = malloc_handle(UV_TCP);
216             assert!(handle.is_not_null());
217             assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
218             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
219             watcher.install_watcher_data();
220             return watcher;
221         }
222     }
223
224     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
225         do socket_addr_as_uv_socket_addr(address) |addr| {
226             let result = unsafe {
227                 match addr {
228                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
229                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
230                 }
231             };
232             match result {
233                 0 => Ok(()),
234                 _ => Err(UvError(result)),
235             }
236         }
237     }
238
239     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
240         unsafe {
241             assert!(self.get_watcher_data().connect_cb.is_none());
242             self.get_watcher_data().connect_cb = Some(cb);
243
244             let connect_handle = ConnectRequest::new().native_handle();
245             rtdebug!("connect_t: %x", connect_handle as uint);
246             do socket_addr_as_uv_socket_addr(address) |addr| {
247                 let result = match addr {
248                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
249                                                       self.native_handle(), addr, connect_cb),
250                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
251                                                        self.native_handle(), addr, connect_cb),
252                 };
253                 assert_eq!(0, result);
254             }
255
256             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
257                 rtdebug!("connect_t: %x", req as uint);
258                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
259                 let mut stream_watcher = connect_request.stream();
260                 connect_request.delete();
261                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
262                 let status = status_to_maybe_uv_error(status);
263                 cb(stream_watcher, status);
264             }
265         }
266     }
267
268     pub fn listen(&mut self, cb: ConnectionCallback) {
269         {
270             let data = self.get_watcher_data();
271             assert!(data.connect_cb.is_none());
272             data.connect_cb = Some(cb);
273         }
274
275         unsafe {
276             static BACKLOG: c_int = 128; // XXX should be configurable
277             // XXX: This can probably fail
278             assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
279         }
280
281         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
282             rtdebug!("connection_cb");
283             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
284             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
285             let status = status_to_maybe_uv_error(status);
286             (*cb)(stream_watcher, status);
287         }
288     }
289
290     pub fn as_stream(&self) -> StreamWatcher {
291         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
292     }
293 }
294
295 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
296     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
297         TcpWatcher(handle)
298     }
299     fn native_handle(&self) -> *uvll::uv_tcp_t {
300         match self { &TcpWatcher(ptr) => ptr }
301     }
302 }
303
304 pub struct UdpWatcher(*uvll::uv_udp_t);
305 impl Watcher for UdpWatcher { }
306
307 impl UdpWatcher {
308     pub fn new(loop_: &Loop) -> UdpWatcher {
309         unsafe {
310             let handle = malloc_handle(UV_UDP);
311             assert!(handle.is_not_null());
312             assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
313             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
314             watcher.install_watcher_data();
315             return watcher;
316         }
317     }
318
319     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
320         do socket_addr_as_uv_socket_addr(address) |addr| {
321             let result = unsafe {
322                 match addr {
323                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
324                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
325                 }
326             };
327             match result {
328                 0 => Ok(()),
329                 _ => Err(UvError(result)),
330             }
331         }
332     }
333
334     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
335         {
336             let data = self.get_watcher_data();
337             data.alloc_cb = Some(alloc);
338             data.udp_recv_cb = Some(cb);
339         }
340
341         unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
342
343         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
344             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
345             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
346             return (*alloc_cb)(suggested_size as uint);
347         }
348
349         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
350                           addr: *uvll::sockaddr, flags: c_uint) {
351             // When there's no data to read the recv callback can be a no-op.
352             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
353             // this we just drop back to kqueue and wait for the next callback.
354             if nread == 0 {
355                 return;
356             }
357
358             rtdebug!("buf addr: %x", buf.base as uint);
359             rtdebug!("buf len: %d", buf.len as int);
360             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
361             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
362             let status = status_to_maybe_uv_error(nread as c_int);
363             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
364             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
365         }
366     }
367
368     pub fn recv_stop(&mut self) {
369         unsafe { uvll::udp_recv_stop(self.native_handle()); }
370     }
371
372     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
373         {
374             let data = self.get_watcher_data();
375             assert!(data.udp_send_cb.is_none());
376             data.udp_send_cb = Some(cb);
377         }
378
379         let req = UdpSendRequest::new();
380         do socket_addr_as_uv_socket_addr(address) |addr| {
381             let result = unsafe {
382                 match addr {
383                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
384                                                    self.native_handle(), [buf], addr, send_cb),
385                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
386                                                     self.native_handle(), [buf], addr, send_cb),
387                 }
388             };
389             assert_eq!(0, result);
390         }
391
392         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
393             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
394             let mut udp_watcher = send_request.handle();
395             send_request.delete();
396             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
397             let status = status_to_maybe_uv_error(status);
398             cb(udp_watcher, status);
399         }
400     }
401
402     pub fn close(self, cb: NullCallback) {
403         {
404             let mut this = self;
405             let data = this.get_watcher_data();
406             assert!(data.close_cb.is_none());
407             data.close_cb = Some(cb);
408         }
409
410         unsafe { uvll::close(self.native_handle(), close_cb); }
411
412         extern fn close_cb(handle: *uvll::uv_udp_t) {
413             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
414             let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
415             udp_watcher.drop_watcher_data();
416             unsafe { free_handle(handle as *c_void) }
417             cb();
418         }
419     }
420 }
421
422 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
423     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
424         UdpWatcher(handle)
425     }
426     fn native_handle(&self) -> *uvll::uv_udp_t {
427         match self { &UdpWatcher(ptr) => ptr }
428     }
429 }
430
431 // uv_connect_t is a subclass of uv_req_t
432 struct ConnectRequest(*uvll::uv_connect_t);
433 impl Request for ConnectRequest { }
434
435 impl ConnectRequest {
436
437     fn new() -> ConnectRequest {
438         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
439         assert!(connect_handle.is_not_null());
440         ConnectRequest(connect_handle as *uvll::uv_connect_t)
441     }
442
443     fn stream(&self) -> StreamWatcher {
444         unsafe {
445             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
446             NativeHandle::from_native_handle(stream_handle)
447         }
448     }
449
450     fn delete(self) {
451         unsafe { free_req(self.native_handle() as *c_void) }
452     }
453 }
454
455 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
456     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
457         ConnectRequest(handle)
458     }
459     fn native_handle(&self) -> *uvll::uv_connect_t {
460         match self { &ConnectRequest(ptr) => ptr }
461     }
462 }
463
464 pub struct WriteRequest(*uvll::uv_write_t);
465
466 impl Request for WriteRequest { }
467
468 impl WriteRequest {
469     pub fn new() -> WriteRequest {
470         let write_handle = unsafe { malloc_req(UV_WRITE) };
471         assert!(write_handle.is_not_null());
472         WriteRequest(write_handle as *uvll::uv_write_t)
473     }
474
475     pub fn stream(&self) -> StreamWatcher {
476         unsafe {
477             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
478             NativeHandle::from_native_handle(stream_handle)
479         }
480     }
481
482     pub fn delete(self) {
483         unsafe { free_req(self.native_handle() as *c_void) }
484     }
485 }
486
487 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
488     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
489         WriteRequest(handle)
490     }
491     fn native_handle(&self) -> *uvll::uv_write_t {
492         match self { &WriteRequest(ptr) => ptr }
493     }
494 }
495
496 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
497 impl Request for UdpSendRequest { }
498
499 impl UdpSendRequest {
500     pub fn new() -> UdpSendRequest {
501         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
502         assert!(send_handle.is_not_null());
503         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
504     }
505
506     pub fn handle(&self) -> UdpWatcher {
507         let send_request_handle = unsafe {
508             uvll::get_udp_handle_from_send_req(self.native_handle())
509         };
510         NativeHandle::from_native_handle(send_request_handle)
511     }
512
513     pub fn delete(self) {
514         unsafe { free_req(self.native_handle() as *c_void) }
515     }
516 }
517
518 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
519     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
520         UdpSendRequest(handle)
521     }
522     fn native_handle(&self) -> *uvll::uv_udp_send_t {
523         match self { &UdpSendRequest(ptr) => ptr }
524     }
525 }
526
527 #[cfg(test)]
528 mod test {
529     use super::*;
530     use util::ignore;
531     use cell::Cell;
532     use vec;
533     use unstable::run_in_bare_thread;
534     use rt::thread::Thread;
535     use rt::test::*;
536     use rt::uv::{Loop, AllocCallback};
537     use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
538     use prelude::*;
539
540     #[test]
541     fn connect_close_ip4() {
542         do run_in_bare_thread() {
543             let mut loop_ = Loop::new();
544             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
545             // Connect to a port where nobody is listening
546             let addr = next_test_ip4();
547             do tcp_watcher.connect(addr) |stream_watcher, status| {
548                 rtdebug!("tcp_watcher.connect!");
549                 assert!(status.is_some());
550                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
551                 stream_watcher.close(||());
552             }
553             loop_.run();
554             loop_.close();
555         }
556     }
557
558     #[test]
559     fn connect_close_ip6() {
560         do run_in_bare_thread() {
561             let mut loop_ = Loop::new();
562             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
563             // Connect to a port where nobody is listening
564             let addr = next_test_ip6();
565             do tcp_watcher.connect(addr) |stream_watcher, status| {
566                 rtdebug!("tcp_watcher.connect!");
567                 assert!(status.is_some());
568                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
569                 stream_watcher.close(||());
570             }
571             loop_.run();
572             loop_.close();
573         }
574     }
575
576     #[test]
577     fn udp_bind_close_ip4() {
578         do run_in_bare_thread() {
579             let mut loop_ = Loop::new();
580             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
581             let addr = next_test_ip4();
582             udp_watcher.bind(addr);
583             udp_watcher.close(||());
584             loop_.run();
585             loop_.close();
586         }
587     }
588
589     #[test]
590     fn udp_bind_close_ip6() {
591         do run_in_bare_thread() {
592             let mut loop_ = Loop::new();
593             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
594             let addr = next_test_ip6();
595             udp_watcher.bind(addr);
596             udp_watcher.close(||());
597             loop_.run();
598             loop_.close();
599         }
600     }
601
602     #[test]
603     #[ignore(cfg(windows))] // FIXME #8815
604     fn listen_ip4() {
605         do run_in_bare_thread() {
606             static MAX: int = 10;
607             let mut loop_ = Loop::new();
608             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
609             let addr = next_test_ip4();
610             server_tcp_watcher.bind(addr);
611             let loop_ = loop_;
612             rtdebug!("listening");
613             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
614                 rtdebug!("listened!");
615                 assert!(status.is_none());
616                 let mut loop_ = loop_;
617                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
618                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
619                 server_stream_watcher.accept(client_tcp_watcher);
620                 let count_cell = Cell::new(0);
621                 let server_stream_watcher = server_stream_watcher;
622                 rtdebug!("starting read");
623                 let alloc: AllocCallback = |size| {
624                     vec_to_uv_buf(vec::from_elem(size, 0u8))
625                 };
626                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
627
628                     rtdebug!("i'm reading!");
629                     let buf = vec_from_uv_buf(buf);
630                     let mut count = count_cell.take();
631                     if status.is_none() {
632                         rtdebug!("got %d bytes", nread);
633                         let buf = buf.unwrap();
634                         for byte in buf.slice(0, nread as uint).iter() {
635                             assert!(*byte == count as u8);
636                             rtdebug!("%u", *byte as uint);
637                             count += 1;
638                         }
639                     } else {
640                         assert_eq!(count, MAX);
641                         do stream_watcher.close {
642                             server_stream_watcher.close(||());
643                         }
644                     }
645                     count_cell.put_back(count);
646                 }
647             }
648
649             let client_thread = do Thread::start {
650                 rtdebug!("starting client thread");
651                 let mut loop_ = Loop::new();
652                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
653                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
654                     rtdebug!("connecting");
655                     assert!(status.is_none());
656                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
657                     let buf = slice_to_uv_buf(msg);
658                     let msg_cell = Cell::new(msg);
659                     do stream_watcher.write(buf) |stream_watcher, status| {
660                         rtdebug!("writing");
661                         assert!(status.is_none());
662                         let msg_cell = Cell::new(msg_cell.take());
663                         stream_watcher.close(||ignore(msg_cell.take()));
664                     }
665                 }
666                 loop_.run();
667                 loop_.close();
668             };
669
670             let mut loop_ = loop_;
671             loop_.run();
672             loop_.close();
673             client_thread.join();
674         }
675     }
676
677     #[test]
678     #[ignore(cfg(windows))] // FIXME #8815
679     fn listen_ip6() {
680         do run_in_bare_thread() {
681             static MAX: int = 10;
682             let mut loop_ = Loop::new();
683             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
684             let addr = next_test_ip6();
685             server_tcp_watcher.bind(addr);
686             let loop_ = loop_;
687             rtdebug!("listening");
688             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
689                 rtdebug!("listened!");
690                 assert!(status.is_none());
691                 let mut loop_ = loop_;
692                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
693                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
694                 server_stream_watcher.accept(client_tcp_watcher);
695                 let count_cell = Cell::new(0);
696                 let server_stream_watcher = server_stream_watcher;
697                 rtdebug!("starting read");
698                 let alloc: AllocCallback = |size| {
699                     vec_to_uv_buf(vec::from_elem(size, 0u8))
700                 };
701                 do client_tcp_watcher.read_start(alloc)
702                     |stream_watcher, nread, buf, status| {
703
704                     rtdebug!("i'm reading!");
705                     let buf = vec_from_uv_buf(buf);
706                     let mut count = count_cell.take();
707                     if status.is_none() {
708                         rtdebug!("got %d bytes", nread);
709                         let buf = buf.unwrap();
710                         let r = buf.slice(0, nread as uint);
711                         for byte in r.iter() {
712                             assert!(*byte == count as u8);
713                             rtdebug!("%u", *byte as uint);
714                             count += 1;
715                         }
716                     } else {
717                         assert_eq!(count, MAX);
718                         do stream_watcher.close {
719                             server_stream_watcher.close(||());
720                         }
721                     }
722                     count_cell.put_back(count);
723                 }
724             }
725
726             let client_thread = do Thread::start {
727                 rtdebug!("starting client thread");
728                 let mut loop_ = Loop::new();
729                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
730                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
731                     rtdebug!("connecting");
732                     assert!(status.is_none());
733                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
734                     let buf = slice_to_uv_buf(msg);
735                     let msg_cell = Cell::new(msg);
736                     do stream_watcher.write(buf) |stream_watcher, status| {
737                         rtdebug!("writing");
738                         assert!(status.is_none());
739                         let msg_cell = Cell::new(msg_cell.take());
740                         stream_watcher.close(||ignore(msg_cell.take()));
741                     }
742                 }
743                 loop_.run();
744                 loop_.close();
745             };
746
747             let mut loop_ = loop_;
748             loop_.run();
749             loop_.close();
750             client_thread.join();
751         }
752     }
753
754     #[test]
755     #[ignore(cfg(windows))] // FIXME #8815
756     fn udp_recv_ip4() {
757         do run_in_bare_thread() {
758             static MAX: int = 10;
759             let mut loop_ = Loop::new();
760             let server_addr = next_test_ip4();
761             let client_addr = next_test_ip4();
762
763             let mut server = UdpWatcher::new(&loop_);
764             assert!(server.bind(server_addr).is_ok());
765
766             rtdebug!("starting read");
767             let alloc: AllocCallback = |size| {
768                 vec_to_uv_buf(vec::from_elem(size, 0u8))
769             };
770
771             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
772                 server.recv_stop();
773                 rtdebug!("i'm reading!");
774                 assert!(status.is_none());
775                 assert_eq!(flags, 0);
776                 assert_eq!(src, client_addr);
777
778                 let buf = vec_from_uv_buf(buf);
779                 let mut count = 0;
780                 rtdebug!("got %d bytes", nread);
781
782                 let buf = buf.unwrap();
783                 for &byte in buf.slice(0, nread as uint).iter() {
784                     assert!(byte == count as u8);
785                     rtdebug!("%u", byte as uint);
786                     count += 1;
787                 }
788                 assert_eq!(count, MAX);
789
790                 server.close(||{});
791             }
792
793             let thread = do Thread::start {
794                 let mut loop_ = Loop::new();
795                 let mut client = UdpWatcher::new(&loop_);
796                 assert!(client.bind(client_addr).is_ok());
797                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
798                 let buf = slice_to_uv_buf(msg);
799                 do client.send(buf, server_addr) |client, status| {
800                     rtdebug!("writing");
801                     assert!(status.is_none());
802                     client.close(||{});
803                 }
804
805                 loop_.run();
806                 loop_.close();
807             };
808
809             loop_.run();
810             loop_.close();
811             thread.join();
812         }
813     }
814
815     #[test]
816     #[ignore(cfg(windows))] // FIXME #8815
817     fn udp_recv_ip6() {
818         do run_in_bare_thread() {
819             static MAX: int = 10;
820             let mut loop_ = Loop::new();
821             let server_addr = next_test_ip6();
822             let client_addr = next_test_ip6();
823
824             let mut server = UdpWatcher::new(&loop_);
825             assert!(server.bind(server_addr).is_ok());
826
827             rtdebug!("starting read");
828             let alloc: AllocCallback = |size| {
829                 vec_to_uv_buf(vec::from_elem(size, 0u8))
830             };
831
832             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
833                 server.recv_stop();
834                 rtdebug!("i'm reading!");
835                 assert!(status.is_none());
836                 assert_eq!(flags, 0);
837                 assert_eq!(src, client_addr);
838
839                 let buf = vec_from_uv_buf(buf);
840                 let mut count = 0;
841                 rtdebug!("got %d bytes", nread);
842
843                 let buf = buf.unwrap();
844                 for &byte in buf.slice(0, nread as uint).iter() {
845                     assert!(byte == count as u8);
846                     rtdebug!("%u", byte as uint);
847                     count += 1;
848                 }
849                 assert_eq!(count, MAX);
850
851                 server.close(||{});
852             }
853
854             let thread = do Thread::start {
855                 let mut loop_ = Loop::new();
856                 let mut client = UdpWatcher::new(&loop_);
857                 assert!(client.bind(client_addr).is_ok());
858                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
859                 let buf = slice_to_uv_buf(msg);
860                 do client.send(buf, server_addr) |client, status| {
861                     rtdebug!("writing");
862                     assert!(status.is_none());
863                     client.close(||{});
864                 }
865
866                 loop_.run();
867                 loop_.close();
868             };
869
870             loop_.run();
871             loop_.close();
872             thread.join();
873         }
874     }
875 }