1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17 status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
21 use from_str::{FromStr};
// Newtype around a raw pointer to a `uvll::addrinfo`.
23 pub struct UvAddrInfo(*uvll::addrinfo);
// A raw socket-address pointer tagged with its family: one variant carries a
// `*sockaddr_in`, the other a `*sockaddr_in6`.
25 pub enum UvSocketAddr {
26 UvIpv4SocketAddr(*sockaddr_in),
27 UvIpv6SocketAddr(*sockaddr_in6),
// Wraps a raw `*uvll::sockaddr` in the matching `UvSocketAddr` variant.
// The two asserts together require that exactly one of `is_ip4_addr` and
// `is_ip6_addr` holds for `addr`; the pointer is then cast to the
// family-specific pointer type.
30 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
32 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
33 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
35 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
36 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
// Builds a temporary native address for `addr` and releases it afterwards.
// The allocator, the `UvSocketAddr` constructor and the deallocator are each
// selected by matching on the family of `addr.ip`.
// NOTE(review): the invocation of `f` is not visible in this excerpt;
// presumably it runs on the wrapped address between `malloc` and `free` and
// its result is what gets returned -- confirm.
42 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
43 let malloc = match addr.ip {
44 Ipv4Addr(*) => malloc_ip4_addr,
45 Ipv6Addr(*) => malloc_ip6_addr,
47 let wrap = match addr.ip {
48 Ipv4Addr(*) => UvIpv4SocketAddr,
49 Ipv6Addr(*) => UvIpv6SocketAddr,
51 let free = match addr.ip {
52 Ipv4Addr(*) => free_ip4_addr,
53 Ipv6Addr(*) => free_ip6_addr,
// The native address is created from the textual form of the IP plus the port.
56 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the native address obtained from `malloc` above.
60 unsafe { free(addr) };
// Converts a native uv address into a `SocketAddr` and passes it to `f`,
// returning whatever `f` returns. The IP is recovered by having uvll print
// the address into a byte buffer and parsing that text back.
64 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Length budget for the textual address of each family, without terminator.
65 let ip_size = match addr {
66 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
67 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// Zero-filled buffer, one byte longer than the text budget.
70 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
72 let buf_ptr = vec::raw::to_ptr(buf);
// NOTE(review): `ip_size` (not `ip_size + 1`) is passed as the length even
// though the buffer is one byte longer -- confirm whether uvll expects the
// length to include the terminator, otherwise a maximum-length address may
// not fit.
74 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
75 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
// The port is read through the family-specific uvll accessor.
80 let ip_port = unsafe {
81 let port = match addr {
82 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
83 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Trailing NUL bytes are trimmed before parsing; the `unwrap` fails if the
// remaining text does not parse as an IP address.
87 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
88 let ip_addr = FromStr::from_str(ip_str).unwrap();
90 // finally run the closure
91 f(SocketAddr { ip: ip_addr, port: ip_port })
// Value-returning form of `uv_socket_addr_as_socket_addr`: `util::id`
// (presumably the identity function -- confirm) is used as the closure, so
// the converted `SocketAddr` is what comes back.
94 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
96 uv_socket_addr_as_socket_addr(addr, util::id)
99 // Traverse the addrinfo linked list, producing a vector of Rust socket addresses
100 pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
// Destructure the newtype to get at the raw `addrinfo` pointer.
102 let &UvAddrInfo(addr) = addr;
// Convert the current node's `ai_addr` and record it.
107 let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
108 let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
109 addrs.push(rustaddr);
// Move on to the next node when `ai_next` is non-null.
110 if (*addr).ai_next.is_not_null() {
111 addr = (*addr).ai_next;
// Round-trips an IPv4 test address through the native representation and
// checks that the same address comes back.
123 fn test_ip4_conversion() {
125 let ip4 = rt::test::next_test_ip4();
126 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
// Round-trips an IPv6 test address through the native representation and
// checks that the same address comes back.
131 fn test_ip6_conversion() {
133 let ip6 = rt::test::next_test_ip6();
134 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
137 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// Newtype around a raw `*uvll::uv_stream_t`.
139 pub struct StreamWatcher(*uvll::uv_stream_t);
// Empty impl: no `Watcher` items are overridden here.
140 impl Watcher for StreamWatcher { }
// Starts reading on the stream: both callbacks are stored in the watcher
// data, then the `alloc_cb` / `read_cb` trampolines below are registered
// with uvll.
143 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
145 let data = self.get_watcher_data();
146 data.alloc_cb = Some(alloc);
147 data.read_cb = Some(cb);
150 unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
// Trampoline: rebuilds the watcher from the raw handle and forwards the
// size request to the stored allocation callback.
152 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
153 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
154 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
155 return (*alloc_cb)(suggested_size as uint);
// Trampoline: forwards the read result to the stored read callback.
// `nread` is also the value fed to `status_to_maybe_uv_error`.
158 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
159 rtdebug!("buf addr: %x", buf.base as uint);
160 rtdebug!("buf len: %d", buf.len as int);
161 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
162 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
163 let status = status_to_maybe_uv_error(nread as c_int);
164 (*cb)(stream_watcher, nread as int, buf, status);
// Stops reading on the stream via `uvll::read_stop`. The stored callbacks
// are deliberately left in place, for the reason given below.
168 pub fn read_stop(&mut self) {
169 // It would be nice to drop the alloc and read callbacks here,
170 // but read_stop may be called from inside one of them and we
171 // would end up freeing the in-use environment
172 let handle = self.native_handle();
173 unsafe { uvll::read_stop(handle); }
// Submits `buf` for writing and stores `cb` as the write callback.
// Asserts that no write callback is already stored and that `uvll::write`
// returns 0.
176 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
178 let data = self.get_watcher_data();
179 assert!(data.write_cb.is_none());
180 data.write_cb = Some(cb);
// A fresh request object is allocated for every write.
183 let req = WriteRequest::new();
185 assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
// Completion trampoline: recovers the stream from the request, frees the
// request, then takes the stored callback out of the watcher data and
// invokes it with the converted status.
188 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
189 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
190 let mut stream_watcher = write_request.stream();
191 write_request.delete();
192 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
193 let status = status_to_maybe_uv_error(status);
194 cb(stream_watcher, status);
// Calls `uvll::accept` with this stream's handle and `stream`'s handle
// (both cast to `*c_void`) and asserts that it returns 0.
198 pub fn accept(&mut self, stream: StreamWatcher) {
199 let self_handle = self.native_handle() as *c_void;
200 let stream_handle = stream.native_handle() as *c_void;
201 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
// Stores `cb` as the close callback (asserting none is already set) and
// asks uvll to close the handle, with `close_cb` below as the native
// callback. Takes `self` by value.
204 pub fn close(self, cb: NullCallback) {
// NOTE(review): the binding of `this` is not visible in this excerpt.
207 let data = this.get_watcher_data();
208 assert!(data.close_cb.is_none());
209 data.close_cb = Some(cb);
212 unsafe { uvll::close(self.native_handle(), close_cb); }
// Native close callback: takes the stored callback out of the watcher data,
// drops the watcher data and frees the native handle.
// NOTE(review): the call to `cb` itself is not visible in this excerpt.
214 extern fn close_cb(handle: *uvll::uv_stream_t) {
215 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
216 let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
217 stream_watcher.drop_watcher_data();
218 unsafe { free_handle(handle as *c_void) }
// Conversions between `StreamWatcher` and its raw `*uvll::uv_stream_t`.
224 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
// Wraps the raw pointer without any checks.
225 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
226 StreamWatcher(handle)
// Returns the wrapped raw pointer.
228 fn native_handle(&self) -> *uvll::uv_stream_t {
229 match self { &StreamWatcher(ptr) => ptr }
// Newtype around a raw `*uvll::uv_tcp_t`.
233 pub struct TcpWatcher(*uvll::uv_tcp_t);
// Empty impl: no `Watcher` items are overridden here.
234 impl Watcher for TcpWatcher { }
// Allocates a `UV_TCP` handle, initialises it on `loop_` and installs
// watcher data on the new watcher. Asserts that the allocation is non-null
// and that `uvll::tcp_init` returns 0.
237 pub fn new(loop_: &Loop) -> TcpWatcher {
239 let handle = malloc_handle(UV_TCP);
240 assert!(handle.is_not_null());
241 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
242 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
243 watcher.install_watcher_data();
// Binds the handle to `address`, dispatching to `tcp_bind` or `tcp_bind6`
// according to the address family. A result that reaches the catch-all arm
// is returned as `Err(UvError(result))`; the success arm is not visible in
// this excerpt.
248 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
249 do socket_addr_as_uv_socket_addr(address) |addr| {
250 let result = unsafe {
252 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
253 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
258 _ => Err(UvError(result)),
// Starts a connection to `address`. `cb` is stored as the connect callback
// (asserting none is already stored) and is run from `connect_cb` below.
// Asserts that the uvll connect call returns 0.
263 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
265 assert!(self.get_watcher_data().connect_cb.is_none());
266 self.get_watcher_data().connect_cb = Some(cb);
// Only the raw request pointer is kept; it is freed in `connect_cb`.
268 let connect_handle = ConnectRequest::new().native_handle();
269 rtdebug!("connect_t: %x", connect_handle as uint);
270 do socket_addr_as_uv_socket_addr(address) |addr| {
// Pick the IPv4 or IPv6 connect entry point by address family.
271 let result = match addr {
272 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
273 self.native_handle(), addr, connect_cb),
274 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
275 self.native_handle(), addr, connect_cb),
277 assert_eq!(0, result);
// Completion trampoline: recovers the stream from the request, frees the
// request, then takes the stored callback and invokes it with the status.
280 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
281 rtdebug!("connect_t: %x", req as uint);
282 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
283 let mut stream_watcher = connect_request.stream();
284 connect_request.delete();
285 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
286 let status = status_to_maybe_uv_error(status);
287 cb(stream_watcher, status);
// Starts listening with a fixed backlog. `cb` is stored in the `connect_cb`
// slot (asserting the slot is empty) and is invoked by reference from
// `connection_cb`, so it stays installed after a call.
// Asserts that `uvll::listen` returns 0.
292 pub fn listen(&mut self, cb: ConnectionCallback) {
294 let data = self.get_watcher_data();
295 assert!(data.connect_cb.is_none());
296 data.connect_cb = Some(cb);
300 static BACKLOG: c_int = 128; // XXX should be configurable
301 // XXX: This can probably fail
302 assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
// Trampoline: rebuilds the stream watcher from the raw handle and passes
// the converted status to the stored callback.
305 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
306 rtdebug!("connection_cb");
307 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
308 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
309 let status = status_to_maybe_uv_error(status);
310 (*cb)(stream_watcher, status);
// Views this TCP handle as a `StreamWatcher` by casting the raw pointer to
// `*uvll::uv_stream_t`.
314 pub fn as_stream(&self) -> StreamWatcher {
315 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
// Conversions between `TcpWatcher` and its raw `*uvll::uv_tcp_t`.
319 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
320 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
// Returns the wrapped raw pointer.
323 fn native_handle(&self) -> *uvll::uv_tcp_t {
324 match self { &TcpWatcher(ptr) => ptr }
// Newtype around a raw `*uvll::uv_udp_t`.
328 pub struct UdpWatcher(*uvll::uv_udp_t);
// Empty impl: no `Watcher` items are overridden here.
329 impl Watcher for UdpWatcher { }
// Allocates a `UV_UDP` handle, initialises it on `loop_` and installs
// watcher data on the new watcher. Asserts that the allocation is non-null
// and that `uvll::udp_init` returns 0.
332 pub fn new(loop_: &Loop) -> UdpWatcher {
334 let handle = malloc_handle(UV_UDP);
335 assert!(handle.is_not_null());
336 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
337 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
338 watcher.install_watcher_data();
// Binds the handle to `address`, dispatching to `udp_bind` or `udp_bind6`
// according to the address family; the third argument is always `0u32`
// (presumably the flags word -- confirm). A result that reaches the
// catch-all arm is returned as `Err(UvError(result))`; the success arm is
// not visible in this excerpt.
343 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
344 do socket_addr_as_uv_socket_addr(address) |addr| {
345 let result = unsafe {
347 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
348 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
353 _ => Err(UvError(result)),
// Starts receiving datagrams: both callbacks are stored in the watcher
// data, then the `alloc_cb` / `recv_cb` trampolines below are registered
// with uvll.
358 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
360 let data = self.get_watcher_data();
361 data.alloc_cb = Some(alloc);
362 data.udp_recv_cb = Some(cb);
365 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
// Trampoline: rebuilds the watcher from the raw handle and forwards the
// size request to the stored allocation callback.
367 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
368 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
369 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
370 return (*alloc_cb)(suggested_size as uint);
// Trampoline: converts the sender address and forwards the datagram to the
// stored receive callback.
373 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
374 addr: *uvll::sockaddr, flags: c_uint) {
375 // When there's no data to read the recv callback can be a no-op.
376 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
377 // this we just drop back to kqueue and wait for the next callback.
// NOTE(review): the early-exit code that the comment above describes is not
// visible in this excerpt.
382 rtdebug!("buf addr: %x", buf.base as uint);
383 rtdebug!("buf len: %d", buf.len as int);
384 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
385 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
// `nread` is also the value fed to `status_to_maybe_uv_error`.
386 let status = status_to_maybe_uv_error(nread as c_int);
387 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
388 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
// Stops receiving via `uvll::udp_recv_stop`; the stored callbacks are not
// touched here.
392 pub fn recv_stop(&mut self) {
393 unsafe { uvll::udp_recv_stop(self.native_handle()); }
// Sends `buf` to `address` and stores `cb` as the send callback.
// Asserts that no send callback is already stored and that the uvll send
// call returns 0.
396 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
398 let data = self.get_watcher_data();
399 assert!(data.udp_send_cb.is_none());
400 data.udp_send_cb = Some(cb);
// A fresh request object is allocated for every send.
403 let req = UdpSendRequest::new();
404 do socket_addr_as_uv_socket_addr(address) |addr| {
405 let result = unsafe {
// Pick the IPv4 or IPv6 send entry point by address family.
407 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
408 self.native_handle(), [buf], addr, send_cb),
409 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
410 self.native_handle(), [buf], addr, send_cb),
413 assert_eq!(0, result);
// Completion trampoline: recovers the UDP watcher from the request, frees
// the request, then takes the stored callback and invokes it with the status.
416 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
417 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
418 let mut udp_watcher = send_request.handle();
419 send_request.delete();
420 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
421 let status = status_to_maybe_uv_error(status);
422 cb(udp_watcher, status);
// Stores `cb` as the close callback (asserting none is already set) and
// asks uvll to close the handle, with `close_cb` below as the native
// callback. Takes `self` by value.
426 pub fn close(self, cb: NullCallback) {
// NOTE(review): the binding of `this` is not visible in this excerpt.
429 let data = this.get_watcher_data();
430 assert!(data.close_cb.is_none());
431 data.close_cb = Some(cb);
434 unsafe { uvll::close(self.native_handle(), close_cb); }
// Native close callback: takes the stored callback out of the watcher data,
// drops the watcher data and frees the native handle.
// NOTE(review): the call to `cb` itself is not visible in this excerpt.
436 extern fn close_cb(handle: *uvll::uv_udp_t) {
437 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
438 let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
439 udp_watcher.drop_watcher_data();
440 unsafe { free_handle(handle as *c_void) }
// Conversions between `UdpWatcher` and its raw `*uvll::uv_udp_t`.
446 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
447 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
// Returns the wrapped raw pointer.
450 fn native_handle(&self) -> *uvll::uv_udp_t {
451 match self { &UdpWatcher(ptr) => ptr }
455 // uv_connect_t is a subclass of uv_req_t
// Newtype around a raw `*uvll::uv_connect_t`; private to this module.
456 struct ConnectRequest(*uvll::uv_connect_t);
// Empty impl: no `Request` items are overridden here.
457 impl Request for ConnectRequest { }
// Allocation, stream lookup and release for connect requests.
459 impl ConnectRequest {
// Allocates a `UV_CONNECT` request and asserts that the pointer is non-null.
461 fn new() -> ConnectRequest {
462 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
463 assert!(connect_handle.is_not_null());
464 ConnectRequest(connect_handle as *uvll::uv_connect_t)
// Looks up the stream handle associated with this request in uvll and wraps
// it as a `StreamWatcher`.
467 fn stream(&self) -> StreamWatcher {
469 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
470 NativeHandle::from_native_handle(stream_handle)
// Frees the native request. NOTE(review): the header of the enclosing
// method is not visible in this excerpt.
475 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `ConnectRequest` and its raw `*uvll::uv_connect_t`.
479 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
// Wraps the raw pointer without any checks.
480 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
481 ConnectRequest(handle)
// Returns the wrapped raw pointer.
483 fn native_handle(&self) -> *uvll::uv_connect_t {
484 match self { &ConnectRequest(ptr) => ptr }
// Newtype around a raw `*uvll::uv_write_t`.
488 pub struct WriteRequest(*uvll::uv_write_t);
// Empty impl: no `Request` items are overridden here.
490 impl Request for WriteRequest { }
// Allocates a `UV_WRITE` request and asserts that the pointer is non-null.
493 pub fn new() -> WriteRequest {
494 let write_handle = unsafe { malloc_req(UV_WRITE) };
495 assert!(write_handle.is_not_null());
496 WriteRequest(write_handle as *uvll::uv_write_t)
// Looks up the stream handle associated with this write request in uvll
// and wraps it as a `StreamWatcher`.
499 pub fn stream(&self) -> StreamWatcher {
501 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
502 NativeHandle::from_native_handle(stream_handle)
// Frees the native request; takes `self` by value.
506 pub fn delete(self) {
507 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `WriteRequest` and its raw `*uvll::uv_write_t`.
511 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
512 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
// Returns the wrapped raw pointer.
515 fn native_handle(&self) -> *uvll::uv_write_t {
516 match self { &WriteRequest(ptr) => ptr }
// Newtype around a raw `*uvll::uv_udp_send_t`.
520 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
// Empty impl: no `Request` items are overridden here.
521 impl Request for UdpSendRequest { }
// Allocation, handle lookup and release for UDP send requests.
523 impl UdpSendRequest {
// Allocates a `UV_UDP_SEND` request and asserts that the pointer is non-null.
524 pub fn new() -> UdpSendRequest {
525 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
526 assert!(send_handle.is_not_null());
527 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
// Looks up the UDP handle associated with this request in uvll and wraps it
// as a `UdpWatcher`.
530 pub fn handle(&self) -> UdpWatcher {
531 let send_request_handle = unsafe {
532 uvll::get_udp_handle_from_send_req(self.native_handle())
534 NativeHandle::from_native_handle(send_request_handle)
// Frees the native request; takes `self` by value.
537 pub fn delete(self) {
538 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `UdpSendRequest` and its raw `*uvll::uv_udp_send_t`.
542 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
// Wraps the raw pointer without any checks.
543 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
544 UdpSendRequest(handle)
// Returns the wrapped raw pointer.
546 fn native_handle(&self) -> *uvll::uv_udp_send_t {
547 match self { &UdpSendRequest(ptr) => ptr }
557 use unstable::run_in_bare_thread;
558 use rt::thread::Thread;
560 use rt::uv::{Loop, AllocCallback};
561 use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
// Connects over IPv4 to a test address with no listener and expects the
// connect callback to report an error named "ECONNREFUSED"; the stream is
// then closed with an empty callback.
565 fn connect_close_ip4() {
566 do run_in_bare_thread() {
567 let mut loop_ = Loop::new();
568 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
569 // Connect to a port where nobody is listening
570 let addr = next_test_ip4();
571 do tcp_watcher.connect(addr) |stream_watcher, status| {
572 rtdebug!("tcp_watcher.connect!");
573 assert!(status.is_some());
574 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
575 stream_watcher.close(||());
// Connects over IPv6 to a test address with no listener and expects the
// connect callback to report an error named "ECONNREFUSED"; the stream is
// then closed with an empty callback.
583 fn connect_close_ip6() {
584 do run_in_bare_thread() {
585 let mut loop_ = Loop::new();
586 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
587 // Connect to a port where nobody is listening
588 let addr = next_test_ip6();
589 do tcp_watcher.connect(addr) |stream_watcher, status| {
590 rtdebug!("tcp_watcher.connect!");
591 assert!(status.is_some());
592 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
593 stream_watcher.close(||());
// Binds a UDP watcher to an IPv4 test address and closes it with an empty
// callback. The `Result` returned by `bind` is not inspected here.
601 fn udp_bind_close_ip4() {
602 do run_in_bare_thread() {
603 let mut loop_ = Loop::new();
604 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
605 let addr = next_test_ip4();
606 udp_watcher.bind(addr);
607 udp_watcher.close(||());
// Binds a UDP watcher to an IPv6 test address and closes it with an empty
// callback. The `Result` returned by `bind` is not inspected here.
614 fn udp_bind_close_ip6() {
615 do run_in_bare_thread() {
616 let mut loop_ = Loop::new();
617 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
618 let addr = next_test_ip6();
619 udp_watcher.bind(addr);
620 udp_watcher.close(||());
// TCP listen/accept/read test over IPv4 (the `fn` header is not visible in
// this excerpt). A server watcher listens on a test address while a client
// thread connects and writes ten bytes; the visible asserts check each
// received byte against a running `count` and that `count` equals `MAX`.
627 #[ignore(cfg(windows))] // FIXME #8815
629 do run_in_bare_thread() {
630 static MAX: int = 10;
631 let mut loop_ = Loop::new();
632 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
633 let addr = next_test_ip4();
634 server_tcp_watcher.bind(addr);
636 rtdebug!("listening");
637 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
638 rtdebug!("listened!");
639 assert!(status.is_none());
640 let mut loop_ = loop_;
// The accepted connection gets its own TCP handle, used as a stream.
641 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
642 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
643 server_stream_watcher.accept(client_tcp_watcher);
// `count_cell` carries the running count between read callbacks.
644 let count_cell = Cell::new(0);
645 let server_stream_watcher = server_stream_watcher;
646 rtdebug!("starting read");
// Read buffers are zero-filled vectors of the requested size.
647 let alloc: AllocCallback = |size| {
648 vec_to_uv_buf(vec::from_elem(size, 0u8))
650 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
652 rtdebug!("i'm reading!");
653 let buf = vec_from_uv_buf(buf);
654 let mut count = count_cell.take();
655 if status.is_none() {
656 rtdebug!("got %d bytes", nread);
657 let buf = buf.unwrap();
658 for byte in buf.slice(0, nread as uint).iter() {
659 assert!(*byte == count as u8);
660 rtdebug!("%u", *byte as uint);
664 assert_eq!(count, MAX);
// Close the accepted stream first, then the listening stream.
665 do stream_watcher.close {
666 server_stream_watcher.close(||());
669 count_cell.put_back(count);
// Client side: runs on its own thread with its own loop.
673 let client_thread = do Thread::start {
674 rtdebug!("starting client thread");
675 let mut loop_ = Loop::new();
676 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
677 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
678 rtdebug!("connecting");
679 assert!(status.is_none());
680 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
681 let buf = slice_to_uv_buf(msg);
// `msg` is moved into a cell and only taken out in the close callback.
682 let msg_cell = Cell::new(msg);
683 do stream_watcher.write(buf) |stream_watcher, status| {
685 assert!(status.is_none());
686 let msg_cell = Cell::new(msg_cell.take());
687 stream_watcher.close(||ignore(msg_cell.take()));
694 let mut loop_ = loop_;
697 client_thread.join();
// TCP listen/accept/read test over IPv6 (the `fn` header is not visible in
// this excerpt). A server watcher listens on a test address while a client
// thread connects and writes ten bytes; the visible asserts check each
// received byte against a running `count` and that `count` equals `MAX`.
702 #[ignore(cfg(windows))] // FIXME #8815
704 do run_in_bare_thread() {
705 static MAX: int = 10;
706 let mut loop_ = Loop::new();
707 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
708 let addr = next_test_ip6();
709 server_tcp_watcher.bind(addr);
711 rtdebug!("listening");
712 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
713 rtdebug!("listened!");
714 assert!(status.is_none());
715 let mut loop_ = loop_;
// The accepted connection gets its own TCP handle, used as a stream.
716 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
717 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
718 server_stream_watcher.accept(client_tcp_watcher);
// `count_cell` carries the running count between read callbacks.
719 let count_cell = Cell::new(0);
720 let server_stream_watcher = server_stream_watcher;
721 rtdebug!("starting read");
// Read buffers are zero-filled vectors of the requested size.
722 let alloc: AllocCallback = |size| {
723 vec_to_uv_buf(vec::from_elem(size, 0u8))
725 do client_tcp_watcher.read_start(alloc)
726 |stream_watcher, nread, buf, status| {
728 rtdebug!("i'm reading!");
729 let buf = vec_from_uv_buf(buf);
730 let mut count = count_cell.take();
731 if status.is_none() {
732 rtdebug!("got %d bytes", nread);
733 let buf = buf.unwrap();
734 let r = buf.slice(0, nread as uint);
735 for byte in r.iter() {
736 assert!(*byte == count as u8);
737 rtdebug!("%u", *byte as uint);
741 assert_eq!(count, MAX);
// Close the accepted stream first, then the listening stream.
742 do stream_watcher.close {
743 server_stream_watcher.close(||());
746 count_cell.put_back(count);
// Client side: runs on its own thread with its own loop.
750 let client_thread = do Thread::start {
751 rtdebug!("starting client thread");
752 let mut loop_ = Loop::new();
753 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
754 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
755 rtdebug!("connecting");
756 assert!(status.is_none());
757 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
758 let buf = slice_to_uv_buf(msg);
// `msg` is moved into a cell and only taken out in the close callback.
759 let msg_cell = Cell::new(msg);
760 do stream_watcher.write(buf) |stream_watcher, status| {
762 assert!(status.is_none());
763 let msg_cell = Cell::new(msg_cell.take());
764 stream_watcher.close(||ignore(msg_cell.take()));
771 let mut loop_ = loop_;
774 client_thread.join();
// UDP send/receive test over IPv4 (the `fn` header is not visible in this
// excerpt). A server watcher bound to `server_addr` receives while a client
// thread bound to `client_addr` sends ten bytes; the receive callback
// asserts no error, zero flags and that the sender is `client_addr`.
779 #[ignore(cfg(windows))] // FIXME #8815
781 do run_in_bare_thread() {
782 static MAX: int = 10;
783 let mut loop_ = Loop::new();
784 let server_addr = next_test_ip4();
785 let client_addr = next_test_ip4();
787 let mut server = UdpWatcher::new(&loop_);
788 assert!(server.bind(server_addr).is_ok());
790 rtdebug!("starting read");
// Receive buffers are zero-filled vectors of the requested size.
791 let alloc: AllocCallback = |size| {
792 vec_to_uv_buf(vec::from_elem(size, 0u8))
795 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
797 rtdebug!("i'm reading!");
798 assert!(status.is_none());
799 assert_eq!(flags, 0);
800 assert_eq!(src, client_addr);
802 let buf = vec_from_uv_buf(buf);
804 rtdebug!("got %d bytes", nread);
806 let buf = buf.unwrap();
// NOTE(review): the declaration and update of `count` are not visible in
// this excerpt.
807 for &byte in buf.slice(0, nread as uint).iter() {
808 assert!(byte == count as u8);
809 rtdebug!("%u", byte as uint);
812 assert_eq!(count, MAX);
// Client side: runs on its own thread with its own loop.
817 let thread = do Thread::start {
818 let mut loop_ = Loop::new();
819 let mut client = UdpWatcher::new(&loop_);
820 assert!(client.bind(client_addr).is_ok());
821 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
822 let buf = slice_to_uv_buf(msg);
823 do client.send(buf, server_addr) |client, status| {
825 assert!(status.is_none());
840 #[ignore(cfg(windows))] // FIXME #8815
842 do run_in_bare_thread() {
843 static MAX: int = 10;
844 let mut loop_ = Loop::new();
845 let server_addr = next_test_ip6();
846 let client_addr = next_test_ip6();
848 let mut server = UdpWatcher::new(&loop_);
849 assert!(server.bind(server_addr).is_ok());
851 rtdebug!("starting read");
852 let alloc: AllocCallback = |size| {
853 vec_to_uv_buf(vec::from_elem(size, 0u8))
856 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
858 rtdebug!("i'm reading!");
859 assert!(status.is_none());
860 assert_eq!(flags, 0);
861 assert_eq!(src, client_addr);
863 let buf = vec_from_uv_buf(buf);
865 rtdebug!("got %d bytes", nread);
867 let buf = buf.unwrap();
868 for &byte in buf.slice(0, nread as uint).iter() {
869 assert!(byte == count as u8);
870 rtdebug!("%u", byte as uint);
873 assert_eq!(count, MAX);
878 let thread = do Thread::start {
879 let mut loop_ = Loop::new();
880 let mut client = UdpWatcher::new(&loop_);
881 assert!(client.bind(client_addr).is_ok());
882 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
883 let buf = slice_to_uv_buf(msg);
884 do client.send(buf, server_addr) |client, status| {
886 assert!(status.is_none());