1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17 status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
21 use from_str::{FromStr};
// A native socket address in one of two forms: a raw pointer to a
// `sockaddr_in` or a raw pointer to a `sockaddr_in6`. The variants only
// carry the pointer; nothing here says who owns the pointed-to memory.
23 pub enum UvSocketAddr {
24 UvIpv4SocketAddr(*sockaddr_in),
25 UvIpv6SocketAddr(*sockaddr_in6),
// Classifies a generic `*uvll::sockaddr` by address family and wraps the same
// pointer, cast to the matching concrete type, in a `UvSocketAddr`.
28 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
// The address must be recognised as IPv4 or IPv6 ...
30 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
// ... and must not be recognised as both at once.
31 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
// The pointer is only cast, not copied, so the result aliases `addr`.
33 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
34 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
// Builds a temporary native address from a `SocketAddr` for use by a closure
// that takes a `UvSocketAddr`, and returns a value of the closure's result type.
// The allocator, enum constructor and deallocator are each selected by
// matching on the IP family of `addr.ip`.
40 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
// Family-specific allocator for the native address.
41 let malloc = match addr.ip {
42 Ipv4Addr(*) => malloc_ip4_addr,
43 Ipv6Addr(*) => malloc_ip6_addr,
// Family-specific `UvSocketAddr` constructor.
45 let wrap = match addr.ip {
46 Ipv4Addr(*) => UvIpv4SocketAddr,
47 Ipv6Addr(*) => UvIpv6SocketAddr,
// Family-specific deallocator, paired with `malloc` above.
49 let free = match addr.ip {
50 Ipv4Addr(*) => free_ip4_addr,
51 Ipv6Addr(*) => free_ip6_addr,
// Allocate the native address from the textual IP and the port. This
// shadows the `addr` parameter with the native pointer.
54 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the native address allocated above.
58 unsafe { free(addr) };
// Converts a native `UvSocketAddr` into a `SocketAddr` and passes it to `f`,
// returning whatever `f` returns. The IP is obtained by having uvll format the
// address as text into a scratch buffer and parsing that text via `FromStr`;
// the port is read with the family-specific uvll accessor.
62 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Longest textual form expected for each family, not counting the NUL.
63 let ip_size = match addr {
64 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
// NOTE(review): 39 characters covers eight 4-digit hex groups; an IPv6
// address printed with a dotted-quad IPv4 tail can be longer. Confirm that
// ip6_name never produces that form.
65 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// Zero-filled scratch buffer with one extra byte for the NUL terminator.
68 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
70 let buf_ptr = vec::raw::to_ptr(buf);
// NOTE(review): the size passed is `ip_size`, one less than the buffer's real
// length. If the native call counts the terminator within that size, a
// maximum-length address would not fit. Confirm against the libuv docs and
// consider passing `ip_size + 1`.
72 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
73 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
// Read the port using the accessor that matches the address family.
78 let ip_port = unsafe {
79 let port = match addr {
80 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
81 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Drop the trailing NUL padding so only the textual address is parsed.
85 let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00');
// NOTE(review): `unwrap` fails the task if the text does not parse as an IP.
86 let ip_addr = FromStr::from_str(ip_str).unwrap();
88 // finally run the closure
89 f(SocketAddr { ip: ip_addr, port: ip_port })
// Converts a `UvSocketAddr` into an owned `SocketAddr`.
92 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
// Reuse the closure-based converter with `util::id` as the closure, so the
// converted address itself is the result.
94 uv_socket_addr_as_socket_addr(addr, util::id)
// Round-trip check: an IPv4 test address converted to its native form and
// back must compare equal to the original.
99 fn test_ip4_conversion() {
101 let ip4 = rt::test::next_test_ip4();
102 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
// Round-trip check: an IPv6 test address converted to its native form and
// back must compare equal to the original.
107 fn test_ip6_conversion() {
109 let ip6 = rt::test::next_test_ip6();
110 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
113 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// Newtype around a raw `*uvll::uv_stream_t`. It opts into the `Watcher`
// trait with an empty impl, so it relies on that trait's provided methods.
115 pub struct StreamWatcher(*uvll::uv_stream_t);
116 impl Watcher for StreamWatcher { }
// Starts reading on the stream. `alloc` is called to obtain a buffer for each
// read and `cb` receives the outcome; both are stored in the watcher data so
// the `extern` trampolines below can find them from the raw handle alone.
119 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
121 let data = self.get_watcher_data();
122 data.alloc_cb = Some(alloc);
123 data.read_cb = Some(cb);
// NOTE(review): the return value of uvll::read_start is discarded here,
// whereas `write` and `listen` assert that their calls return 0.
126 unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
// Trampoline for buffer allocation: rebuild the watcher from the raw stream
// pointer and forward the suggested size to the stored allocation callback.
128 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
129 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
130 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
131 return (*alloc_cb)(suggested_size as uint);
// Trampoline for read completion: rebuild the watcher, look up the stored
// read callback and invoke it with the byte count, buffer and status.
134 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
135 rtdebug!("buf addr: %x", buf.base as uint);
136 rtdebug!("buf len: %d", buf.len as int);
137 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
// The callback is borrowed via `get_ref`, not taken, so it is presumably
// left installed for subsequent reads.
138 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
// NOTE(review): `nread` (ssize_t) is narrowed to c_int to derive the
// status. Confirm this cannot truncate for large reads.
139 let status = status_to_maybe_uv_error(nread as c_int);
140 (*cb)(stream_watcher, nread as int, buf, status);
// Stops reading on the stream by calling uvll::read_stop on the native
// handle. The stored alloc/read callbacks are deliberately left in place.
144 pub fn read_stop(&mut self) {
145 // It would be nice to drop the alloc and read callbacks here,
146 // but read_stop may be called from inside one of them and we
147 // would end up freeing the in-use environment
148 let handle = self.native_handle();
149 unsafe { uvll::read_stop(handle); }
// Queues a write of `buf` on the stream; `cb` is invoked from the `write_cb`
// trampoline with the resulting status.
152 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
154 let data = self.get_watcher_data();
// Only one write callback can be stored at a time: starting a write while
// a callback is still stored trips this assertion.
155 assert!(data.write_cb.is_none());
156 data.write_cb = Some(cb);
159 let req = WriteRequest::new();
// NOTE(review): a non-zero return from uvll::write fails the assertion
// rather than being reported to the caller.
161 assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
// Trampoline for write completion.
164 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
165 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
// Fetch the owning stream from the request before the request is freed.
166 let mut stream_watcher = write_request.stream();
167 write_request.delete();
// `take_unwrap` removes the stored callback, presumably emptying the slot
// so that a later `write` passes its `is_none` assertion.
168 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
169 let status = status_to_maybe_uv_error(status);
170 cb(stream_watcher, status);
// Accepts a connection on this stream into `stream`. Both native handles are
// passed to uvll::accept as untyped `*c_void` pointers.
174 pub fn accept(&mut self, stream: StreamWatcher) {
175 let self_handle = self.native_handle() as *c_void;
176 let stream_handle = stream.native_handle() as *c_void;
// A non-zero return from uvll::accept fails this assertion.
177 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
// Closes the stream, consuming the watcher. `cb` is stored as the close
// callback and the native handle is closed with the `close_cb` trampoline.
180 pub fn close(self, cb: NullCallback) {
// `this` is presumably a mutable rebinding of `self`; confirm.
183 let data = this.get_watcher_data();
// Starting a second close while a close callback is stored is a bug.
184 assert!(data.close_cb.is_none());
185 data.close_cb = Some(cb);
188 unsafe { uvll::close(self.native_handle(), close_cb); }
// Trampoline for close completion: take the stored callback out, drop the
// watcher data and free the native handle.
190 extern fn close_cb(handle: *uvll::uv_stream_t) {
191 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
// The callback is taken before the watcher data holding it is dropped.
192 let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
193 stream_watcher.drop_watcher_data();
194 unsafe { free_handle(handle as *c_void) }
// Conversions between `StreamWatcher` and the raw `*uvll::uv_stream_t` it wraps.
200 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
// Wraps the raw pointer without any check or allocation.
201 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
202 StreamWatcher(handle)
// Returns the wrapped raw pointer.
204 fn native_handle(&self) -> *uvll::uv_stream_t {
205 match self { &StreamWatcher(ptr) => ptr }
// Newtype around a raw `*uvll::uv_tcp_t`. It opts into the `Watcher` trait
// with an empty impl, so it relies on that trait's provided methods.
209 pub struct TcpWatcher(*uvll::uv_tcp_t);
210 impl Watcher for TcpWatcher { }
// Creates a TCP watcher registered with `loop_`: allocates a UV_TCP handle,
// initialises it with uvll::tcp_init and installs the watcher data.
213 pub fn new(loop_: &Loop) -> TcpWatcher {
215 let handle = malloc_handle(UV_TCP);
// Allocation failure and initialisation failure both fail an assertion.
216 assert!(handle.is_not_null());
217 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
218 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
219 watcher.install_watcher_data();
// Binds the TCP handle to `address`. The address is converted to its native
// form for the duration of the call.
224 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
225 do socket_addr_as_uv_socket_addr(address) |addr| {
226 let result = unsafe {
// Use the bind call that matches the address family.
228 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
229 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
// Any result not handled by an earlier arm is wrapped as a `UvError`.
234 _ => Err(UvError(result)),
// Starts a connection to `address`; `cb` is invoked from the `connect_cb`
// trampoline with the stream and the resulting status.
239 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
// Only one callback fits in the `connect_cb` slot, which `listen` also uses.
241 assert!(self.get_watcher_data().connect_cb.is_none());
242 self.get_watcher_data().connect_cb = Some(cb);
// Only the raw pointer of the new request is kept; the trampoline below
// rebuilds a `ConnectRequest` from it and frees it.
244 let connect_handle = ConnectRequest::new().native_handle();
245 rtdebug!("connect_t: %x", connect_handle as uint);
246 do socket_addr_as_uv_socket_addr(address) |addr| {
// Use the connect call that matches the address family.
247 let result = match addr {
248 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
249 self.native_handle(), addr, connect_cb),
250 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
251 self.native_handle(), addr, connect_cb),
// NOTE(review): a non-zero return fails the assertion instead of being
// reported through `cb`.
253 assert_eq!(0, result);
// Trampoline for connect completion.
256 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
257 rtdebug!("connect_t: %x", req as uint);
258 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
// Fetch the stream from the request before the request is freed.
259 let mut stream_watcher = connect_request.stream();
260 connect_request.delete();
// `take_unwrap` removes the stored callback from the slot.
261 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
262 let status = status_to_maybe_uv_error(status);
263 cb(stream_watcher, status);
// Starts listening for incoming connections; `cb` is invoked from the
// `connection_cb` trampoline for each connection event.
268 pub fn listen(&mut self, cb: ConnectionCallback) {
270 let data = self.get_watcher_data();
// The callback is kept in the same `connect_cb` slot that `connect` uses,
// so a watcher with a connect callback stored cannot also listen.
271 assert!(data.connect_cb.is_none());
272 data.connect_cb = Some(cb);
276 static BACKLOG: c_int = 128; // XXX should be configurable
277 // XXX: This can probably fail
278 assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
// Trampoline for incoming connections.
281 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
282 rtdebug!("connection_cb");
283 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
// Borrowed via `get_ref` rather than taken, so the callback is presumably
// left installed for further connections.
284 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
285 let status = status_to_maybe_uv_error(status);
286 (*cb)(stream_watcher, status);
// Returns a `StreamWatcher` over the same native handle, obtained by casting
// the `uv_tcp_t` pointer to `uv_stream_t`. Both watchers alias one handle.
290 pub fn as_stream(&self) -> StreamWatcher {
291 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
// Conversions between `TcpWatcher` and the raw `*uvll::uv_tcp_t` it wraps.
295 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
296 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
// Returns the wrapped raw pointer.
299 fn native_handle(&self) -> *uvll::uv_tcp_t {
300 match self { &TcpWatcher(ptr) => ptr }
// Newtype around a raw `*uvll::uv_udp_t`. It opts into the `Watcher` trait
// with an empty impl, so it relies on that trait's provided methods.
304 pub struct UdpWatcher(*uvll::uv_udp_t);
305 impl Watcher for UdpWatcher { }
// Creates a UDP watcher registered with `loop_`: allocates a UV_UDP handle,
// initialises it with uvll::udp_init and installs the watcher data.
308 pub fn new(loop_: &Loop) -> UdpWatcher {
310 let handle = malloc_handle(UV_UDP);
// Allocation failure and initialisation failure both fail an assertion.
311 assert!(handle.is_not_null());
312 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
313 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
314 watcher.install_watcher_data();
// Binds the UDP handle to `address`. The address is converted to its native
// form for the duration of the call.
319 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
320 do socket_addr_as_uv_socket_addr(address) |addr| {
321 let result = unsafe {
// Use the bind call that matches the address family; the final argument
// is always passed as zero.
323 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
324 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
// Any result not handled by an earlier arm is wrapped as a `UvError`.
329 _ => Err(UvError(result)),
// Starts receiving datagrams. `alloc` is called to obtain a buffer for each
// receive and `cb` gets the result; both are stored in the watcher data so the
// `extern` trampolines below can find them from the raw handle alone.
334 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
336 let data = self.get_watcher_data();
337 data.alloc_cb = Some(alloc);
338 data.udp_recv_cb = Some(cb);
// NOTE(review): the return value of uvll::udp_recv_start is discarded.
341 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
// Trampoline for buffer allocation: forward the suggested size to the
// stored allocation callback.
343 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
344 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
345 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
346 return (*alloc_cb)(suggested_size as uint);
// Trampoline for a received datagram.
349 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
350 addr: *uvll::sockaddr, flags: c_uint) {
351 // When there's no data to read the recv callback can be a no-op.
352 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
353 // this we just drop back to kqueue and wait for the next callback.
358 rtdebug!("buf addr: %x", buf.base as uint);
359 rtdebug!("buf len: %d", buf.len as int);
360 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
// Borrowed via `get_ref` rather than taken, so the callback is presumably
// left installed for later datagrams.
361 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
// NOTE(review): `nread` (ssize_t) is narrowed to c_int to derive the status.
362 let status = status_to_maybe_uv_error(nread as c_int);
// Convert the sender's native address into a `SocketAddr`. The conversion
// asserts that the address is IPv4 or IPv6.
363 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
364 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
// Stops receiving datagrams by calling uvll::udp_recv_stop on the native
// handle. The call's return value is discarded.
368 pub fn recv_stop(&mut self) {
369 unsafe { uvll::udp_recv_stop(self.native_handle()); }
// Sends `buf` as a datagram to `address`; `cb` is invoked from the `send_cb`
// trampoline with the resulting status.
372 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
374 let data = self.get_watcher_data();
// Only one send callback can be stored at a time: starting a send while a
// callback is still stored trips this assertion.
375 assert!(data.udp_send_cb.is_none());
376 data.udp_send_cb = Some(cb);
379 let req = UdpSendRequest::new();
380 do socket_addr_as_uv_socket_addr(address) |addr| {
381 let result = unsafe {
// Use the send call that matches the address family.
383 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
384 self.native_handle(), [buf], addr, send_cb),
385 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
386 self.native_handle(), [buf], addr, send_cb),
// NOTE(review): a non-zero return fails the assertion instead of being
// reported through `cb`.
389 assert_eq!(0, result);
// Trampoline for send completion.
392 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
393 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
// Fetch the owning UDP handle from the request before the request is freed.
394 let mut udp_watcher = send_request.handle();
395 send_request.delete();
// `take_unwrap` removes the stored callback, presumably emptying the slot
// so that a later `send` passes its `is_none` assertion.
396 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
397 let status = status_to_maybe_uv_error(status);
398 cb(udp_watcher, status);
// Closes the UDP handle, consuming the watcher. `cb` is stored as the close
// callback and the native handle is closed with the `close_cb` trampoline.
402 pub fn close(self, cb: NullCallback) {
// `this` is presumably a mutable rebinding of `self`; confirm.
405 let data = this.get_watcher_data();
// Starting a second close while a close callback is stored is a bug.
406 assert!(data.close_cb.is_none());
407 data.close_cb = Some(cb);
410 unsafe { uvll::close(self.native_handle(), close_cb); }
// Trampoline for close completion: take the stored callback out, drop the
// watcher data and free the native handle.
412 extern fn close_cb(handle: *uvll::uv_udp_t) {
413 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
// The callback is taken before the watcher data holding it is dropped.
414 let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
415 udp_watcher.drop_watcher_data();
416 unsafe { free_handle(handle as *c_void) }
// Conversions between `UdpWatcher` and the raw `*uvll::uv_udp_t` it wraps.
422 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
423 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
// Returns the wrapped raw pointer.
426 fn native_handle(&self) -> *uvll::uv_udp_t {
427 match self { &UdpWatcher(ptr) => ptr }
431 // uv_connect_t is a subclass of uv_req_t
// Newtype around a raw `*uvll::uv_connect_t`, private to this module. It
// opts into the `Request` trait with an empty impl.
432 struct ConnectRequest(*uvll::uv_connect_t);
433 impl Request for ConnectRequest { }
435 impl ConnectRequest {
// Allocates a UV_CONNECT request; a null allocation fails the assertion.
437 fn new() -> ConnectRequest {
438 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
439 assert!(connect_handle.is_not_null());
440 ConnectRequest(connect_handle as *uvll::uv_connect_t)
// Returns a `StreamWatcher` for the stream handle that uvll reports for
// this connect request.
443 fn stream(&self) -> StreamWatcher {
445 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
446 NativeHandle::from_native_handle(stream_handle)
// Frees the native request allocation.
451 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `ConnectRequest` and the raw `*uvll::uv_connect_t` it wraps.
455 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
// Wraps the raw pointer without any check or allocation.
456 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
457 ConnectRequest(handle)
// Returns the wrapped raw pointer.
459 fn native_handle(&self) -> *uvll::uv_connect_t {
460 match self { &ConnectRequest(ptr) => ptr }
// Newtype around a raw `*uvll::uv_write_t`. It opts into the `Request` trait
// with an empty impl.
464 pub struct WriteRequest(*uvll::uv_write_t);
466 impl Request for WriteRequest { }
// Allocates a UV_WRITE request; a null allocation fails the assertion.
469 pub fn new() -> WriteRequest {
470 let write_handle = unsafe { malloc_req(UV_WRITE) };
471 assert!(write_handle.is_not_null());
472 WriteRequest(write_handle as *uvll::uv_write_t)
// Returns a `StreamWatcher` for the stream handle that uvll reports for
// this write request.
475 pub fn stream(&self) -> StreamWatcher {
477 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
478 NativeHandle::from_native_handle(stream_handle)
// Frees the native request allocation, consuming the wrapper.
482 pub fn delete(self) {
483 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `WriteRequest` and the raw `*uvll::uv_write_t` it wraps.
487 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
488 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
// Returns the wrapped raw pointer.
491 fn native_handle(&self) -> *uvll::uv_write_t {
492 match self { &WriteRequest(ptr) => ptr }
// Newtype around a raw `*uvll::uv_udp_send_t`. It opts into the `Request`
// trait with an empty impl.
496 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
497 impl Request for UdpSendRequest { }
499 impl UdpSendRequest {
// Allocates a UV_UDP_SEND request; a null allocation fails the assertion.
500 pub fn new() -> UdpSendRequest {
501 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
502 assert!(send_handle.is_not_null());
503 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
// Returns a `UdpWatcher` for the UDP handle that uvll reports for this
// send request.
506 pub fn handle(&self) -> UdpWatcher {
507 let send_request_handle = unsafe {
508 uvll::get_udp_handle_from_send_req(self.native_handle())
510 NativeHandle::from_native_handle(send_request_handle)
// Frees the native request allocation, consuming the wrapper.
513 pub fn delete(self) {
514 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `UdpSendRequest` and the raw `*uvll::uv_udp_send_t` it wraps.
518 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
// Wraps the raw pointer without any check or allocation.
519 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
520 UdpSendRequest(handle)
// Returns the wrapped raw pointer.
522 fn native_handle(&self) -> *uvll::uv_udp_send_t {
523 match self { &UdpSendRequest(ptr) => ptr }
533 use unstable::run_in_bare_thread;
534 use rt::thread::Thread;
536 use rt::uv::{Loop, AllocCallback};
537 use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
// Connecting over IPv4 to a port with no listener must report ECONNREFUSED
// through the connect callback, after which the stream is closed.
541 fn connect_close_ip4() {
542 do run_in_bare_thread() {
543 let mut loop_ = Loop::new();
544 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
545 // Connect to a port where nobody is listening
546 let addr = next_test_ip4();
547 do tcp_watcher.connect(addr) |stream_watcher, status| {
548 rtdebug!("tcp_watcher.connect!");
// An error status is expected, and it must be named ECONNREFUSED.
549 assert!(status.is_some());
550 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a no-op completion callback.
551 stream_watcher.close(||());
// Connecting over IPv6 to a port with no listener must report ECONNREFUSED
// through the connect callback, after which the stream is closed.
559 fn connect_close_ip6() {
560 do run_in_bare_thread() {
561 let mut loop_ = Loop::new();
562 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
563 // Connect to a port where nobody is listening
564 let addr = next_test_ip6();
565 do tcp_watcher.connect(addr) |stream_watcher, status| {
566 rtdebug!("tcp_watcher.connect!");
// An error status is expected, and it must be named ECONNREFUSED.
567 assert!(status.is_some());
568 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a no-op completion callback.
569 stream_watcher.close(||());
// Creates a UDP watcher, binds it to an IPv4 test address and closes it.
577 fn udp_bind_close_ip4() {
578 do run_in_bare_thread() {
579 let mut loop_ = Loop::new();
580 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
581 let addr = next_test_ip4();
// NOTE(review): the `Result` returned by `bind` is discarded, so a failed
// bind would not fail this test.
582 udp_watcher.bind(addr);
// Close with a no-op completion callback.
583 udp_watcher.close(||());
// Creates a UDP watcher, binds it to an IPv6 test address and closes it.
590 fn udp_bind_close_ip6() {
591 do run_in_bare_thread() {
592 let mut loop_ = Loop::new();
593 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
594 let addr = next_test_ip6();
// NOTE(review): the `Result` returned by `bind` is discarded, so a failed
// bind would not fail this test.
595 udp_watcher.bind(addr);
// Close with a no-op completion callback.
596 udp_watcher.close(||());
603 #[ignore(cfg(windows))] // FIXME #8815
605 do run_in_bare_thread() {
606 static MAX: int = 10;
607 let mut loop_ = Loop::new();
608 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
609 let addr = next_test_ip4();
610 server_tcp_watcher.bind(addr);
612 rtdebug!("listening");
613 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
614 rtdebug!("listened!");
615 assert!(status.is_none());
616 let mut loop_ = loop_;
617 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
618 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
619 server_stream_watcher.accept(client_tcp_watcher);
620 let count_cell = Cell::new(0);
621 let server_stream_watcher = server_stream_watcher;
622 rtdebug!("starting read");
623 let alloc: AllocCallback = |size| {
624 vec_to_uv_buf(vec::from_elem(size, 0u8))
626 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
628 rtdebug!("i'm reading!");
629 let buf = vec_from_uv_buf(buf);
630 let mut count = count_cell.take();
631 if status.is_none() {
632 rtdebug!("got %d bytes", nread);
633 let buf = buf.unwrap();
634 for byte in buf.slice(0, nread as uint).iter() {
635 assert!(*byte == count as u8);
636 rtdebug!("%u", *byte as uint);
640 assert_eq!(count, MAX);
641 do stream_watcher.close {
642 server_stream_watcher.close(||());
645 count_cell.put_back(count);
649 let client_thread = do Thread::start {
650 rtdebug!("starting client thread");
651 let mut loop_ = Loop::new();
652 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
653 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
654 rtdebug!("connecting");
655 assert!(status.is_none());
656 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
657 let buf = slice_to_uv_buf(msg);
658 let msg_cell = Cell::new(msg);
659 do stream_watcher.write(buf) |stream_watcher, status| {
661 assert!(status.is_none());
662 let msg_cell = Cell::new(msg_cell.take());
663 stream_watcher.close(||ignore(msg_cell.take()));
670 let mut loop_ = loop_;
673 client_thread.join();
678 #[ignore(cfg(windows))] // FIXME #8815
680 do run_in_bare_thread() {
681 static MAX: int = 10;
682 let mut loop_ = Loop::new();
683 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
684 let addr = next_test_ip6();
685 server_tcp_watcher.bind(addr);
687 rtdebug!("listening");
688 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
689 rtdebug!("listened!");
690 assert!(status.is_none());
691 let mut loop_ = loop_;
692 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
693 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
694 server_stream_watcher.accept(client_tcp_watcher);
695 let count_cell = Cell::new(0);
696 let server_stream_watcher = server_stream_watcher;
697 rtdebug!("starting read");
698 let alloc: AllocCallback = |size| {
699 vec_to_uv_buf(vec::from_elem(size, 0u8))
701 do client_tcp_watcher.read_start(alloc)
702 |stream_watcher, nread, buf, status| {
704 rtdebug!("i'm reading!");
705 let buf = vec_from_uv_buf(buf);
706 let mut count = count_cell.take();
707 if status.is_none() {
708 rtdebug!("got %d bytes", nread);
709 let buf = buf.unwrap();
710 let r = buf.slice(0, nread as uint);
711 for byte in r.iter() {
712 assert!(*byte == count as u8);
713 rtdebug!("%u", *byte as uint);
717 assert_eq!(count, MAX);
718 do stream_watcher.close {
719 server_stream_watcher.close(||());
722 count_cell.put_back(count);
726 let client_thread = do Thread::start {
727 rtdebug!("starting client thread");
728 let mut loop_ = Loop::new();
729 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
730 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
731 rtdebug!("connecting");
732 assert!(status.is_none());
733 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
734 let buf = slice_to_uv_buf(msg);
735 let msg_cell = Cell::new(msg);
736 do stream_watcher.write(buf) |stream_watcher, status| {
738 assert!(status.is_none());
739 let msg_cell = Cell::new(msg_cell.take());
740 stream_watcher.close(||ignore(msg_cell.take()));
747 let mut loop_ = loop_;
750 client_thread.join();
755 #[ignore(cfg(windows))] // FIXME #8815
757 do run_in_bare_thread() {
758 static MAX: int = 10;
759 let mut loop_ = Loop::new();
760 let server_addr = next_test_ip4();
761 let client_addr = next_test_ip4();
763 let mut server = UdpWatcher::new(&loop_);
764 assert!(server.bind(server_addr).is_ok());
766 rtdebug!("starting read");
767 let alloc: AllocCallback = |size| {
768 vec_to_uv_buf(vec::from_elem(size, 0u8))
771 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
773 rtdebug!("i'm reading!");
774 assert!(status.is_none());
775 assert_eq!(flags, 0);
776 assert_eq!(src, client_addr);
778 let buf = vec_from_uv_buf(buf);
780 rtdebug!("got %d bytes", nread);
782 let buf = buf.unwrap();
783 for &byte in buf.slice(0, nread as uint).iter() {
784 assert!(byte == count as u8);
785 rtdebug!("%u", byte as uint);
788 assert_eq!(count, MAX);
793 let thread = do Thread::start {
794 let mut loop_ = Loop::new();
795 let mut client = UdpWatcher::new(&loop_);
796 assert!(client.bind(client_addr).is_ok());
797 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
798 let buf = slice_to_uv_buf(msg);
799 do client.send(buf, server_addr) |client, status| {
801 assert!(status.is_none());
816 #[ignore(cfg(windows))] // FIXME #8815
818 do run_in_bare_thread() {
819 static MAX: int = 10;
820 let mut loop_ = Loop::new();
821 let server_addr = next_test_ip6();
822 let client_addr = next_test_ip6();
824 let mut server = UdpWatcher::new(&loop_);
825 assert!(server.bind(server_addr).is_ok());
827 rtdebug!("starting read");
828 let alloc: AllocCallback = |size| {
829 vec_to_uv_buf(vec::from_elem(size, 0u8))
832 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
834 rtdebug!("i'm reading!");
835 assert!(status.is_none());
836 assert_eq!(flags, 0);
837 assert_eq!(src, client_addr);
839 let buf = vec_from_uv_buf(buf);
841 rtdebug!("got %d bytes", nread);
843 let buf = buf.unwrap();
844 for &byte in buf.slice(0, nread as uint).iter() {
845 assert!(byte == count as u8);
846 rtdebug!("%u", byte as uint);
849 assert_eq!(count, MAX);
854 let thread = do Thread::start {
855 let mut loop_ = Loop::new();
856 let mut client = UdpWatcher::new(&loop_);
857 assert!(client.bind(client_addr).is_ok());
858 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
859 let buf = slice_to_uv_buf(msg);
860 do client.send(buf, server_addr) |client, status| {
862 assert!(status.is_none());