]> git.lizzy.rs Git - rust.git/blob - src/librustuv/net.rs
uv: Remove lots of uv/C++ wrappers
[rust.git] / src / librustuv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
12 use std::vec;
13 use std::str;
14 use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
15
16 use uvll;
17 use uvll::*;
18 use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
19             UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
20             status_to_maybe_uv_error, empty_buf};
21
22 pub struct UvAddrInfo(*uvll::addrinfo);
23
24 pub enum UvSocketAddr {
25     UvIpv4SocketAddr(*sockaddr_in),
26     UvIpv6SocketAddr(*sockaddr_in6),
27 }
28
29 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
30     unsafe {
31         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
32         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
33         match addr {
34             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
36             _ => fail!(),
37         }
38     }
39 }
40
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
42     let malloc = match addr.ip {
43         Ipv4Addr(*) => malloc_ip4_addr,
44         Ipv6Addr(*) => malloc_ip6_addr,
45     };
46     let wrap = match addr.ip {
47         Ipv4Addr(*) => UvIpv4SocketAddr,
48         Ipv6Addr(*) => UvIpv6SocketAddr,
49     };
50     let free = match addr.ip {
51         Ipv4Addr(*) => free_ip4_addr,
52         Ipv6Addr(*) => free_ip6_addr,
53     };
54
55     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
56     do (|| {
57         f(wrap(addr))
58     }).finally {
59         unsafe { free(addr) };
60     }
61 }
62
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
64     let ip_size = match addr {
65         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
67     };
68     let ip_name = {
69         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
70         unsafe {
71             let buf_ptr = vec::raw::to_ptr(buf);
72             match addr {
73                 UvIpv4SocketAddr(addr) =>
74                     uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t),
75                 UvIpv6SocketAddr(addr) =>
76                     uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t),
77             }
78         };
79         buf
80     };
81     let ip_port = unsafe {
82         let port = match addr {
83             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
84             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
85         };
86         port as u16
87     };
88     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
89     let ip_addr = FromStr::from_str(ip_str).unwrap();
90
91     // finally run the closure
92     f(SocketAddr { ip: ip_addr, port: ip_port })
93 }
94
95 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
96     use std::util;
97     uv_socket_addr_as_socket_addr(addr, util::id)
98 }
99
100 #[cfg(test)]
101 #[test]
102 fn test_ip4_conversion() {
103     use std::rt;
104     let ip4 = rt::test::next_test_ip4();
105     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
106 }
107
108 #[cfg(test)]
109 #[test]
110 fn test_ip6_conversion() {
111     use std::rt;
112     let ip6 = rt::test::next_test_ip6();
113     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
114 }
115
116 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
117 // and uv_file_t
118 pub struct StreamWatcher(*uvll::uv_stream_t);
119 impl Watcher for StreamWatcher { }
120
121 impl StreamWatcher {
122     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
123         unsafe {
124             match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) {
125                 0 => {
126                     let data = self.get_watcher_data();
127                     data.alloc_cb = Some(alloc);
128                     data.read_cb = Some(cb);
129                 }
130                 n => {
131                     cb(*self, 0, empty_buf(), Some(UvError(n)))
132                 }
133             }
134         }
135
136         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
137             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
138             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
139             return (*alloc_cb)(suggested_size as uint);
140         }
141
142         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
143             uvdebug!("buf addr: {}", buf.base);
144             uvdebug!("buf len: {}", buf.len);
145             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
146             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
147             let status = status_to_maybe_uv_error(nread as c_int);
148             (*cb)(stream_watcher, nread as int, buf, status);
149         }
150     }
151
152     pub fn read_stop(&mut self) {
153         // It would be nice to drop the alloc and read callbacks here,
154         // but read_stop may be called from inside one of them and we
155         // would end up freeing the in-use environment
156         let handle = self.native_handle();
157         unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
158     }
159
160     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
161         let req = WriteRequest::new();
162         return unsafe {
163             match uvll::uv_write(req.native_handle(), self.native_handle(),
164                                  [buf], write_cb) {
165                 0 => {
166                     let data = self.get_watcher_data();
167                     assert!(data.write_cb.is_none());
168                     data.write_cb = Some(cb);
169                 }
170                 n => {
171                     req.delete();
172                     cb(*self, Some(UvError(n)))
173                 }
174             }
175         };
176
177         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
178             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
179             let mut stream_watcher = write_request.stream();
180             write_request.delete();
181             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
182             let status = status_to_maybe_uv_error(status);
183             cb(stream_watcher, status);
184         }
185     }
186
187
188     pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
189         {
190             let data = self.get_watcher_data();
191             assert!(data.connect_cb.is_none());
192             data.connect_cb = Some(cb);
193         }
194
195         return unsafe {
196             static BACKLOG: c_int = 128; // XXX should be configurable
197             match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) {
198                 0 => Ok(()),
199                 n => Err(UvError(n))
200             }
201         };
202
203         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
204             uvdebug!("connection_cb");
205             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
206             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
207             let status = status_to_maybe_uv_error(status);
208             (*cb)(stream_watcher, status);
209         }
210     }
211
212     pub fn accept(&mut self, stream: StreamWatcher) {
213         let self_handle = self.native_handle() as *c_void;
214         let stream_handle = stream.native_handle() as *c_void;
215         assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } );
216     }
217 }
218
219 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
220     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
221         StreamWatcher(handle)
222     }
223     fn native_handle(&self) -> *uvll::uv_stream_t {
224         match self { &StreamWatcher(ptr) => ptr }
225     }
226 }
227
228 pub struct TcpWatcher(*uvll::uv_tcp_t);
229 impl Watcher for TcpWatcher { }
230
231 impl TcpWatcher {
232     pub fn new(loop_: &Loop) -> TcpWatcher {
233         unsafe {
234             let handle = malloc_handle(UV_TCP);
235             assert!(handle.is_not_null());
236             assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle));
237             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
238             watcher.install_watcher_data();
239             return watcher;
240         }
241     }
242
243     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
244         do socket_addr_as_uv_socket_addr(address) |addr| {
245             let result = unsafe {
246                 match addr {
247                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
248                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
249                 }
250             };
251             match result {
252                 0 => Ok(()),
253                 _ => Err(UvError(result)),
254             }
255         }
256     }
257
258     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
259         unsafe {
260             assert!(self.get_watcher_data().connect_cb.is_none());
261             self.get_watcher_data().connect_cb = Some(cb);
262
263             let connect_handle = ConnectRequest::new().native_handle();
264             uvdebug!("connect_t: {}", connect_handle);
265             do socket_addr_as_uv_socket_addr(address) |addr| {
266                 let result = match addr {
267                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
268                                                       self.native_handle(), addr, connect_cb),
269                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
270                                                        self.native_handle(), addr, connect_cb),
271                 };
272                 assert_eq!(0, result);
273             }
274
275             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
276                 uvdebug!("connect_t: {}", req);
277                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
278                 let mut stream_watcher = connect_request.stream();
279                 connect_request.delete();
280                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
281                 let status = status_to_maybe_uv_error(status);
282                 cb(stream_watcher, status);
283             }
284         }
285     }
286
287     pub fn as_stream(&self) -> StreamWatcher {
288         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
289     }
290 }
291
292 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
293     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
294         TcpWatcher(handle)
295     }
296     fn native_handle(&self) -> *uvll::uv_tcp_t {
297         match self { &TcpWatcher(ptr) => ptr }
298     }
299 }
300
301 pub struct UdpWatcher(*uvll::uv_udp_t);
302 impl Watcher for UdpWatcher { }
303
304 impl UdpWatcher {
305     pub fn new(loop_: &Loop) -> UdpWatcher {
306         unsafe {
307             let handle = malloc_handle(UV_UDP);
308             assert!(handle.is_not_null());
309             assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle));
310             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
311             watcher.install_watcher_data();
312             return watcher;
313         }
314     }
315
316     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
317         do socket_addr_as_uv_socket_addr(address) |addr| {
318             let result = unsafe {
319                 match addr {
320                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
321                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
322                 }
323             };
324             match result {
325                 0 => Ok(()),
326                 _ => Err(UvError(result)),
327             }
328         }
329     }
330
331     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
332         {
333             let data = self.get_watcher_data();
334             data.alloc_cb = Some(alloc);
335             data.udp_recv_cb = Some(cb);
336         }
337
338         unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
339
340         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
341             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
342             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
343             return (*alloc_cb)(suggested_size as uint);
344         }
345
346         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
347                           addr: *uvll::sockaddr, flags: c_uint) {
348             // When there's no data to read the recv callback can be a no-op.
349             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
350             // this we just drop back to kqueue and wait for the next callback.
351             if nread == 0 {
352                 return;
353             }
354
355             uvdebug!("buf addr: {}", buf.base);
356             uvdebug!("buf len: {}", buf.len);
357             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
358             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
359             let status = status_to_maybe_uv_error(nread as c_int);
360             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
361             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
362         }
363     }
364
365     pub fn recv_stop(&mut self) {
366         unsafe { uvll::uv_udp_recv_stop(self.native_handle()); }
367     }
368
369     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
370         {
371             let data = self.get_watcher_data();
372             assert!(data.udp_send_cb.is_none());
373             data.udp_send_cb = Some(cb);
374         }
375
376         let req = UdpSendRequest::new();
377         do socket_addr_as_uv_socket_addr(address) |addr| {
378             let result = unsafe {
379                 match addr {
380                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
381                                                    self.native_handle(), [buf], addr, send_cb),
382                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
383                                                     self.native_handle(), [buf], addr, send_cb),
384                 }
385             };
386             assert_eq!(0, result);
387         }
388
389         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
390             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
391             let mut udp_watcher = send_request.handle();
392             send_request.delete();
393             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
394             let status = status_to_maybe_uv_error(status);
395             cb(udp_watcher, status);
396         }
397     }
398 }
399
400 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
401     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
402         UdpWatcher(handle)
403     }
404     fn native_handle(&self) -> *uvll::uv_udp_t {
405         match self { &UdpWatcher(ptr) => ptr }
406     }
407 }
408
409 // uv_connect_t is a subclass of uv_req_t
410 pub struct ConnectRequest(*uvll::uv_connect_t);
411 impl Request for ConnectRequest { }
412
413 impl ConnectRequest {
414
415     pub fn new() -> ConnectRequest {
416         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
417         assert!(connect_handle.is_not_null());
418         ConnectRequest(connect_handle as *uvll::uv_connect_t)
419     }
420
421     fn stream(&self) -> StreamWatcher {
422         unsafe {
423             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
424             NativeHandle::from_native_handle(stream_handle)
425         }
426     }
427
428     fn delete(self) {
429         unsafe { free_req(self.native_handle() as *c_void) }
430     }
431 }
432
433 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
434     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
435         ConnectRequest(handle)
436     }
437     fn native_handle(&self) -> *uvll::uv_connect_t {
438         match self { &ConnectRequest(ptr) => ptr }
439     }
440 }
441
442 pub struct WriteRequest(*uvll::uv_write_t);
443
444 impl Request for WriteRequest { }
445
446 impl WriteRequest {
447     pub fn new() -> WriteRequest {
448         let write_handle = unsafe { malloc_req(UV_WRITE) };
449         assert!(write_handle.is_not_null());
450         WriteRequest(write_handle as *uvll::uv_write_t)
451     }
452
453     pub fn stream(&self) -> StreamWatcher {
454         unsafe {
455             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
456             NativeHandle::from_native_handle(stream_handle)
457         }
458     }
459
460     pub fn delete(self) {
461         unsafe { free_req(self.native_handle() as *c_void) }
462     }
463 }
464
465 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
466     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
467         WriteRequest(handle)
468     }
469     fn native_handle(&self) -> *uvll::uv_write_t {
470         match self { &WriteRequest(ptr) => ptr }
471     }
472 }
473
474 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
475 impl Request for UdpSendRequest { }
476
477 impl UdpSendRequest {
478     pub fn new() -> UdpSendRequest {
479         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
480         assert!(send_handle.is_not_null());
481         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
482     }
483
484     pub fn handle(&self) -> UdpWatcher {
485         let send_request_handle = unsafe {
486             uvll::get_udp_handle_from_send_req(self.native_handle())
487         };
488         NativeHandle::from_native_handle(send_request_handle)
489     }
490
491     pub fn delete(self) {
492         unsafe { free_req(self.native_handle() as *c_void) }
493     }
494 }
495
496 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
497     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
498         UdpSendRequest(handle)
499     }
500     fn native_handle(&self) -> *uvll::uv_udp_send_t {
501         match self { &UdpSendRequest(ptr) => ptr }
502     }
503 }
504
505 #[cfg(test)]
506 mod test {
507     use super::*;
508     use std::util::ignore;
509     use std::cell::Cell;
510     use std::vec;
511     use std::unstable::run_in_bare_thread;
512     use std::rt::thread::Thread;
513     use std::rt::test::*;
514     use super::super::{Loop, AllocCallback};
515     use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
516
517     #[test]
518     fn connect_close_ip4() {
519         do run_in_bare_thread() {
520             let mut loop_ = Loop::new();
521             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
522             // Connect to a port where nobody is listening
523             let addr = next_test_ip4();
524             do tcp_watcher.connect(addr) |stream_watcher, status| {
525                 uvdebug!("tcp_watcher.connect!");
526                 assert!(status.is_some());
527                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
528                 stream_watcher.close(||());
529             }
530             loop_.run();
531             loop_.close();
532         }
533     }
534
535     #[test]
536     fn connect_close_ip6() {
537         do run_in_bare_thread() {
538             let mut loop_ = Loop::new();
539             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
540             // Connect to a port where nobody is listening
541             let addr = next_test_ip6();
542             do tcp_watcher.connect(addr) |stream_watcher, status| {
543                 uvdebug!("tcp_watcher.connect!");
544                 assert!(status.is_some());
545                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
546                 stream_watcher.close(||());
547             }
548             loop_.run();
549             loop_.close();
550         }
551     }
552
553     #[test]
554     fn udp_bind_close_ip4() {
555         do run_in_bare_thread() {
556             let mut loop_ = Loop::new();
557             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
558             let addr = next_test_ip4();
559             udp_watcher.bind(addr);
560             udp_watcher.close(||());
561             loop_.run();
562             loop_.close();
563         }
564     }
565
566     #[test]
567     fn udp_bind_close_ip6() {
568         do run_in_bare_thread() {
569             let mut loop_ = Loop::new();
570             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
571             let addr = next_test_ip6();
572             udp_watcher.bind(addr);
573             udp_watcher.close(||());
574             loop_.run();
575             loop_.close();
576         }
577     }
578
579     #[test]
580     fn listen_ip4() {
581         do run_in_bare_thread() {
582             static MAX: int = 10;
583             let mut loop_ = Loop::new();
584             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
585             let addr = next_test_ip4();
586             server_tcp_watcher.bind(addr);
587             let loop_ = loop_;
588             uvdebug!("listening");
589             let mut stream = server_tcp_watcher.as_stream();
590             let res = do stream.listen |mut server_stream_watcher, status| {
591                 uvdebug!("listened!");
592                 assert!(status.is_none());
593                 let mut loop_ = loop_;
594                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
595                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
596                 server_stream_watcher.accept(client_tcp_watcher);
597                 let count_cell = Cell::new(0);
598                 let server_stream_watcher = server_stream_watcher;
599                 uvdebug!("starting read");
600                 let alloc: AllocCallback = |size| {
601                     vec_to_uv_buf(vec::from_elem(size, 0u8))
602                 };
603                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
604
605                     uvdebug!("i'm reading!");
606                     let buf = vec_from_uv_buf(buf);
607                     let mut count = count_cell.take();
608                     if status.is_none() {
609                         uvdebug!("got {} bytes", nread);
610                         let buf = buf.unwrap();
611                         for byte in buf.slice(0, nread as uint).iter() {
612                             assert!(*byte == count as u8);
613                             uvdebug!("{}", *byte as uint);
614                             count += 1;
615                         }
616                     } else {
617                         assert_eq!(count, MAX);
618                         do stream_watcher.close {
619                             server_stream_watcher.close(||());
620                         }
621                     }
622                     count_cell.put_back(count);
623                 }
624             };
625
626             assert!(res.is_ok());
627
628             let client_thread = do Thread::start {
629                 uvdebug!("starting client thread");
630                 let mut loop_ = Loop::new();
631                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
632                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
633                     uvdebug!("connecting");
634                     assert!(status.is_none());
635                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
636                     let buf = slice_to_uv_buf(msg);
637                     let msg_cell = Cell::new(msg);
638                     do stream_watcher.write(buf) |stream_watcher, status| {
639                         uvdebug!("writing");
640                         assert!(status.is_none());
641                         let msg_cell = Cell::new(msg_cell.take());
642                         stream_watcher.close(||ignore(msg_cell.take()));
643                     }
644                 }
645                 loop_.run();
646                 loop_.close();
647             };
648
649             let mut loop_ = loop_;
650             loop_.run();
651             loop_.close();
652             client_thread.join();
653         };
654     }
655
656     #[test]
657     fn listen_ip6() {
658         do run_in_bare_thread() {
659             static MAX: int = 10;
660             let mut loop_ = Loop::new();
661             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
662             let addr = next_test_ip6();
663             server_tcp_watcher.bind(addr);
664             let loop_ = loop_;
665             uvdebug!("listening");
666             let mut stream = server_tcp_watcher.as_stream();
667             let res = do stream.listen |mut server_stream_watcher, status| {
668                 uvdebug!("listened!");
669                 assert!(status.is_none());
670                 let mut loop_ = loop_;
671                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
672                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
673                 server_stream_watcher.accept(client_tcp_watcher);
674                 let count_cell = Cell::new(0);
675                 let server_stream_watcher = server_stream_watcher;
676                 uvdebug!("starting read");
677                 let alloc: AllocCallback = |size| {
678                     vec_to_uv_buf(vec::from_elem(size, 0u8))
679                 };
680                 do client_tcp_watcher.read_start(alloc)
681                     |stream_watcher, nread, buf, status| {
682
683                     uvdebug!("i'm reading!");
684                     let buf = vec_from_uv_buf(buf);
685                     let mut count = count_cell.take();
686                     if status.is_none() {
687                         uvdebug!("got {} bytes", nread);
688                         let buf = buf.unwrap();
689                         let r = buf.slice(0, nread as uint);
690                         for byte in r.iter() {
691                             assert!(*byte == count as u8);
692                             uvdebug!("{}", *byte as uint);
693                             count += 1;
694                         }
695                     } else {
696                         assert_eq!(count, MAX);
697                         do stream_watcher.close {
698                             server_stream_watcher.close(||());
699                         }
700                     }
701                     count_cell.put_back(count);
702                 }
703             };
704             assert!(res.is_ok());
705
706             let client_thread = do Thread::start {
707                 uvdebug!("starting client thread");
708                 let mut loop_ = Loop::new();
709                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
710                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
711                     uvdebug!("connecting");
712                     assert!(status.is_none());
713                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
714                     let buf = slice_to_uv_buf(msg);
715                     let msg_cell = Cell::new(msg);
716                     do stream_watcher.write(buf) |stream_watcher, status| {
717                         uvdebug!("writing");
718                         assert!(status.is_none());
719                         let msg_cell = Cell::new(msg_cell.take());
720                         stream_watcher.close(||ignore(msg_cell.take()));
721                     }
722                 }
723                 loop_.run();
724                 loop_.close();
725             };
726
727             let mut loop_ = loop_;
728             loop_.run();
729             loop_.close();
730             client_thread.join();
731         }
732     }
733
734     #[test]
735     fn udp_recv_ip4() {
736         do run_in_bare_thread() {
737             static MAX: int = 10;
738             let mut loop_ = Loop::new();
739             let server_addr = next_test_ip4();
740             let client_addr = next_test_ip4();
741
742             let mut server = UdpWatcher::new(&loop_);
743             assert!(server.bind(server_addr).is_ok());
744
745             uvdebug!("starting read");
746             let alloc: AllocCallback = |size| {
747                 vec_to_uv_buf(vec::from_elem(size, 0u8))
748             };
749
750             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
751                 server.recv_stop();
752                 uvdebug!("i'm reading!");
753                 assert!(status.is_none());
754                 assert_eq!(flags, 0);
755                 assert_eq!(src, client_addr);
756
757                 let buf = vec_from_uv_buf(buf);
758                 let mut count = 0;
759                 uvdebug!("got {} bytes", nread);
760
761                 let buf = buf.unwrap();
762                 for &byte in buf.slice(0, nread as uint).iter() {
763                     assert!(byte == count as u8);
764                     uvdebug!("{}", byte as uint);
765                     count += 1;
766                 }
767                 assert_eq!(count, MAX);
768
769                 server.close(||{});
770             }
771
772             let thread = do Thread::start {
773                 let mut loop_ = Loop::new();
774                 let mut client = UdpWatcher::new(&loop_);
775                 assert!(client.bind(client_addr).is_ok());
776                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
777                 let buf = slice_to_uv_buf(msg);
778                 do client.send(buf, server_addr) |client, status| {
779                     uvdebug!("writing");
780                     assert!(status.is_none());
781                     client.close(||{});
782                 }
783
784                 loop_.run();
785                 loop_.close();
786             };
787
788             loop_.run();
789             loop_.close();
790             thread.join();
791         }
792     }
793
794     #[test]
795     fn udp_recv_ip6() {
796         do run_in_bare_thread() {
797             static MAX: int = 10;
798             let mut loop_ = Loop::new();
799             let server_addr = next_test_ip6();
800             let client_addr = next_test_ip6();
801
802             let mut server = UdpWatcher::new(&loop_);
803             assert!(server.bind(server_addr).is_ok());
804
805             uvdebug!("starting read");
806             let alloc: AllocCallback = |size| {
807                 vec_to_uv_buf(vec::from_elem(size, 0u8))
808             };
809
810             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
811                 server.recv_stop();
812                 uvdebug!("i'm reading!");
813                 assert!(status.is_none());
814                 assert_eq!(flags, 0);
815                 assert_eq!(src, client_addr);
816
817                 let buf = vec_from_uv_buf(buf);
818                 let mut count = 0;
819                 uvdebug!("got {} bytes", nread);
820
821                 let buf = buf.unwrap();
822                 for &byte in buf.slice(0, nread as uint).iter() {
823                     assert!(byte == count as u8);
824                     uvdebug!("{}", byte as uint);
825                     count += 1;
826                 }
827                 assert_eq!(count, MAX);
828
829                 server.close(||{});
830             }
831
832             let thread = do Thread::start {
833                 let mut loop_ = Loop::new();
834                 let mut client = UdpWatcher::new(&loop_);
835                 assert!(client.bind(client_addr).is_ok());
836                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
837                 let buf = slice_to_uv_buf(msg);
838                 do client.send(buf, server_addr) |client, status| {
839                     uvdebug!("writing");
840                     assert!(status.is_none());
841                     client.close(||{});
842                 }
843
844                 loop_.run();
845                 loop_.close();
846             };
847
848             loop_.run();
849             loop_.close();
850             thread.join();
851         }
852     }
853 }