]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/uv/net.rs
9d4eb8f6302952135f0b7878637f1cc46a6c90bb
[rust.git] / src / libstd / rt / uv / net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::*;
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
13 use rt::uv::uvll;
14 use rt::uv::uvll::*;
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17              status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use rt::uv::last_uv_error;
20 use vec;
21 use str;
22 use from_str::{FromStr};
23
24 pub struct UvAddrInfo(*uvll::addrinfo);
25
26 pub enum UvSocketAddr {
27     UvIpv4SocketAddr(*sockaddr_in),
28     UvIpv6SocketAddr(*sockaddr_in6),
29 }
30
31 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
32     unsafe {
33         assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
34         assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
35         match addr {
36             _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
37             _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
38             _ => fail!(),
39         }
40     }
41 }
42
43 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
44     let malloc = match addr.ip {
45         Ipv4Addr(*) => malloc_ip4_addr,
46         Ipv6Addr(*) => malloc_ip6_addr,
47     };
48     let wrap = match addr.ip {
49         Ipv4Addr(*) => UvIpv4SocketAddr,
50         Ipv6Addr(*) => UvIpv6SocketAddr,
51     };
52     let free = match addr.ip {
53         Ipv4Addr(*) => free_ip4_addr,
54         Ipv6Addr(*) => free_ip6_addr,
55     };
56
57     let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
58     do (|| {
59         f(wrap(addr))
60     }).finally {
61         unsafe { free(addr) };
62     }
63 }
64
65 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
66     let ip_size = match addr {
67         UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
68         UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
69     };
70     let ip_name = {
71         let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
72         unsafe {
73             let buf_ptr = vec::raw::to_ptr(buf);
74             match addr {
75                 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
76                 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
77             }
78         };
79         buf
80     };
81     let ip_port = unsafe {
82         let port = match addr {
83             UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
84             UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
85         };
86         port as u16
87     };
88     let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
89     let ip_addr = FromStr::from_str(ip_str).unwrap();
90
91     // finally run the closure
92     f(SocketAddr { ip: ip_addr, port: ip_port })
93 }
94
95 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
96     use util;
97     uv_socket_addr_as_socket_addr(addr, util::id)
98 }
99
100 // Traverse the addrinfo linked list, producing a vector of Rust socket addresses
101 pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
102     unsafe {
103         let &UvAddrInfo(addr) = addr;
104         let mut addr = addr;
105
106         let mut addrs = ~[];
107         loop {
108             let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
109             let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
110             addrs.push(rustaddr);
111             if (*addr).ai_next.is_not_null() {
112                 addr = (*addr).ai_next;
113             } else {
114                 break;
115             }
116         }
117
118         return addrs;
119     }
120 }
121
122 #[cfg(test)]
123 #[test]
124 fn test_ip4_conversion() {
125     use rt;
126     let ip4 = rt::test::next_test_ip4();
127     assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
128 }
129
130 #[cfg(test)]
131 #[test]
132 fn test_ip6_conversion() {
133     use rt;
134     let ip6 = rt::test::next_test_ip6();
135     assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
136 }
137
138 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
139 // and uv_file_t
140 pub struct StreamWatcher(*uvll::uv_stream_t);
141 impl Watcher for StreamWatcher { }
142
143 impl StreamWatcher {
144     pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
145         {
146             let data = self.get_watcher_data();
147             data.alloc_cb = Some(alloc);
148             data.read_cb = Some(cb);
149         }
150
151         unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
152
153         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
154             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
155             let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
156             return (*alloc_cb)(suggested_size as uint);
157         }
158
159         extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
160             rtdebug!("buf addr: %x", buf.base as uint);
161             rtdebug!("buf len: %d", buf.len as int);
162             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
163             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
164             let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
165             (*cb)(stream_watcher, nread as int, buf, status);
166         }
167     }
168
169     pub fn read_stop(&mut self) {
170         // It would be nice to drop the alloc and read callbacks here,
171         // but read_stop may be called from inside one of them and we
172         // would end up freeing the in-use environment
173         let handle = self.native_handle();
174         unsafe { uvll::read_stop(handle); }
175     }
176
177     pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
178         {
179             let data = self.get_watcher_data();
180             assert!(data.write_cb.is_none());
181             data.write_cb = Some(cb);
182         }
183
184         let req = WriteRequest::new();
185         unsafe {
186         assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
187         }
188
189         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
190             let write_request: WriteRequest = NativeHandle::from_native_handle(req);
191             let mut stream_watcher = write_request.stream();
192             write_request.delete();
193             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
194             let status = status_to_maybe_uv_error(stream_watcher, status);
195             cb(stream_watcher, status);
196         }
197     }
198
199     pub fn accept(&mut self, stream: StreamWatcher) {
200         let self_handle = self.native_handle() as *c_void;
201         let stream_handle = stream.native_handle() as *c_void;
202         assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
203     }
204
205     pub fn close(self, cb: NullCallback) {
206         {
207             let mut this = self;
208             let data = this.get_watcher_data();
209             assert!(data.close_cb.is_none());
210             data.close_cb = Some(cb);
211         }
212
213         unsafe { uvll::close(self.native_handle(), close_cb); }
214
215         extern fn close_cb(handle: *uvll::uv_stream_t) {
216             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
217             let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
218             stream_watcher.drop_watcher_data();
219             unsafe { free_handle(handle as *c_void) }
220             cb();
221         }
222     }
223 }
224
225 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
226     fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
227         StreamWatcher(handle)
228     }
229     fn native_handle(&self) -> *uvll::uv_stream_t {
230         match self { &StreamWatcher(ptr) => ptr }
231     }
232 }
233
234 pub struct TcpWatcher(*uvll::uv_tcp_t);
235 impl Watcher for TcpWatcher { }
236
237 impl TcpWatcher {
238     pub fn new(loop_: &Loop) -> TcpWatcher {
239         unsafe {
240             let handle = malloc_handle(UV_TCP);
241             assert!(handle.is_not_null());
242             assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
243             let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
244             watcher.install_watcher_data();
245             return watcher;
246         }
247     }
248
249     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
250         do socket_addr_as_uv_socket_addr(address) |addr| {
251             let result = unsafe {
252                 match addr {
253                     UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
254                     UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
255                 }
256             };
257             match result {
258                 0 => Ok(()),
259                 _ => Err(last_uv_error(self)),
260             }
261         }
262     }
263
264     pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
265         unsafe {
266             assert!(self.get_watcher_data().connect_cb.is_none());
267             self.get_watcher_data().connect_cb = Some(cb);
268
269             let connect_handle = ConnectRequest::new().native_handle();
270             rtdebug!("connect_t: %x", connect_handle as uint);
271             do socket_addr_as_uv_socket_addr(address) |addr| {
272                 let result = match addr {
273                     UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
274                                                       self.native_handle(), addr, connect_cb),
275                     UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
276                                                        self.native_handle(), addr, connect_cb),
277                 };
278                 assert_eq!(0, result);
279             }
280
281             extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
282                 rtdebug!("connect_t: %x", req as uint);
283                 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
284                 let mut stream_watcher = connect_request.stream();
285                 connect_request.delete();
286                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
287                 let status = status_to_maybe_uv_error(stream_watcher, status);
288                 cb(stream_watcher, status);
289             }
290         }
291     }
292
293     pub fn listen(&mut self, cb: ConnectionCallback) {
294         {
295             let data = self.get_watcher_data();
296             assert!(data.connect_cb.is_none());
297             data.connect_cb = Some(cb);
298         }
299
300         unsafe {
301             static BACKLOG: c_int = 128; // XXX should be configurable
302             // XXX: This can probably fail
303             assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
304         }
305
306         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
307             rtdebug!("connection_cb");
308             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
309             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
310             let status = status_to_maybe_uv_error(stream_watcher, status);
311             (*cb)(stream_watcher, status);
312         }
313     }
314
315     pub fn as_stream(&self) -> StreamWatcher {
316         NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
317     }
318 }
319
320 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
321     fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
322         TcpWatcher(handle)
323     }
324     fn native_handle(&self) -> *uvll::uv_tcp_t {
325         match self { &TcpWatcher(ptr) => ptr }
326     }
327 }
328
329 pub struct UdpWatcher(*uvll::uv_udp_t);
330 impl Watcher for UdpWatcher { }
331
332 impl UdpWatcher {
333     pub fn new(loop_: &Loop) -> UdpWatcher {
334         unsafe {
335             let handle = malloc_handle(UV_UDP);
336             assert!(handle.is_not_null());
337             assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
338             let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
339             watcher.install_watcher_data();
340             return watcher;
341         }
342     }
343
344     pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
345         do socket_addr_as_uv_socket_addr(address) |addr| {
346             let result = unsafe {
347                 match addr {
348                     UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
349                     UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
350                 }
351             };
352             match result {
353                 0 => Ok(()),
354                 _ => Err(last_uv_error(self)),
355             }
356         }
357     }
358
359     pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
360         {
361             let data = self.get_watcher_data();
362             data.alloc_cb = Some(alloc);
363             data.udp_recv_cb = Some(cb);
364         }
365
366         unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
367
368         extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
369             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
370             let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
371             return (*alloc_cb)(suggested_size as uint);
372         }
373
374         extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
375                           addr: *uvll::sockaddr, flags: c_uint) {
376             // When there's no data to read the recv callback can be a no-op.
377             // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
378             // this we just drop back to kqueue and wait for the next callback.
379             if nread == 0 {
380                 return;
381             }
382
383             rtdebug!("buf addr: %x", buf.base as uint);
384             rtdebug!("buf len: %d", buf.len as int);
385             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
386             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
387             let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
388             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
389             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
390         }
391     }
392
393     pub fn recv_stop(&mut self) {
394         unsafe { uvll::udp_recv_stop(self.native_handle()); }
395     }
396
397     pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
398         {
399             let data = self.get_watcher_data();
400             assert!(data.udp_send_cb.is_none());
401             data.udp_send_cb = Some(cb);
402         }
403
404         let req = UdpSendRequest::new();
405         do socket_addr_as_uv_socket_addr(address) |addr| {
406             let result = unsafe {
407                 match addr {
408                     UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
409                                                    self.native_handle(), [buf], addr, send_cb),
410                     UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
411                                                     self.native_handle(), [buf], addr, send_cb),
412                 }
413             };
414             assert_eq!(0, result);
415         }
416
417         extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
418             let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
419             let mut udp_watcher = send_request.handle();
420             send_request.delete();
421             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
422             let status = status_to_maybe_uv_error(udp_watcher, status);
423             cb(udp_watcher, status);
424         }
425     }
426
427     pub fn close(self, cb: NullCallback) {
428         {
429             let mut this = self;
430             let data = this.get_watcher_data();
431             assert!(data.close_cb.is_none());
432             data.close_cb = Some(cb);
433         }
434
435         unsafe { uvll::close(self.native_handle(), close_cb); }
436
437         extern fn close_cb(handle: *uvll::uv_udp_t) {
438             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
439             let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
440             udp_watcher.drop_watcher_data();
441             unsafe { free_handle(handle as *c_void) }
442             cb();
443         }
444     }
445 }
446
447 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
448     fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
449         UdpWatcher(handle)
450     }
451     fn native_handle(&self) -> *uvll::uv_udp_t {
452         match self { &UdpWatcher(ptr) => ptr }
453     }
454 }
455
456 // uv_connect_t is a subclass of uv_req_t
457 struct ConnectRequest(*uvll::uv_connect_t);
458 impl Request for ConnectRequest { }
459
460 impl ConnectRequest {
461
462     fn new() -> ConnectRequest {
463         let connect_handle = unsafe { malloc_req(UV_CONNECT) };
464         assert!(connect_handle.is_not_null());
465         ConnectRequest(connect_handle as *uvll::uv_connect_t)
466     }
467
468     fn stream(&self) -> StreamWatcher {
469         unsafe {
470             let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
471             NativeHandle::from_native_handle(stream_handle)
472         }
473     }
474
475     fn delete(self) {
476         unsafe { free_req(self.native_handle() as *c_void) }
477     }
478 }
479
480 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
481     fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
482         ConnectRequest(handle)
483     }
484     fn native_handle(&self) -> *uvll::uv_connect_t {
485         match self { &ConnectRequest(ptr) => ptr }
486     }
487 }
488
489 pub struct WriteRequest(*uvll::uv_write_t);
490
491 impl Request for WriteRequest { }
492
493 impl WriteRequest {
494     pub fn new() -> WriteRequest {
495         let write_handle = unsafe { malloc_req(UV_WRITE) };
496         assert!(write_handle.is_not_null());
497         WriteRequest(write_handle as *uvll::uv_write_t)
498     }
499
500     pub fn stream(&self) -> StreamWatcher {
501         unsafe {
502             let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
503             NativeHandle::from_native_handle(stream_handle)
504         }
505     }
506
507     pub fn delete(self) {
508         unsafe { free_req(self.native_handle() as *c_void) }
509     }
510 }
511
512 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
513     fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
514         WriteRequest(handle)
515     }
516     fn native_handle(&self) -> *uvll::uv_write_t {
517         match self { &WriteRequest(ptr) => ptr }
518     }
519 }
520
521 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
522 impl Request for UdpSendRequest { }
523
524 impl UdpSendRequest {
525     pub fn new() -> UdpSendRequest {
526         let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
527         assert!(send_handle.is_not_null());
528         UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
529     }
530
531     pub fn handle(&self) -> UdpWatcher {
532         let send_request_handle = unsafe {
533             uvll::get_udp_handle_from_send_req(self.native_handle())
534         };
535         NativeHandle::from_native_handle(send_request_handle)
536     }
537
538     pub fn delete(self) {
539         unsafe { free_req(self.native_handle() as *c_void) }
540     }
541 }
542
543 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
544     fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
545         UdpSendRequest(handle)
546     }
547     fn native_handle(&self) -> *uvll::uv_udp_send_t {
548         match self { &UdpSendRequest(ptr) => ptr }
549     }
550 }
551
552 #[cfg(test)]
553 mod test {
554     use super::*;
555     use util::ignore;
556     use cell::Cell;
557     use vec;
558     use unstable::run_in_bare_thread;
559     use rt::thread::Thread;
560     use rt::test::*;
561     use rt::uv::{Loop, AllocCallback};
562     use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
563     use prelude::*;
564
565     #[test]
566     fn connect_close_ip4() {
567         do run_in_bare_thread() {
568             let mut loop_ = Loop::new();
569             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
570             // Connect to a port where nobody is listening
571             let addr = next_test_ip4();
572             do tcp_watcher.connect(addr) |stream_watcher, status| {
573                 rtdebug!("tcp_watcher.connect!");
574                 assert!(status.is_some());
575                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
576                 stream_watcher.close(||());
577             }
578             loop_.run();
579             loop_.close();
580         }
581     }
582
583     #[test]
584     fn connect_close_ip6() {
585         do run_in_bare_thread() {
586             let mut loop_ = Loop::new();
587             let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
588             // Connect to a port where nobody is listening
589             let addr = next_test_ip6();
590             do tcp_watcher.connect(addr) |stream_watcher, status| {
591                 rtdebug!("tcp_watcher.connect!");
592                 assert!(status.is_some());
593                 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
594                 stream_watcher.close(||());
595             }
596             loop_.run();
597             loop_.close();
598         }
599     }
600
601     #[test]
602     fn udp_bind_close_ip4() {
603         do run_in_bare_thread() {
604             let mut loop_ = Loop::new();
605             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
606             let addr = next_test_ip4();
607             udp_watcher.bind(addr);
608             udp_watcher.close(||());
609             loop_.run();
610             loop_.close();
611         }
612     }
613
614     #[test]
615     fn udp_bind_close_ip6() {
616         do run_in_bare_thread() {
617             let mut loop_ = Loop::new();
618             let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
619             let addr = next_test_ip6();
620             udp_watcher.bind(addr);
621             udp_watcher.close(||());
622             loop_.run();
623             loop_.close();
624         }
625     }
626
627     #[test]
628     #[ignore(cfg(windows))] // FIXME #8815
629     fn listen_ip4() {
630         do run_in_bare_thread() {
631             static MAX: int = 10;
632             let mut loop_ = Loop::new();
633             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
634             let addr = next_test_ip4();
635             server_tcp_watcher.bind(addr);
636             let loop_ = loop_;
637             rtdebug!("listening");
638             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
639                 rtdebug!("listened!");
640                 assert!(status.is_none());
641                 let mut loop_ = loop_;
642                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
643                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
644                 server_stream_watcher.accept(client_tcp_watcher);
645                 let count_cell = Cell::new(0);
646                 let server_stream_watcher = server_stream_watcher;
647                 rtdebug!("starting read");
648                 let alloc: AllocCallback = |size| {
649                     vec_to_uv_buf(vec::from_elem(size, 0u8))
650                 };
651                 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
652
653                     rtdebug!("i'm reading!");
654                     let buf = vec_from_uv_buf(buf);
655                     let mut count = count_cell.take();
656                     if status.is_none() {
657                         rtdebug!("got %d bytes", nread);
658                         let buf = buf.unwrap();
659                         for byte in buf.slice(0, nread as uint).iter() {
660                             assert!(*byte == count as u8);
661                             rtdebug!("%u", *byte as uint);
662                             count += 1;
663                         }
664                     } else {
665                         assert_eq!(count, MAX);
666                         do stream_watcher.close {
667                             server_stream_watcher.close(||());
668                         }
669                     }
670                     count_cell.put_back(count);
671                 }
672             }
673
674             let client_thread = do Thread::start {
675                 rtdebug!("starting client thread");
676                 let mut loop_ = Loop::new();
677                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
678                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
679                     rtdebug!("connecting");
680                     assert!(status.is_none());
681                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
682                     let buf = slice_to_uv_buf(msg);
683                     let msg_cell = Cell::new(msg);
684                     do stream_watcher.write(buf) |stream_watcher, status| {
685                         rtdebug!("writing");
686                         assert!(status.is_none());
687                         let msg_cell = Cell::new(msg_cell.take());
688                         stream_watcher.close(||ignore(msg_cell.take()));
689                     }
690                 }
691                 loop_.run();
692                 loop_.close();
693             };
694
695             let mut loop_ = loop_;
696             loop_.run();
697             loop_.close();
698             client_thread.join();
699         }
700     }
701
702     #[test]
703     #[ignore(cfg(windows))] // FIXME #8815
704     fn listen_ip6() {
705         do run_in_bare_thread() {
706             static MAX: int = 10;
707             let mut loop_ = Loop::new();
708             let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
709             let addr = next_test_ip6();
710             server_tcp_watcher.bind(addr);
711             let loop_ = loop_;
712             rtdebug!("listening");
713             do server_tcp_watcher.listen |mut server_stream_watcher, status| {
714                 rtdebug!("listened!");
715                 assert!(status.is_none());
716                 let mut loop_ = loop_;
717                 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
718                 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
719                 server_stream_watcher.accept(client_tcp_watcher);
720                 let count_cell = Cell::new(0);
721                 let server_stream_watcher = server_stream_watcher;
722                 rtdebug!("starting read");
723                 let alloc: AllocCallback = |size| {
724                     vec_to_uv_buf(vec::from_elem(size, 0u8))
725                 };
726                 do client_tcp_watcher.read_start(alloc)
727                     |stream_watcher, nread, buf, status| {
728
729                     rtdebug!("i'm reading!");
730                     let buf = vec_from_uv_buf(buf);
731                     let mut count = count_cell.take();
732                     if status.is_none() {
733                         rtdebug!("got %d bytes", nread);
734                         let buf = buf.unwrap();
735                         let r = buf.slice(0, nread as uint);
736                         for byte in r.iter() {
737                             assert!(*byte == count as u8);
738                             rtdebug!("%u", *byte as uint);
739                             count += 1;
740                         }
741                     } else {
742                         assert_eq!(count, MAX);
743                         do stream_watcher.close {
744                             server_stream_watcher.close(||());
745                         }
746                     }
747                     count_cell.put_back(count);
748                 }
749             }
750
751             let client_thread = do Thread::start {
752                 rtdebug!("starting client thread");
753                 let mut loop_ = Loop::new();
754                 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
755                 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
756                     rtdebug!("connecting");
757                     assert!(status.is_none());
758                     let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
759                     let buf = slice_to_uv_buf(msg);
760                     let msg_cell = Cell::new(msg);
761                     do stream_watcher.write(buf) |stream_watcher, status| {
762                         rtdebug!("writing");
763                         assert!(status.is_none());
764                         let msg_cell = Cell::new(msg_cell.take());
765                         stream_watcher.close(||ignore(msg_cell.take()));
766                     }
767                 }
768                 loop_.run();
769                 loop_.close();
770             };
771
772             let mut loop_ = loop_;
773             loop_.run();
774             loop_.close();
775             client_thread.join();
776         }
777     }
778
779     #[test]
780     #[ignore(cfg(windows))] // FIXME #8815
781     fn udp_recv_ip4() {
782         do run_in_bare_thread() {
783             static MAX: int = 10;
784             let mut loop_ = Loop::new();
785             let server_addr = next_test_ip4();
786             let client_addr = next_test_ip4();
787
788             let mut server = UdpWatcher::new(&loop_);
789             assert!(server.bind(server_addr).is_ok());
790
791             rtdebug!("starting read");
792             let alloc: AllocCallback = |size| {
793                 vec_to_uv_buf(vec::from_elem(size, 0u8))
794             };
795
796             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
797                 server.recv_stop();
798                 rtdebug!("i'm reading!");
799                 assert!(status.is_none());
800                 assert_eq!(flags, 0);
801                 assert_eq!(src, client_addr);
802
803                 let buf = vec_from_uv_buf(buf);
804                 let mut count = 0;
805                 rtdebug!("got %d bytes", nread);
806
807                 let buf = buf.unwrap();
808                 for &byte in buf.slice(0, nread as uint).iter() {
809                     assert!(byte == count as u8);
810                     rtdebug!("%u", byte as uint);
811                     count += 1;
812                 }
813                 assert_eq!(count, MAX);
814
815                 server.close(||{});
816             }
817
818             let thread = do Thread::start {
819                 let mut loop_ = Loop::new();
820                 let mut client = UdpWatcher::new(&loop_);
821                 assert!(client.bind(client_addr).is_ok());
822                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
823                 let buf = slice_to_uv_buf(msg);
824                 do client.send(buf, server_addr) |client, status| {
825                     rtdebug!("writing");
826                     assert!(status.is_none());
827                     client.close(||{});
828                 }
829
830                 loop_.run();
831                 loop_.close();
832             };
833
834             loop_.run();
835             loop_.close();
836             thread.join();
837         }
838     }
839
840     #[test]
841     #[ignore(cfg(windows))] // FIXME #8815
842     fn udp_recv_ip6() {
843         do run_in_bare_thread() {
844             static MAX: int = 10;
845             let mut loop_ = Loop::new();
846             let server_addr = next_test_ip6();
847             let client_addr = next_test_ip6();
848
849             let mut server = UdpWatcher::new(&loop_);
850             assert!(server.bind(server_addr).is_ok());
851
852             rtdebug!("starting read");
853             let alloc: AllocCallback = |size| {
854                 vec_to_uv_buf(vec::from_elem(size, 0u8))
855             };
856
857             do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
858                 server.recv_stop();
859                 rtdebug!("i'm reading!");
860                 assert!(status.is_none());
861                 assert_eq!(flags, 0);
862                 assert_eq!(src, client_addr);
863
864                 let buf = vec_from_uv_buf(buf);
865                 let mut count = 0;
866                 rtdebug!("got %d bytes", nread);
867
868                 let buf = buf.unwrap();
869                 for &byte in buf.slice(0, nread as uint).iter() {
870                     assert!(byte == count as u8);
871                     rtdebug!("%u", byte as uint);
872                     count += 1;
873                 }
874                 assert_eq!(count, MAX);
875
876                 server.close(||{});
877             }
878
879             let thread = do Thread::start {
880                 let mut loop_ = Loop::new();
881                 let mut client = UdpWatcher::new(&loop_);
882                 assert!(client.bind(client_addr).is_ok());
883                 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
884                 let buf = slice_to_uv_buf(msg);
885                 do client.send(buf, server_addr) |client, status| {
886                     rtdebug!("writing");
887                     assert!(status.is_none());
888                     client.close(||{});
889                 }
890
891                 loop_.run();
892                 loop_.close();
893             };
894
895             loop_.run();
896             loop_.close();
897             thread.join();
898         }
899     }
900 }