1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17 status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use rt::uv::last_uv_error;
22 use from_str::{FromStr};
24 pub struct UvAddrInfo(*uvll::addrinfo);
26 pub enum UvSocketAddr {
27 UvIpv4SocketAddr(*sockaddr_in),
28 UvIpv6SocketAddr(*sockaddr_in6),
31 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
33 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
34 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
36 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
37 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
43 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
44 let malloc = match addr.ip {
45 Ipv4Addr(*) => malloc_ip4_addr,
46 Ipv6Addr(*) => malloc_ip6_addr,
48 let wrap = match addr.ip {
49 Ipv4Addr(*) => UvIpv4SocketAddr,
50 Ipv6Addr(*) => UvIpv6SocketAddr,
52 let free = match addr.ip {
53 Ipv4Addr(*) => free_ip4_addr,
54 Ipv6Addr(*) => free_ip6_addr,
57 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
61 unsafe { free(addr) };
65 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
66 let ip_size = match addr {
67 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
68 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
71 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
73 let buf_ptr = vec::raw::to_ptr(buf);
75 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
76 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
81 let ip_port = unsafe {
82 let port = match addr {
83 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
84 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
88 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
89 let ip_addr = FromStr::from_str(ip_str).unwrap();
91 // finally run the closure
92 f(SocketAddr { ip: ip_addr, port: ip_port })
95 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
97 uv_socket_addr_as_socket_addr(addr, util::id)
100 // Traverse the addrinfo linked list, producing a vector of Rust socket addresses
101 pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
103 let &UvAddrInfo(addr) = addr;
108 let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
109 let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
110 addrs.push(rustaddr);
111 if (*addr).ai_next.is_not_null() {
112 addr = (*addr).ai_next;
124 fn test_ip4_conversion() {
126 let ip4 = rt::test::next_test_ip4();
127 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
132 fn test_ip6_conversion() {
134 let ip6 = rt::test::next_test_ip6();
135 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
138 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
140 pub struct StreamWatcher(*uvll::uv_stream_t);
141 impl Watcher for StreamWatcher { }
144 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
146 let data = self.get_watcher_data();
147 data.alloc_cb = Some(alloc);
148 data.read_cb = Some(cb);
151 unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
153 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
154 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
155 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
156 return (*alloc_cb)(suggested_size as uint);
159 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
160 rtdebug!("buf addr: %x", buf.base as uint);
161 rtdebug!("buf len: %d", buf.len as int);
162 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
163 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
164 let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
165 (*cb)(stream_watcher, nread as int, buf, status);
169 pub fn read_stop(&mut self) {
170 // It would be nice to drop the alloc and read callbacks here,
171 // but read_stop may be called from inside one of them and we
172 // would end up freeing the in-use environment
173 let handle = self.native_handle();
174 unsafe { uvll::read_stop(handle); }
177 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
179 let data = self.get_watcher_data();
180 assert!(data.write_cb.is_none());
181 data.write_cb = Some(cb);
184 let req = WriteRequest::new();
186 assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
189 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
190 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
191 let mut stream_watcher = write_request.stream();
192 write_request.delete();
193 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
194 let status = status_to_maybe_uv_error(stream_watcher, status);
195 cb(stream_watcher, status);
199 pub fn accept(&mut self, stream: StreamWatcher) {
200 let self_handle = self.native_handle() as *c_void;
201 let stream_handle = stream.native_handle() as *c_void;
202 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
205 pub fn close(self, cb: NullCallback) {
208 let data = this.get_watcher_data();
209 assert!(data.close_cb.is_none());
210 data.close_cb = Some(cb);
213 unsafe { uvll::close(self.native_handle(), close_cb); }
215 extern fn close_cb(handle: *uvll::uv_stream_t) {
216 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
217 let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
218 stream_watcher.drop_watcher_data();
219 unsafe { free_handle(handle as *c_void) }
225 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
226 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
227 StreamWatcher(handle)
229 fn native_handle(&self) -> *uvll::uv_stream_t {
230 match self { &StreamWatcher(ptr) => ptr }
234 pub struct TcpWatcher(*uvll::uv_tcp_t);
235 impl Watcher for TcpWatcher { }
238 pub fn new(loop_: &Loop) -> TcpWatcher {
240 let handle = malloc_handle(UV_TCP);
241 assert!(handle.is_not_null());
242 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
243 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
244 watcher.install_watcher_data();
249 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
250 do socket_addr_as_uv_socket_addr(address) |addr| {
251 let result = unsafe {
253 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
254 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
259 _ => Err(last_uv_error(self)),
264 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
266 assert!(self.get_watcher_data().connect_cb.is_none());
267 self.get_watcher_data().connect_cb = Some(cb);
269 let connect_handle = ConnectRequest::new().native_handle();
270 rtdebug!("connect_t: %x", connect_handle as uint);
271 do socket_addr_as_uv_socket_addr(address) |addr| {
272 let result = match addr {
273 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
274 self.native_handle(), addr, connect_cb),
275 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
276 self.native_handle(), addr, connect_cb),
278 assert_eq!(0, result);
281 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
282 rtdebug!("connect_t: %x", req as uint);
283 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
284 let mut stream_watcher = connect_request.stream();
285 connect_request.delete();
286 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
287 let status = status_to_maybe_uv_error(stream_watcher, status);
288 cb(stream_watcher, status);
293 pub fn listen(&mut self, cb: ConnectionCallback) {
295 let data = self.get_watcher_data();
296 assert!(data.connect_cb.is_none());
297 data.connect_cb = Some(cb);
301 static BACKLOG: c_int = 128; // XXX should be configurable
302 // XXX: This can probably fail
303 assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
306 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
307 rtdebug!("connection_cb");
308 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
309 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
310 let status = status_to_maybe_uv_error(stream_watcher, status);
311 (*cb)(stream_watcher, status);
315 pub fn as_stream(&self) -> StreamWatcher {
316 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
320 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
321 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
324 fn native_handle(&self) -> *uvll::uv_tcp_t {
325 match self { &TcpWatcher(ptr) => ptr }
329 pub struct UdpWatcher(*uvll::uv_udp_t);
330 impl Watcher for UdpWatcher { }
333 pub fn new(loop_: &Loop) -> UdpWatcher {
335 let handle = malloc_handle(UV_UDP);
336 assert!(handle.is_not_null());
337 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
338 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
339 watcher.install_watcher_data();
344 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
345 do socket_addr_as_uv_socket_addr(address) |addr| {
346 let result = unsafe {
348 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
349 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
354 _ => Err(last_uv_error(self)),
359 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
361 let data = self.get_watcher_data();
362 data.alloc_cb = Some(alloc);
363 data.udp_recv_cb = Some(cb);
366 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
368 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
369 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
370 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
371 return (*alloc_cb)(suggested_size as uint);
374 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
375 addr: *uvll::sockaddr, flags: c_uint) {
376 // When there's no data to read the recv callback can be a no-op.
377 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
378 // this we just drop back to kqueue and wait for the next callback.
383 rtdebug!("buf addr: %x", buf.base as uint);
384 rtdebug!("buf len: %d", buf.len as int);
385 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
386 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
387 let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
388 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
389 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
393 pub fn recv_stop(&mut self) {
394 unsafe { uvll::udp_recv_stop(self.native_handle()); }
397 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
399 let data = self.get_watcher_data();
400 assert!(data.udp_send_cb.is_none());
401 data.udp_send_cb = Some(cb);
404 let req = UdpSendRequest::new();
405 do socket_addr_as_uv_socket_addr(address) |addr| {
406 let result = unsafe {
408 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
409 self.native_handle(), [buf], addr, send_cb),
410 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
411 self.native_handle(), [buf], addr, send_cb),
414 assert_eq!(0, result);
417 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
418 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
419 let mut udp_watcher = send_request.handle();
420 send_request.delete();
421 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
422 let status = status_to_maybe_uv_error(udp_watcher, status);
423 cb(udp_watcher, status);
427 pub fn close(self, cb: NullCallback) {
430 let data = this.get_watcher_data();
431 assert!(data.close_cb.is_none());
432 data.close_cb = Some(cb);
435 unsafe { uvll::close(self.native_handle(), close_cb); }
437 extern fn close_cb(handle: *uvll::uv_udp_t) {
438 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
439 let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
440 udp_watcher.drop_watcher_data();
441 unsafe { free_handle(handle as *c_void) }
447 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
448 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
451 fn native_handle(&self) -> *uvll::uv_udp_t {
452 match self { &UdpWatcher(ptr) => ptr }
456 // uv_connect_t is a subclass of uv_req_t
457 struct ConnectRequest(*uvll::uv_connect_t);
458 impl Request for ConnectRequest { }
460 impl ConnectRequest {
462 fn new() -> ConnectRequest {
463 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
464 assert!(connect_handle.is_not_null());
465 ConnectRequest(connect_handle as *uvll::uv_connect_t)
468 fn stream(&self) -> StreamWatcher {
470 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
471 NativeHandle::from_native_handle(stream_handle)
476 unsafe { free_req(self.native_handle() as *c_void) }
480 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
481 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
482 ConnectRequest(handle)
484 fn native_handle(&self) -> *uvll::uv_connect_t {
485 match self { &ConnectRequest(ptr) => ptr }
489 pub struct WriteRequest(*uvll::uv_write_t);
491 impl Request for WriteRequest { }
494 pub fn new() -> WriteRequest {
495 let write_handle = unsafe { malloc_req(UV_WRITE) };
496 assert!(write_handle.is_not_null());
497 WriteRequest(write_handle as *uvll::uv_write_t)
500 pub fn stream(&self) -> StreamWatcher {
502 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
503 NativeHandle::from_native_handle(stream_handle)
507 pub fn delete(self) {
508 unsafe { free_req(self.native_handle() as *c_void) }
512 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
513 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
516 fn native_handle(&self) -> *uvll::uv_write_t {
517 match self { &WriteRequest(ptr) => ptr }
521 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
522 impl Request for UdpSendRequest { }
524 impl UdpSendRequest {
525 pub fn new() -> UdpSendRequest {
526 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
527 assert!(send_handle.is_not_null());
528 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
531 pub fn handle(&self) -> UdpWatcher {
532 let send_request_handle = unsafe {
533 uvll::get_udp_handle_from_send_req(self.native_handle())
535 NativeHandle::from_native_handle(send_request_handle)
538 pub fn delete(self) {
539 unsafe { free_req(self.native_handle() as *c_void) }
543 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
544 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
545 UdpSendRequest(handle)
547 fn native_handle(&self) -> *uvll::uv_udp_send_t {
548 match self { &UdpSendRequest(ptr) => ptr }
558 use unstable::run_in_bare_thread;
559 use rt::thread::Thread;
561 use rt::uv::{Loop, AllocCallback};
562 use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
566 fn connect_close_ip4() {
567 do run_in_bare_thread() {
568 let mut loop_ = Loop::new();
569 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
570 // Connect to a port where nobody is listening
571 let addr = next_test_ip4();
572 do tcp_watcher.connect(addr) |stream_watcher, status| {
573 rtdebug!("tcp_watcher.connect!");
574 assert!(status.is_some());
575 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
576 stream_watcher.close(||());
584 fn connect_close_ip6() {
585 do run_in_bare_thread() {
586 let mut loop_ = Loop::new();
587 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
588 // Connect to a port where nobody is listening
589 let addr = next_test_ip6();
590 do tcp_watcher.connect(addr) |stream_watcher, status| {
591 rtdebug!("tcp_watcher.connect!");
592 assert!(status.is_some());
593 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
594 stream_watcher.close(||());
602 fn udp_bind_close_ip4() {
603 do run_in_bare_thread() {
604 let mut loop_ = Loop::new();
605 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
606 let addr = next_test_ip4();
607 udp_watcher.bind(addr);
608 udp_watcher.close(||());
615 fn udp_bind_close_ip6() {
616 do run_in_bare_thread() {
617 let mut loop_ = Loop::new();
618 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
619 let addr = next_test_ip6();
620 udp_watcher.bind(addr);
621 udp_watcher.close(||());
628 #[ignore(cfg(windows))] // FIXME #8815
630 do run_in_bare_thread() {
631 static MAX: int = 10;
632 let mut loop_ = Loop::new();
633 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
634 let addr = next_test_ip4();
635 server_tcp_watcher.bind(addr);
637 rtdebug!("listening");
638 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
639 rtdebug!("listened!");
640 assert!(status.is_none());
641 let mut loop_ = loop_;
642 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
643 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
644 server_stream_watcher.accept(client_tcp_watcher);
645 let count_cell = Cell::new(0);
646 let server_stream_watcher = server_stream_watcher;
647 rtdebug!("starting read");
648 let alloc: AllocCallback = |size| {
649 vec_to_uv_buf(vec::from_elem(size, 0u8))
651 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
653 rtdebug!("i'm reading!");
654 let buf = vec_from_uv_buf(buf);
655 let mut count = count_cell.take();
656 if status.is_none() {
657 rtdebug!("got %d bytes", nread);
658 let buf = buf.unwrap();
659 for byte in buf.slice(0, nread as uint).iter() {
660 assert!(*byte == count as u8);
661 rtdebug!("%u", *byte as uint);
665 assert_eq!(count, MAX);
666 do stream_watcher.close {
667 server_stream_watcher.close(||());
670 count_cell.put_back(count);
674 let client_thread = do Thread::start {
675 rtdebug!("starting client thread");
676 let mut loop_ = Loop::new();
677 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
678 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
679 rtdebug!("connecting");
680 assert!(status.is_none());
681 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
682 let buf = slice_to_uv_buf(msg);
683 let msg_cell = Cell::new(msg);
684 do stream_watcher.write(buf) |stream_watcher, status| {
686 assert!(status.is_none());
687 let msg_cell = Cell::new(msg_cell.take());
688 stream_watcher.close(||ignore(msg_cell.take()));
695 let mut loop_ = loop_;
698 client_thread.join();
703 #[ignore(cfg(windows))] // FIXME #8815
705 do run_in_bare_thread() {
706 static MAX: int = 10;
707 let mut loop_ = Loop::new();
708 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
709 let addr = next_test_ip6();
710 server_tcp_watcher.bind(addr);
712 rtdebug!("listening");
713 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
714 rtdebug!("listened!");
715 assert!(status.is_none());
716 let mut loop_ = loop_;
717 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
718 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
719 server_stream_watcher.accept(client_tcp_watcher);
720 let count_cell = Cell::new(0);
721 let server_stream_watcher = server_stream_watcher;
722 rtdebug!("starting read");
723 let alloc: AllocCallback = |size| {
724 vec_to_uv_buf(vec::from_elem(size, 0u8))
726 do client_tcp_watcher.read_start(alloc)
727 |stream_watcher, nread, buf, status| {
729 rtdebug!("i'm reading!");
730 let buf = vec_from_uv_buf(buf);
731 let mut count = count_cell.take();
732 if status.is_none() {
733 rtdebug!("got %d bytes", nread);
734 let buf = buf.unwrap();
735 let r = buf.slice(0, nread as uint);
736 for byte in r.iter() {
737 assert!(*byte == count as u8);
738 rtdebug!("%u", *byte as uint);
742 assert_eq!(count, MAX);
743 do stream_watcher.close {
744 server_stream_watcher.close(||());
747 count_cell.put_back(count);
751 let client_thread = do Thread::start {
752 rtdebug!("starting client thread");
753 let mut loop_ = Loop::new();
754 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
755 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
756 rtdebug!("connecting");
757 assert!(status.is_none());
758 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
759 let buf = slice_to_uv_buf(msg);
760 let msg_cell = Cell::new(msg);
761 do stream_watcher.write(buf) |stream_watcher, status| {
763 assert!(status.is_none());
764 let msg_cell = Cell::new(msg_cell.take());
765 stream_watcher.close(||ignore(msg_cell.take()));
772 let mut loop_ = loop_;
775 client_thread.join();
780 #[ignore(cfg(windows))] // FIXME #8815
782 do run_in_bare_thread() {
783 static MAX: int = 10;
784 let mut loop_ = Loop::new();
785 let server_addr = next_test_ip4();
786 let client_addr = next_test_ip4();
788 let mut server = UdpWatcher::new(&loop_);
789 assert!(server.bind(server_addr).is_ok());
791 rtdebug!("starting read");
792 let alloc: AllocCallback = |size| {
793 vec_to_uv_buf(vec::from_elem(size, 0u8))
796 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
798 rtdebug!("i'm reading!");
799 assert!(status.is_none());
800 assert_eq!(flags, 0);
801 assert_eq!(src, client_addr);
803 let buf = vec_from_uv_buf(buf);
805 rtdebug!("got %d bytes", nread);
807 let buf = buf.unwrap();
808 for &byte in buf.slice(0, nread as uint).iter() {
809 assert!(byte == count as u8);
810 rtdebug!("%u", byte as uint);
813 assert_eq!(count, MAX);
818 let thread = do Thread::start {
819 let mut loop_ = Loop::new();
820 let mut client = UdpWatcher::new(&loop_);
821 assert!(client.bind(client_addr).is_ok());
822 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
823 let buf = slice_to_uv_buf(msg);
824 do client.send(buf, server_addr) |client, status| {
826 assert!(status.is_none());
841 #[ignore(cfg(windows))] // FIXME #8815
843 do run_in_bare_thread() {
844 static MAX: int = 10;
845 let mut loop_ = Loop::new();
846 let server_addr = next_test_ip6();
847 let client_addr = next_test_ip6();
849 let mut server = UdpWatcher::new(&loop_);
850 assert!(server.bind(server_addr).is_ok());
852 rtdebug!("starting read");
853 let alloc: AllocCallback = |size| {
854 vec_to_uv_buf(vec::from_elem(size, 0u8))
857 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
859 rtdebug!("i'm reading!");
860 assert!(status.is_none());
861 assert_eq!(flags, 0);
862 assert_eq!(src, client_addr);
864 let buf = vec_from_uv_buf(buf);
866 rtdebug!("got %d bytes", nread);
868 let buf = buf.unwrap();
869 for &byte in buf.slice(0, nread as uint).iter() {
870 assert!(byte == count as u8);
871 rtdebug!("%u", byte as uint);
874 assert_eq!(count, MAX);
879 let thread = do Thread::start {
880 let mut loop_ = Loop::new();
881 let mut client = UdpWatcher::new(&loop_);
882 assert!(client.bind(client_addr).is_ok());
883 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
884 let buf = slice_to_uv_buf(msg);
885 do client.send(buf, server_addr) |client, status| {
887 assert!(status.is_none());