]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/net.rs
Revert "auto merge of #8645 : alexcrichton/rust/issue-6436-run-non-blocking, r=brson"
[rust.git] / src / libstd / rt / uv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::*;
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
13 use rt::uv::uvll;
14 use rt::uv::uvll::*;
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17              status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use rt::uv::last_uv_error;
20 use vec;
21 use str;
22 use from_str::{FromStr};
23
24 pub enum UvSocketAddr {
25     UvIpv4SocketAddr(*sockaddr_in),
26     UvIpv6SocketAddr(*sockaddr_in6),
27 }
28
29 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
30     unsafe {
31         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
32         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
33         match addr {
34             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
36             _ => fail!(),
37         }
38     }
39 }
40
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
42     let malloc = match addr.ip {
43         Ipv4Addr(*) => malloc_ip4_addr,
44         Ipv6Addr(*) => malloc_ip6_addr,
45     };
46     let wrap = match addr.ip {
47         Ipv4Addr(*) => UvIpv4SocketAddr,
48         Ipv6Addr(*) => UvIpv6SocketAddr,
49     };
50     let free = match addr.ip {
51         Ipv4Addr(*) => free_ip4_addr,
52         Ipv6Addr(*) => free_ip6_addr,
53     };
54
55     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
56     do (|| {
57         f(wrap(addr))
58     }).finally {
59         unsafe { free(addr) };
60     }
61 }
62
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
64     let ip_size = match addr {
65         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
67     };
68     let ip_name = {
69         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
70         unsafe {
71             let buf_ptr = vec::raw::to_ptr(buf);
72             match addr {
73                 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
74                 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
75             }
76         };
77         buf
78     };
79     let ip_port = unsafe {
80         let port = match addr {
81             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
82             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
83         };
84         port as u16
85     };
86     let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00');
87     let ip_addr = FromStr::from_str(ip_str).unwrap();
88
89     // finally run the closure
90     f(SocketAddr { ip: ip_addr, port: ip_port })
91 }
92
93 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
94     use util;
95     uv_socket_addr_as_socket_addr(addr, util::id)
96 }
97
98 #[cfg(test)]
99 #[test]
100 fn test_ip4_conversion() {
101     use rt;
102     let ip4 = rt::test::next_test_ip4();
103     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
104 }
105
106 #[cfg(test)]
107 #[test]
108 fn test_ip6_conversion() {
109     use rt;
110     let ip6 = rt::test::next_test_ip6();
111     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
112 }
113
114 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
115 // and uv_file_t
116 pub struct StreamWatcher(*uvll::uv_stream_t);
117 impl Watcher for StreamWatcher { }
118
119 impl StreamWatcher {
120     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
121         {
122             let data = self.get_watcher_data();
123             data.alloc_cb = Some(alloc);
124             data.read_cb = Some(cb);
125         }
126
127         unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
128
129         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
130             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
131             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
132             return (*alloc_cb)(suggested_size as uint);
133         }
134
135         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
136             rtdebug!("buf addr: %x", buf.base as uint);
137             rtdebug!("buf len: %d", buf.len as int);
138             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
139             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
140             let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
141             (*cb)(stream_watcher, nread as int, buf, status);
142         }
143     }
144
145     pub fn read_stop(&mut self) {
146         // It would be nice to drop the alloc and read callbacks here,
147         // but read_stop may be called from inside one of them and we
148         // would end up freeing the in-use environment
149         let handle = self.native_handle();
150         unsafe { uvll::read_stop(handle); }
151     }
152
153     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
154         {
155             let data = self.get_watcher_data();
156             assert!(data.write_cb.is_none());
157             data.write_cb = Some(cb);
158         }
159
160         let req = WriteRequest::new();
161         unsafe {
162         assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
163         }
164
165         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
166             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
167             let mut stream_watcher = write_request.stream();
168             write_request.delete();
169             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
170             let status = status_to_maybe_uv_error(stream_watcher, status);
171             cb(stream_watcher, status);
172         }
173     }
174
175     pub fn accept(&mut self, stream: StreamWatcher) {
176         let self_handle = self.native_handle() as *c_void;
177         let stream_handle = stream.native_handle() as *c_void;
178         assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
179     }
180
181     pub fn close(self, cb: NullCallback) {
182         {
183             let mut this = self;
184             let data = this.get_watcher_data();
185             assert!(data.close_cb.is_none());
186             data.close_cb = Some(cb);
187         }
188
189         unsafe { uvll::close(self.native_handle(), close_cb); }
190
191         extern fn close_cb(handle: *uvll::uv_stream_t) {
192             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
193             let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
194             stream_watcher.drop_watcher_data();
195             unsafe { free_handle(handle as *c_void) }
196             cb();
197         }
198     }
199 }
200
201 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
202     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
203         StreamWatcher(handle)
204     }
205     fn native_handle(&self) -> *uvll::uv_stream_t {
206         match self { &StreamWatcher(ptr) => ptr }
207     }
208 }
209
210 pub struct TcpWatcher(*uvll::uv_tcp_t);
211 impl Watcher for TcpWatcher { }
212
213 impl TcpWatcher {
214     pub fn new(loop_: &Loop) -> TcpWatcher {
215         unsafe {
216             let handle = malloc_handle(UV_TCP);
217             assert!(handle.is_not_null());
218             assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
219             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
220             watcher.install_watcher_data();
221             return watcher;
222         }
223     }
224
225     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
226         do socket_addr_as_uv_socket_addr(address) |addr| {
227             let result = unsafe {
228                 match addr {
229                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
230                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
231                 }
232             };
233             match result {
234                 0 => Ok(()),
235                 _ => Err(last_uv_error(self)),
236             }
237         }
238     }
239
240     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
241         unsafe {
242             assert!(self.get_watcher_data().connect_cb.is_none());
243             self.get_watcher_data().connect_cb = Some(cb);
244
245             let connect_handle = ConnectRequest::new().native_handle();
246             rtdebug!("connect_t: %x", connect_handle as uint);
247             do socket_addr_as_uv_socket_addr(address) |addr| {
248                 let result = match addr {
249                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
250                                                       self.native_handle(), addr, connect_cb),
251                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
252                                                        self.native_handle(), addr, connect_cb),
253                 };
254                 assert_eq!(0, result);
255             }
256
257             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
258                 rtdebug!("connect_t: %x", req as uint);
259                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
260                 let mut stream_watcher = connect_request.stream();
261                 connect_request.delete();
262                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
263                 let status = status_to_maybe_uv_error(stream_watcher, status);
264                 cb(stream_watcher, status);
265             }
266         }
267     }
268
269     pub fn listen(&mut self, cb: ConnectionCallback) {
270         {
271             let data = self.get_watcher_data();
272             assert!(data.connect_cb.is_none());
273             data.connect_cb = Some(cb);
274         }
275
276         unsafe {
277             static BACKLOG: c_int = 128; // XXX should be configurable
278             // XXX: This can probably fail
279             assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
280         }
281
282         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
283             rtdebug!("connection_cb");
284             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
285             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
286             let status = status_to_maybe_uv_error(stream_watcher, status);
287             (*cb)(stream_watcher, status);
288         }
289     }
290
291     pub fn as_stream(&self) -> StreamWatcher {
292         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
293     }
294 }
295
296 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
297     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
298         TcpWatcher(handle)
299     }
300     fn native_handle(&self) -> *uvll::uv_tcp_t {
301         match self { &TcpWatcher(ptr) => ptr }
302     }
303 }
304
305 pub struct UdpWatcher(*uvll::uv_udp_t);
306 impl Watcher for UdpWatcher { }
307
308 impl UdpWatcher {
309     pub fn new(loop_: &Loop) -> UdpWatcher {
310         unsafe {
311             let handle = malloc_handle(UV_UDP);
312             assert!(handle.is_not_null());
313             assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
314             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
315             watcher.install_watcher_data();
316             return watcher;
317         }
318     }
319
320     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
321         do socket_addr_as_uv_socket_addr(address) |addr| {
322             let result = unsafe {
323                 match addr {
324                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
325                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
326                 }
327             };
328             match result {
329                 0 => Ok(()),
330                 _ => Err(last_uv_error(self)),
331             }
332         }
333     }
334
335     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
336         {
337             let data = self.get_watcher_data();
338             data.alloc_cb = Some(alloc);
339             data.udp_recv_cb = Some(cb);
340         }
341
342         unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
343
344         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
345             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
346             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
347             return (*alloc_cb)(suggested_size as uint);
348         }
349
350         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
351                           addr: *uvll::sockaddr, flags: c_uint) {
352             // When there's no data to read the recv callback can be a no-op.
353             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
354             // this we just drop back to kqueue and wait for the next callback.
355             if nread == 0 {
356                 return;
357             }
358
359             rtdebug!("buf addr: %x", buf.base as uint);
360             rtdebug!("buf len: %d", buf.len as int);
361             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
362             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
363             let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
364             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
365             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
366         }
367     }
368
369     pub fn recv_stop(&mut self) {
370         unsafe { uvll::udp_recv_stop(self.native_handle()); }
371     }
372
373     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
374         {
375             let data = self.get_watcher_data();
376             assert!(data.udp_send_cb.is_none());
377             data.udp_send_cb = Some(cb);
378         }
379
380         let req = UdpSendRequest::new();
381         do socket_addr_as_uv_socket_addr(address) |addr| {
382             let result = unsafe {
383                 match addr {
384                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
385                                                    self.native_handle(), [buf], addr, send_cb),
386                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
387                                                     self.native_handle(), [buf], addr, send_cb),
388                 }
389             };
390             assert_eq!(0, result);
391         }
392
393         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
394             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
395             let mut udp_watcher = send_request.handle();
396             send_request.delete();
397             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
398             let status = status_to_maybe_uv_error(udp_watcher, status);
399             cb(udp_watcher, status);
400         }
401     }
402
403     pub fn close(self, cb: NullCallback) {
404         {
405             let mut this = self;
406             let data = this.get_watcher_data();
407             assert!(data.close_cb.is_none());
408             data.close_cb = Some(cb);
409         }
410
411         unsafe { uvll::close(self.native_handle(), close_cb); }
412
413         extern fn close_cb(handle: *uvll::uv_udp_t) {
414             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
415             let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
416             udp_watcher.drop_watcher_data();
417             unsafe { free_handle(handle as *c_void) }
418             cb();
419         }
420     }
421 }
422
423 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
424     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
425         UdpWatcher(handle)
426     }
427     fn native_handle(&self) -> *uvll::uv_udp_t {
428         match self { &UdpWatcher(ptr) => ptr }
429     }
430 }
431
432 // uv_connect_t is a subclass of uv_req_t
433 struct ConnectRequest(*uvll::uv_connect_t);
434 impl Request for ConnectRequest { }
435
436 impl ConnectRequest {
437
438     fn new() -> ConnectRequest {
439         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
440         assert!(connect_handle.is_not_null());
441         ConnectRequest(connect_handle as *uvll::uv_connect_t)
442     }
443
444     fn stream(&self) -> StreamWatcher {
445         unsafe {
446             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
447             NativeHandle::from_native_handle(stream_handle)
448         }
449     }
450
451     fn delete(self) {
452         unsafe { free_req(self.native_handle() as *c_void) }
453     }
454 }
455
456 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
457     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
458         ConnectRequest(handle)
459     }
460     fn native_handle(&self) -> *uvll::uv_connect_t {
461         match self { &ConnectRequest(ptr) => ptr }
462     }
463 }
464
465 pub struct WriteRequest(*uvll::uv_write_t);
466
467 impl Request for WriteRequest { }
468
469 impl WriteRequest {
470     pub fn new() -> WriteRequest {
471         let write_handle = unsafe { malloc_req(UV_WRITE) };
472         assert!(write_handle.is_not_null());
473         WriteRequest(write_handle as *uvll::uv_write_t)
474     }
475
476     pub fn stream(&self) -> StreamWatcher {
477         unsafe {
478             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
479             NativeHandle::from_native_handle(stream_handle)
480         }
481     }
482
483     pub fn delete(self) {
484         unsafe { free_req(self.native_handle() as *c_void) }
485     }
486 }
487
488 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
489     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
490         WriteRequest(handle)
491     }
492     fn native_handle(&self) -> *uvll::uv_write_t {
493         match self { &WriteRequest(ptr) => ptr }
494     }
495 }
496
497 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
498 impl Request for UdpSendRequest { }
499
500 impl UdpSendRequest {
501     pub fn new() -> UdpSendRequest {
502         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
503         assert!(send_handle.is_not_null());
504         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
505     }
506
507     pub fn handle(&self) -> UdpWatcher {
508         let send_request_handle = unsafe {
509             uvll::get_udp_handle_from_send_req(self.native_handle())
510         };
511         NativeHandle::from_native_handle(send_request_handle)
512     }
513
514     pub fn delete(self) {
515         unsafe { free_req(self.native_handle() as *c_void) }
516     }
517 }
518
519 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
520     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
521         UdpSendRequest(handle)
522     }
523     fn native_handle(&self) -> *uvll::uv_udp_send_t {
524         match self { &UdpSendRequest(ptr) => ptr }
525     }
526 }
527
528 #[cfg(test)]
529 mod test {
530     use super::*;
531     use util::ignore;
532     use cell::Cell;
533     use vec;
534     use unstable::run_in_bare_thread;
535     use rt::thread::Thread;
536     use rt::test::*;
537     use rt::uv::{Loop, AllocCallback};
538     use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
539     use prelude::*;
540
541     #[test]
542     fn connect_close_ip4() {
543         do run_in_bare_thread() {
544             let mut loop_ = Loop::new();
545             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
546             // Connect to a port where nobody is listening
547             let addr = next_test_ip4();
548             do tcp_watcher.connect(addr) |stream_watcher, status| {
549                 rtdebug!("tcp_watcher.connect!");
550                 assert!(status.is_some());
551                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
552                 stream_watcher.close(||());
553             }
554             loop_.run();
555             loop_.close();
556         }
557     }
558
559     #[test]
560     fn connect_close_ip6() {
561         do run_in_bare_thread() {
562             let mut loop_ = Loop::new();
563             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
564             // Connect to a port where nobody is listening
565             let addr = next_test_ip6();
566             do tcp_watcher.connect(addr) |stream_watcher, status| {
567                 rtdebug!("tcp_watcher.connect!");
568                 assert!(status.is_some());
569                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
570                 stream_watcher.close(||());
571             }
572             loop_.run();
573             loop_.close();
574         }
575     }
576
577     #[test]
578     fn udp_bind_close_ip4() {
579         do run_in_bare_thread() {
580             let mut loop_ = Loop::new();
581             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
582             let addr = next_test_ip4();
583             udp_watcher.bind(addr);
584             udp_watcher.close(||());
585             loop_.run();
586             loop_.close();
587         }
588     }
589
590     #[test]
591     fn udp_bind_close_ip6() {
592         do run_in_bare_thread() {
593             let mut loop_ = Loop::new();
594             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
595             let addr = next_test_ip6();
596             udp_watcher.bind(addr);
597             udp_watcher.close(||());
598             loop_.run();
599             loop_.close();
600         }
601     }
602
603     #[test]
604     fn listen_ip4() {
605         do run_in_bare_thread() {
606             static MAX: int = 10;
607             let mut loop_ = Loop::new();
608             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
609             let addr = next_test_ip4();
610             server_tcp_watcher.bind(addr);
611             let loop_ = loop_;
612             rtdebug!("listening");
613             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
614                 rtdebug!("listened!");
615                 assert!(status.is_none());
616                 let mut loop_ = loop_;
617                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
618                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
619                 server_stream_watcher.accept(client_tcp_watcher);
620                 let count_cell = Cell::new(0);
621                 let server_stream_watcher = server_stream_watcher;
622                 rtdebug!("starting read");
623                 let alloc: AllocCallback = |size| {
624                     vec_to_uv_buf(vec::from_elem(size, 0u8))
625                 };
626                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
627
628                     rtdebug!("i'm reading!");
629                     let buf = vec_from_uv_buf(buf);
630                     let mut count = count_cell.take();
631                     if status.is_none() {
632                         rtdebug!("got %d bytes", nread);
633                         let buf = buf.unwrap();
634                         for byte in buf.slice(0, nread as uint).iter() {
635                             assert!(*byte == count as u8);
636                             rtdebug!("%u", *byte as uint);
637                             count += 1;
638                         }
639                     } else {
640                         assert_eq!(count, MAX);
641                         do stream_watcher.close {
642                             server_stream_watcher.close(||());
643                         }
644                     }
645                     count_cell.put_back(count);
646                 }
647             }
648
649             let client_thread = do Thread::start {
650                 rtdebug!("starting client thread");
651                 let mut loop_ = Loop::new();
652                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
653                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
654                     rtdebug!("connecting");
655                     assert!(status.is_none());
656                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
657                     let buf = slice_to_uv_buf(msg);
658                     let msg_cell = Cell::new(msg);
659                     do stream_watcher.write(buf) |stream_watcher, status| {
660                         rtdebug!("writing");
661                         assert!(status.is_none());
662                         let msg_cell = Cell::new(msg_cell.take());
663                         stream_watcher.close(||ignore(msg_cell.take()));
664                     }
665                 }
666                 loop_.run();
667                 loop_.close();
668             };
669
670             let mut loop_ = loop_;
671             loop_.run();
672             loop_.close();
673             client_thread.join();
674         }
675     }
676
677     #[test]
678     fn listen_ip6() {
679         do run_in_bare_thread() {
680             static MAX: int = 10;
681             let mut loop_ = Loop::new();
682             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
683             let addr = next_test_ip6();
684             server_tcp_watcher.bind(addr);
685             let loop_ = loop_;
686             rtdebug!("listening");
687             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
688                 rtdebug!("listened!");
689                 assert!(status.is_none());
690                 let mut loop_ = loop_;
691                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
692                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
693                 server_stream_watcher.accept(client_tcp_watcher);
694                 let count_cell = Cell::new(0);
695                 let server_stream_watcher = server_stream_watcher;
696                 rtdebug!("starting read");
697                 let alloc: AllocCallback = |size| {
698                     vec_to_uv_buf(vec::from_elem(size, 0u8))
699                 };
700                 do client_tcp_watcher.read_start(alloc)
701                     |stream_watcher, nread, buf, status| {
702
703                     rtdebug!("i'm reading!");
704                     let buf = vec_from_uv_buf(buf);
705                     let mut count = count_cell.take();
706                     if status.is_none() {
707                         rtdebug!("got %d bytes", nread);
708                         let buf = buf.unwrap();
709                         let r = buf.slice(0, nread as uint);
710                         for byte in r.iter() {
711                             assert!(*byte == count as u8);
712                             rtdebug!("%u", *byte as uint);
713                             count += 1;
714                         }
715                     } else {
716                         assert_eq!(count, MAX);
717                         do stream_watcher.close {
718                             server_stream_watcher.close(||());
719                         }
720                     }
721                     count_cell.put_back(count);
722                 }
723             }
724
725             let client_thread = do Thread::start {
726                 rtdebug!("starting client thread");
727                 let mut loop_ = Loop::new();
728                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
729                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
730                     rtdebug!("connecting");
731                     assert!(status.is_none());
732                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
733                     let buf = slice_to_uv_buf(msg);
734                     let msg_cell = Cell::new(msg);
735                     do stream_watcher.write(buf) |stream_watcher, status| {
736                         rtdebug!("writing");
737                         assert!(status.is_none());
738                         let msg_cell = Cell::new(msg_cell.take());
739                         stream_watcher.close(||ignore(msg_cell.take()));
740                     }
741                 }
742                 loop_.run();
743                 loop_.close();
744             };
745
746             let mut loop_ = loop_;
747             loop_.run();
748             loop_.close();
749             client_thread.join();
750         }
751     }
752
753     #[test]
754     fn udp_recv_ip4() {
755         do run_in_bare_thread() {
756             static MAX: int = 10;
757             let mut loop_ = Loop::new();
758             let server_addr = next_test_ip4();
759             let client_addr = next_test_ip4();
760
761             let mut server = UdpWatcher::new(&loop_);
762             assert!(server.bind(server_addr).is_ok());
763
764             rtdebug!("starting read");
765             let alloc: AllocCallback = |size| {
766                 vec_to_uv_buf(vec::from_elem(size, 0u8))
767             };
768
769             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
770                 server.recv_stop();
771                 rtdebug!("i'm reading!");
772                 assert!(status.is_none());
773                 assert_eq!(flags, 0);
774                 assert_eq!(src, client_addr);
775
776                 let buf = vec_from_uv_buf(buf);
777                 let mut count = 0;
778                 rtdebug!("got %d bytes", nread);
779
780                 let buf = buf.unwrap();
781                 for &byte in buf.slice(0, nread as uint).iter() {
782                     assert!(byte == count as u8);
783                     rtdebug!("%u", byte as uint);
784                     count += 1;
785                 }
786                 assert_eq!(count, MAX);
787
788                 server.close(||{});
789             }
790
791             let thread = do Thread::start {
792                 let mut loop_ = Loop::new();
793                 let mut client = UdpWatcher::new(&loop_);
794                 assert!(client.bind(client_addr).is_ok());
795                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
796                 let buf = slice_to_uv_buf(msg);
797                 do client.send(buf, server_addr) |client, status| {
798                     rtdebug!("writing");
799                     assert!(status.is_none());
800                     client.close(||{});
801                 }
802
803                 loop_.run();
804                 loop_.close();
805             };
806
807             loop_.run();
808             loop_.close();
809             thread.join();
810         }
811     }
812
813     #[test]
814     fn udp_recv_ip6() {
815         do run_in_bare_thread() {
816             static MAX: int = 10;
817             let mut loop_ = Loop::new();
818             let server_addr = next_test_ip6();
819             let client_addr = next_test_ip6();
820
821             let mut server = UdpWatcher::new(&loop_);
822             assert!(server.bind(server_addr).is_ok());
823
824             rtdebug!("starting read");
825             let alloc: AllocCallback = |size| {
826                 vec_to_uv_buf(vec::from_elem(size, 0u8))
827             };
828
829             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
830                 server.recv_stop();
831                 rtdebug!("i'm reading!");
832                 assert!(status.is_none());
833                 assert_eq!(flags, 0);
834                 assert_eq!(src, client_addr);
835
836                 let buf = vec_from_uv_buf(buf);
837                 let mut count = 0;
838                 rtdebug!("got %d bytes", nread);
839
840                 let buf = buf.unwrap();
841                 for &byte in buf.slice(0, nread as uint).iter() {
842                     assert!(byte == count as u8);
843                     rtdebug!("%u", byte as uint);
844                     count += 1;
845                 }
846                 assert_eq!(count, MAX);
847
848                 server.close(||{});
849             }
850
851             let thread = do Thread::start {
852                 let mut loop_ = Loop::new();
853                 let mut client = UdpWatcher::new(&loop_);
854                 assert!(client.bind(client_addr).is_ok());
855                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
856                 let buf = slice_to_uv_buf(msg);
857                 do client.send(buf, server_addr) |client, status| {
858                     rtdebug!("writing");
859                     assert!(status.is_none());
860                     client.close(||{});
861                 }
862
863                 loop_.run();
864                 loop_.close();
865             };
866
867             loop_.run();
868             loop_.close();
869             thread.join();
870         }
871     }
872 }