]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
auto merge of #10203 : kud1ing/rust/ios, r=alexcrichton
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
12 use std::vec;
13 use std::str;
14 use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
15
16 use uvll;
17 use uvll::*;
18 use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
19             UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
20             status_to_maybe_uv_error, empty_buf};
21
22 pub struct UvAddrInfo(*uvll::addrinfo);
23
24 pub enum UvSocketAddr {
25     UvIpv4SocketAddr(*sockaddr_in),
26     UvIpv6SocketAddr(*sockaddr_in6),
27 }
28
29 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
30     unsafe {
31         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
32         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
33         match addr {
34             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
36             _ => fail!(),
37         }
38     }
39 }
40
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
42     let malloc = match addr.ip {
43         Ipv4Addr(*) => malloc_ip4_addr,
44         Ipv6Addr(*) => malloc_ip6_addr,
45     };
46     let wrap = match addr.ip {
47         Ipv4Addr(*) => UvIpv4SocketAddr,
48         Ipv6Addr(*) => UvIpv6SocketAddr,
49     };
50     let free = match addr.ip {
51         Ipv4Addr(*) => free_ip4_addr,
52         Ipv6Addr(*) => free_ip6_addr,
53     };
54
55     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
56     do (|| {
57         f(wrap(addr))
58     }).finally {
59         unsafe { free(addr) };
60     }
61 }
62
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
64     let ip_size = match addr {
65         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
67     };
68     let ip_name = {
69         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
70         unsafe {
71             let buf_ptr = vec::raw::to_ptr(buf);
72             match addr {
73                 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
74                 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
75             }
76         };
77         buf
78     };
79     let ip_port = unsafe {
80         let port = match addr {
81             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
82             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
83         };
84         port as u16
85     };
86     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
87     let ip_addr = FromStr::from_str(ip_str).unwrap();
88
89     // finally run the closure
90     f(SocketAddr { ip: ip_addr, port: ip_port })
91 }
92
93 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
94     use std::util;
95     uv_socket_addr_as_socket_addr(addr, util::id)
96 }
97
98 #[cfg(test)]
99 #[test]
100 fn test_ip4_conversion() {
101     use std::rt;
102     let ip4 = rt::test::next_test_ip4();
103     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
104 }
105
106 #[cfg(test)]
107 #[test]
108 fn test_ip6_conversion() {
109     use std::rt;
110     let ip6 = rt::test::next_test_ip6();
111     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
112 }
113
114 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
115 // and uv_file_t
116 pub struct StreamWatcher(*uvll::uv_stream_t);
117 impl Watcher for StreamWatcher { }
118
119 impl StreamWatcher {
120     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
121         unsafe {
122             match uvll::read_start(self.native_handle(), alloc_cb, read_cb) {
123                 0 => {
124                     let data = self.get_watcher_data();
125                     data.alloc_cb = Some(alloc);
126                     data.read_cb = Some(cb);
127                 }
128                 n => {
129                     cb(*self, 0, empty_buf(), Some(UvError(n)))
130                 }
131             }
132         }
133
134         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
135             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
136             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
137             return (*alloc_cb)(suggested_size as uint);
138         }
139
140         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
141             uvdebug!("buf addr: {}", buf.base);
142             uvdebug!("buf len: {}", buf.len);
143             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
144             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
145             let status = status_to_maybe_uv_error(nread as c_int);
146             (*cb)(stream_watcher, nread as int, buf, status);
147         }
148     }
149
150     pub fn read_stop(&mut self) {
151         // It would be nice to drop the alloc and read callbacks here,
152         // but read_stop may be called from inside one of them and we
153         // would end up freeing the in-use environment
154         let handle = self.native_handle();
155         unsafe { assert_eq!(uvll::read_stop(handle), 0); }
156     }
157
158     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
159         let req = WriteRequest::new();
160         return unsafe {
161             match uvll::write(req.native_handle(), self.native_handle(),
162                               [buf], write_cb) {
163                 0 => {
164                     let data = self.get_watcher_data();
165                     assert!(data.write_cb.is_none());
166                     data.write_cb = Some(cb);
167                 }
168                 n => {
169                     req.delete();
170                     cb(*self, Some(UvError(n)))
171                 }
172             }
173         };
174
175         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
176             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
177             let mut stream_watcher = write_request.stream();
178             write_request.delete();
179             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
180             let status = status_to_maybe_uv_error(status);
181             cb(stream_watcher, status);
182         }
183     }
184
185
186     pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
187         {
188             let data = self.get_watcher_data();
189             assert!(data.connect_cb.is_none());
190             data.connect_cb = Some(cb);
191         }
192
193         return unsafe {
194             static BACKLOG: c_int = 128; // XXX should be configurable
195             match uvll::listen(self.native_handle(), BACKLOG, connection_cb) {
196                 0 => Ok(()),
197                 n => Err(UvError(n))
198             }
199         };
200
201         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
202             uvdebug!("connection_cb");
203             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
204             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
205             let status = status_to_maybe_uv_error(status);
206             (*cb)(stream_watcher, status);
207         }
208     }
209
210     pub fn accept(&mut self, stream: StreamWatcher) {
211         let self_handle = self.native_handle() as *c_void;
212         let stream_handle = stream.native_handle() as *c_void;
213         assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
214     }
215 }
216
217 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
218     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
219         StreamWatcher(handle)
220     }
221     fn native_handle(&self) -> *uvll::uv_stream_t {
222         match self { &StreamWatcher(ptr) => ptr }
223     }
224 }
225
226 pub struct TcpWatcher(*uvll::uv_tcp_t);
227 impl Watcher for TcpWatcher { }
228
229 impl TcpWatcher {
230     pub fn new(loop_: &Loop) -> TcpWatcher {
231         unsafe {
232             let handle = malloc_handle(UV_TCP);
233             assert!(handle.is_not_null());
234             assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
235             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
236             watcher.install_watcher_data();
237             return watcher;
238         }
239     }
240
241     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
242         do socket_addr_as_uv_socket_addr(address) |addr| {
243             let result = unsafe {
244                 match addr {
245                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
246                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
247                 }
248             };
249             match result {
250                 0 => Ok(()),
251                 _ => Err(UvError(result)),
252             }
253         }
254     }
255
256     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
257         unsafe {
258             assert!(self.get_watcher_data().connect_cb.is_none());
259             self.get_watcher_data().connect_cb = Some(cb);
260
261             let connect_handle = ConnectRequest::new().native_handle();
262             uvdebug!("connect_t: {}", connect_handle);
263             do socket_addr_as_uv_socket_addr(address) |addr| {
264                 let result = match addr {
265                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
266                                                       self.native_handle(), addr, connect_cb),
267                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
268                                                        self.native_handle(), addr, connect_cb),
269                 };
270                 assert_eq!(0, result);
271             }
272
273             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
274                 uvdebug!("connect_t: {}", req);
275                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
276                 let mut stream_watcher = connect_request.stream();
277                 connect_request.delete();
278                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
279                 let status = status_to_maybe_uv_error(status);
280                 cb(stream_watcher, status);
281             }
282         }
283     }
284
285     pub fn as_stream(&self) -> StreamWatcher {
286         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
287     }
288 }
289
290 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
291     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
292         TcpWatcher(handle)
293     }
294     fn native_handle(&self) -> *uvll::uv_tcp_t {
295         match self { &TcpWatcher(ptr) => ptr }
296     }
297 }
298
299 pub struct UdpWatcher(*uvll::uv_udp_t);
300 impl Watcher for UdpWatcher { }
301
302 impl UdpWatcher {
303     pub fn new(loop_: &Loop) -> UdpWatcher {
304         unsafe {
305             let handle = malloc_handle(UV_UDP);
306             assert!(handle.is_not_null());
307             assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
308             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
309             watcher.install_watcher_data();
310             return watcher;
311         }
312     }
313
314     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
315         do socket_addr_as_uv_socket_addr(address) |addr| {
316             let result = unsafe {
317                 match addr {
318                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
319                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
320                 }
321             };
322             match result {
323                 0 => Ok(()),
324                 _ => Err(UvError(result)),
325             }
326         }
327     }
328
329     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
330         {
331             let data = self.get_watcher_data();
332             data.alloc_cb = Some(alloc);
333             data.udp_recv_cb = Some(cb);
334         }
335
336         unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
337
338         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
339             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
340             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
341             return (*alloc_cb)(suggested_size as uint);
342         }
343
344         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
345                           addr: *uvll::sockaddr, flags: c_uint) {
346             // When there's no data to read the recv callback can be a no-op.
347             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
348             // this we just drop back to kqueue and wait for the next callback.
349             if nread == 0 {
350                 return;
351             }
352
353             uvdebug!("buf addr: {}", buf.base);
354             uvdebug!("buf len: {}", buf.len);
355             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
356             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
357             let status = status_to_maybe_uv_error(nread as c_int);
358             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
359             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
360         }
361     }
362
363     pub fn recv_stop(&mut self) {
364         unsafe { uvll::udp_recv_stop(self.native_handle()); }
365     }
366
367     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
368         {
369             let data = self.get_watcher_data();
370             assert!(data.udp_send_cb.is_none());
371             data.udp_send_cb = Some(cb);
372         }
373
374         let req = UdpSendRequest::new();
375         do socket_addr_as_uv_socket_addr(address) |addr| {
376             let result = unsafe {
377                 match addr {
378                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
379                                                    self.native_handle(), [buf], addr, send_cb),
380                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
381                                                     self.native_handle(), [buf], addr, send_cb),
382                 }
383             };
384             assert_eq!(0, result);
385         }
386
387         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
388             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
389             let mut udp_watcher = send_request.handle();
390             send_request.delete();
391             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
392             let status = status_to_maybe_uv_error(status);
393             cb(udp_watcher, status);
394         }
395     }
396 }
397
398 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
399     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
400         UdpWatcher(handle)
401     }
402     fn native_handle(&self) -> *uvll::uv_udp_t {
403         match self { &UdpWatcher(ptr) => ptr }
404     }
405 }
406
407 // uv_connect_t is a subclass of uv_req_t
408 pub struct ConnectRequest(*uvll::uv_connect_t);
409 impl Request for ConnectRequest { }
410
411 impl ConnectRequest {
412
413     pub fn new() -> ConnectRequest {
414         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
415         assert!(connect_handle.is_not_null());
416         ConnectRequest(connect_handle as *uvll::uv_connect_t)
417     }
418
419     fn stream(&self) -> StreamWatcher {
420         unsafe {
421             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
422             NativeHandle::from_native_handle(stream_handle)
423         }
424     }
425
426     fn delete(self) {
427         unsafe { free_req(self.native_handle() as *c_void) }
428     }
429 }
430
431 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
432     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
433         ConnectRequest(handle)
434     }
435     fn native_handle(&self) -> *uvll::uv_connect_t {
436         match self { &ConnectRequest(ptr) => ptr }
437     }
438 }
439
440 pub struct WriteRequest(*uvll::uv_write_t);
441
442 impl Request for WriteRequest { }
443
444 impl WriteRequest {
445     pub fn new() -> WriteRequest {
446         let write_handle = unsafe { malloc_req(UV_WRITE) };
447         assert!(write_handle.is_not_null());
448         WriteRequest(write_handle as *uvll::uv_write_t)
449     }
450
451     pub fn stream(&self) -> StreamWatcher {
452         unsafe {
453             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
454             NativeHandle::from_native_handle(stream_handle)
455         }
456     }
457
458     pub fn delete(self) {
459         unsafe { free_req(self.native_handle() as *c_void) }
460     }
461 }
462
463 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
464     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
465         WriteRequest(handle)
466     }
467     fn native_handle(&self) -> *uvll::uv_write_t {
468         match self { &WriteRequest(ptr) => ptr }
469     }
470 }
471
472 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
473 impl Request for UdpSendRequest { }
474
475 impl UdpSendRequest {
476     pub fn new() -> UdpSendRequest {
477         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
478         assert!(send_handle.is_not_null());
479         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
480     }
481
482     pub fn handle(&self) -> UdpWatcher {
483         let send_request_handle = unsafe {
484             uvll::get_udp_handle_from_send_req(self.native_handle())
485         };
486         NativeHandle::from_native_handle(send_request_handle)
487     }
488
489     pub fn delete(self) {
490         unsafe { free_req(self.native_handle() as *c_void) }
491     }
492 }
493
494 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
495     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
496         UdpSendRequest(handle)
497     }
498     fn native_handle(&self) -> *uvll::uv_udp_send_t {
499         match self { &UdpSendRequest(ptr) => ptr }
500     }
501 }
502
503 #[cfg(test)]
504 mod test {
505     use super::*;
506     use std::util::ignore;
507     use std::cell::Cell;
508     use std::vec;
509     use std::unstable::run_in_bare_thread;
510     use std::rt::thread::Thread;
511     use std::rt::test::*;
512     use super::super::{Loop, AllocCallback};
513     use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
514
515     #[test]
516     fn connect_close_ip4() {
517         do run_in_bare_thread() {
518             let mut loop_ = Loop::new();
519             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
520             // Connect to a port where nobody is listening
521             let addr = next_test_ip4();
522             do tcp_watcher.connect(addr) |stream_watcher, status| {
523                 uvdebug!("tcp_watcher.connect!");
524                 assert!(status.is_some());
525                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
526                 stream_watcher.close(||());
527             }
528             loop_.run();
529             loop_.close();
530         }
531     }
532
533     #[test]
534     fn connect_close_ip6() {
535         do run_in_bare_thread() {
536             let mut loop_ = Loop::new();
537             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
538             // Connect to a port where nobody is listening
539             let addr = next_test_ip6();
540             do tcp_watcher.connect(addr) |stream_watcher, status| {
541                 uvdebug!("tcp_watcher.connect!");
542                 assert!(status.is_some());
543                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
544                 stream_watcher.close(||());
545             }
546             loop_.run();
547             loop_.close();
548         }
549     }
550
551     #[test]
552     fn udp_bind_close_ip4() {
553         do run_in_bare_thread() {
554             let mut loop_ = Loop::new();
555             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
556             let addr = next_test_ip4();
557             udp_watcher.bind(addr);
558             udp_watcher.close(||());
559             loop_.run();
560             loop_.close();
561         }
562     }
563
564     #[test]
565     fn udp_bind_close_ip6() {
566         do run_in_bare_thread() {
567             let mut loop_ = Loop::new();
568             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
569             let addr = next_test_ip6();
570             udp_watcher.bind(addr);
571             udp_watcher.close(||());
572             loop_.run();
573             loop_.close();
574         }
575     }
576
577     #[test]
578     fn listen_ip4() {
579         do run_in_bare_thread() {
580             static MAX: int = 10;
581             let mut loop_ = Loop::new();
582             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
583             let addr = next_test_ip4();
584             server_tcp_watcher.bind(addr);
585             let loop_ = loop_;
586             uvdebug!("listening");
587             let mut stream = server_tcp_watcher.as_stream();
588             let res = do stream.listen |mut server_stream_watcher, status| {
589                 uvdebug!("listened!");
590                 assert!(status.is_none());
591                 let mut loop_ = loop_;
592                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
593                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
594                 server_stream_watcher.accept(client_tcp_watcher);
595                 let count_cell = Cell::new(0);
596                 let server_stream_watcher = server_stream_watcher;
597                 uvdebug!("starting read");
598                 let alloc: AllocCallback = |size| {
599                     vec_to_uv_buf(vec::from_elem(size, 0u8))
600                 };
601                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
602
603                     uvdebug!("i'm reading!");
604                     let buf = vec_from_uv_buf(buf);
605                     let mut count = count_cell.take();
606                     if status.is_none() {
607                         uvdebug!("got {} bytes", nread);
608                         let buf = buf.unwrap();
609                         for byte in buf.slice(0, nread as uint).iter() {
610                             assert!(*byte == count as u8);
611                             uvdebug!("{}", *byte as uint);
612                             count += 1;
613                         }
614                     } else {
615                         assert_eq!(count, MAX);
616                         do stream_watcher.close {
617                             server_stream_watcher.close(||());
618                         }
619                     }
620                     count_cell.put_back(count);
621                 }
622             };
623
624             assert!(res.is_ok());
625
626             let client_thread = do Thread::start {
627                 uvdebug!("starting client thread");
628                 let mut loop_ = Loop::new();
629                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
630                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
631                     uvdebug!("connecting");
632                     assert!(status.is_none());
633                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
634                     let buf = slice_to_uv_buf(msg);
635                     let msg_cell = Cell::new(msg);
636                     do stream_watcher.write(buf) |stream_watcher, status| {
637                         uvdebug!("writing");
638                         assert!(status.is_none());
639                         let msg_cell = Cell::new(msg_cell.take());
640                         stream_watcher.close(||ignore(msg_cell.take()));
641                     }
642                 }
643                 loop_.run();
644                 loop_.close();
645             };
646
647             let mut loop_ = loop_;
648             loop_.run();
649             loop_.close();
650             client_thread.join();
651         };
652     }
653
654     #[test]
655     fn listen_ip6() {
656         do run_in_bare_thread() {
657             static MAX: int = 10;
658             let mut loop_ = Loop::new();
659             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
660             let addr = next_test_ip6();
661             server_tcp_watcher.bind(addr);
662             let loop_ = loop_;
663             uvdebug!("listening");
664             let mut stream = server_tcp_watcher.as_stream();
665             let res = do stream.listen |mut server_stream_watcher, status| {
666                 uvdebug!("listened!");
667                 assert!(status.is_none());
668                 let mut loop_ = loop_;
669                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
670                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
671                 server_stream_watcher.accept(client_tcp_watcher);
672                 let count_cell = Cell::new(0);
673                 let server_stream_watcher = server_stream_watcher;
674                 uvdebug!("starting read");
675                 let alloc: AllocCallback = |size| {
676                     vec_to_uv_buf(vec::from_elem(size, 0u8))
677                 };
678                 do client_tcp_watcher.read_start(alloc)
679                     |stream_watcher, nread, buf, status| {
680
681                     uvdebug!("i'm reading!");
682                     let buf = vec_from_uv_buf(buf);
683                     let mut count = count_cell.take();
684                     if status.is_none() {
685                         uvdebug!("got {} bytes", nread);
686                         let buf = buf.unwrap();
687                         let r = buf.slice(0, nread as uint);
688                         for byte in r.iter() {
689                             assert!(*byte == count as u8);
690                             uvdebug!("{}", *byte as uint);
691                             count += 1;
692                         }
693                     } else {
694                         assert_eq!(count, MAX);
695                         do stream_watcher.close {
696                             server_stream_watcher.close(||());
697                         }
698                     }
699                     count_cell.put_back(count);
700                 }
701             };
702             assert!(res.is_ok());
703
704             let client_thread = do Thread::start {
705                 uvdebug!("starting client thread");
706                 let mut loop_ = Loop::new();
707                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
708                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
709                     uvdebug!("connecting");
710                     assert!(status.is_none());
711                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
712                     let buf = slice_to_uv_buf(msg);
713                     let msg_cell = Cell::new(msg);
714                     do stream_watcher.write(buf) |stream_watcher, status| {
715                         uvdebug!("writing");
716                         assert!(status.is_none());
717                         let msg_cell = Cell::new(msg_cell.take());
718                         stream_watcher.close(||ignore(msg_cell.take()));
719                     }
720                 }
721                 loop_.run();
722                 loop_.close();
723             };
724
725             let mut loop_ = loop_;
726             loop_.run();
727             loop_.close();
728             client_thread.join();
729         }
730     }
731
732     #[test]
733     fn udp_recv_ip4() {
734         do run_in_bare_thread() {
735             static MAX: int = 10;
736             let mut loop_ = Loop::new();
737             let server_addr = next_test_ip4();
738             let client_addr = next_test_ip4();
739
740             let mut server = UdpWatcher::new(&loop_);
741             assert!(server.bind(server_addr).is_ok());
742
743             uvdebug!("starting read");
744             let alloc: AllocCallback = |size| {
745                 vec_to_uv_buf(vec::from_elem(size, 0u8))
746             };
747
748             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
749                 server.recv_stop();
750                 uvdebug!("i'm reading!");
751                 assert!(status.is_none());
752                 assert_eq!(flags, 0);
753                 assert_eq!(src, client_addr);
754
755                 let buf = vec_from_uv_buf(buf);
756                 let mut count = 0;
757                 uvdebug!("got {} bytes", nread);
758
759                 let buf = buf.unwrap();
760                 for &byte in buf.slice(0, nread as uint).iter() {
761                     assert!(byte == count as u8);
762                     uvdebug!("{}", byte as uint);
763                     count += 1;
764                 }
765                 assert_eq!(count, MAX);
766
767                 server.close(||{});
768             }
769
770             let thread = do Thread::start {
771                 let mut loop_ = Loop::new();
772                 let mut client = UdpWatcher::new(&loop_);
773                 assert!(client.bind(client_addr).is_ok());
774                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
775                 let buf = slice_to_uv_buf(msg);
776                 do client.send(buf, server_addr) |client, status| {
777                     uvdebug!("writing");
778                     assert!(status.is_none());
779                     client.close(||{});
780                 }
781
782                 loop_.run();
783                 loop_.close();
784             };
785
786             loop_.run();
787             loop_.close();
788             thread.join();
789         }
790     }
791
792     #[test]
793     fn udp_recv_ip6() {
794         do run_in_bare_thread() {
795             static MAX: int = 10;
796             let mut loop_ = Loop::new();
797             let server_addr = next_test_ip6();
798             let client_addr = next_test_ip6();
799
800             let mut server = UdpWatcher::new(&loop_);
801             assert!(server.bind(server_addr).is_ok());
802
803             uvdebug!("starting read");
804             let alloc: AllocCallback = |size| {
805                 vec_to_uv_buf(vec::from_elem(size, 0u8))
806             };
807
808             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
809                 server.recv_stop();
810                 uvdebug!("i'm reading!");
811                 assert!(status.is_none());
812                 assert_eq!(flags, 0);
813                 assert_eq!(src, client_addr);
814
815                 let buf = vec_from_uv_buf(buf);
816                 let mut count = 0;
817                 uvdebug!("got {} bytes", nread);
818
819                 let buf = buf.unwrap();
820                 for &byte in buf.slice(0, nread as uint).iter() {
821                     assert!(byte == count as u8);
822                     uvdebug!("{}", byte as uint);
823                     count += 1;
824                 }
825                 assert_eq!(count, MAX);
826
827                 server.close(||{});
828             }
829
830             let thread = do Thread::start {
831                 let mut loop_ = Loop::new();
832                 let mut client = UdpWatcher::new(&loop_);
833                 assert!(client.bind(client_addr).is_ok());
834                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
835                 let buf = slice_to_uv_buf(msg);
836                 do client.send(buf, server_addr) |client, status| {
837                     uvdebug!("writing");
838                     assert!(status.is_none());
839                     client.close(||{});
840                 }
841
842                 loop_.run();
843                 loop_.close();
844             };
845
846             loop_.run();
847             loop_.close();
848             thread.join();
849         }
850     }
851 }