1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17 status_to_maybe_uv_error};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use rt::uv::last_uv_error;
22 use from_str::{FromStr};
// A raw socket address pointer tagged by address family: one variant
// carries a `*sockaddr_in`, the other a `*sockaddr_in6`.
24 pub enum UvSocketAddr {
// IPv4 address pointer.
25 UvIpv4SocketAddr(*sockaddr_in),
// IPv6 address pointer.
26 UvIpv6SocketAddr(*sockaddr_in6),
// Classifies a generic `*uvll::sockaddr` as IPv4 or IPv6 and returns the
// same pointer, cast to the matching concrete type, wrapped in the
// corresponding `UvSocketAddr` variant.
29 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
// The address must be recognised as at least one of the two families ...
31 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
// ... and never as both at the same time.
32 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
// The match guards select the variant; the pointer is only re-typed.
34 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
// Builds a native address from `addr` for the duration of a call to `f`
// and returns a `T`, the closure's result type.
//
// The allocator, the `UvSocketAddr` variant constructor and the
// deallocator are each chosen by matching on `addr.ip`, so an IPv4
// address uses the ip4 functions and an IPv6 address the ip6 functions.
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
// Allocation function for this address family.
42 let malloc = match addr.ip {
43 Ipv4Addr(*) => malloc_ip4_addr,
44 Ipv6Addr(*) => malloc_ip6_addr,
// Variant constructor for this address family.
46 let wrap = match addr.ip {
47 Ipv4Addr(*) => UvIpv4SocketAddr,
48 Ipv6Addr(*) => UvIpv6SocketAddr,
// Deallocation function for this address family.
50 let free = match addr.ip {
51 Ipv4Addr(*) => free_ip4_addr,
52 Ipv6Addr(*) => free_ip6_addr,
// Allocate the native address from the textual IP and the port. The new
// binding shadows the `SocketAddr` parameter from here on.
55 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the native address allocated above.
59 unsafe { free(addr) };
// Converts a `UvSocketAddr` into a `SocketAddr` and hands it to `f`,
// returning whatever `f` returns.
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Length of the textual address, not counting the terminating NUL; the
// inline comments spell out how each figure is derived (15 and 39).
// NOTE(review): 39 characters does not cover the IPv4-mapped IPv6 text
// form (e.g. `::ffff:255.255.255.255`, 45 characters) — confirm whether
// such addresses can reach this function.
64 let ip_size = match addr {
65 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// Zero-filled scratch buffer with one extra byte for the terminator.
69 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
71 let buf_ptr = vec::raw::to_ptr(buf);
// Pass the buffer pointer and `ip_size` to the family-specific name
// function (presumably it writes the textual address into `buf` —
// confirm against uvll).
73 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
74 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
79 let ip_port = unsafe {
// Read the port through the family-specific accessor.
80 let port = match addr {
81 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
82 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Trailing NUL bytes are trimmed before the text is parsed.
86 let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00');
// NOTE(review): `unwrap` assumes the text always parses as an IP address.
87 let ip_addr = FromStr::from_str(ip_str).unwrap();
89 // finally run the closure
90 f(SocketAddr { ip: ip_addr, port: ip_port })
// Converts a `UvSocketAddr` into a `SocketAddr` by delegating to
// `uv_socket_addr_as_socket_addr` with `util::id` as the closure
// (presumably the identity function — confirm).
93 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
95 uv_socket_addr_as_socket_addr(addr, util::id)
// Round-trips an IPv4 test address through the native representation and
// back, and checks that the result equals the original.
100 fn test_ip4_conversion() {
102 let ip4 = rt::test::next_test_ip4();
103 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
// Round-trips an IPv6 test address through the native representation and
// back, and checks that the result equals the original.
108 fn test_ip6_conversion() {
110 let ip6 = rt::test::next_test_ip6();
111 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
114 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// Newtype wrapper around a raw `*uvll::uv_stream_t` handle.
116 pub struct StreamWatcher(*uvll::uv_stream_t);
// Empty impl: no `Watcher` methods are overridden here.
117 impl Watcher for StreamWatcher { }
// Starts reading from the stream.
//
// `alloc` and `cb` are stored in the watcher data, then the two `extern`
// functions defined below are registered with `uvll::read_start`. Those
// functions look the stored callbacks up again from the raw handle.
120 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
122 let data = self.get_watcher_data();
123 data.alloc_cb = Some(alloc);
124 data.read_cb = Some(cb);
127 unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
// Rebuilds the watcher from the raw handle and forwards the suggested
// size to the stored allocation callback, returning its `Buf`.
129 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
130 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
131 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
132 return (*alloc_cb)(suggested_size as uint);
// Rebuilds the watcher from the raw handle, derives an optional error
// status from `nread`, and invokes the stored read callback.
135 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
136 rtdebug!("buf addr: %x", buf.base as uint);
137 rtdebug!("buf len: %d", buf.len as int);
138 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
// The callback is borrowed with `get_ref`, not taken, so it remains
// stored in the watcher data.
139 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
140 let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
141 (*cb)(stream_watcher, nread as int, buf, status);
// Stops reading by calling `uvll::read_stop` on the native handle. The
// stored alloc and read callbacks are left in place, for the reason given
// in the comment below.
145 pub fn read_stop(&mut self) {
146 // It would be nice to drop the alloc and read callbacks here,
147 // but read_stop may be called from inside one of them and we
148 // would end up freeing the in-use environment
149 let handle = self.native_handle();
150 unsafe { uvll::read_stop(handle); }
// Submits `buf` to `uvll::write` on this stream. `cb` is stored in the
// watcher data and invoked from `write_cb` with the resulting status.
153 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
155 let data = self.get_watcher_data();
// Only one write callback can be stored at a time; the assertion fails
// if one is already present.
156 assert!(data.write_cb.is_none());
157 data.write_cb = Some(cb);
// A new request is allocated for this write; it is freed in `write_cb`.
160 let req = WriteRequest::new();
// A non-zero return from `uvll::write` trips the assertion.
162 assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
// Recovers the stream from the request, frees the request, then takes
// the stored callback out of the watcher data and calls it.
165 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
166 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
// The stream is fetched before the request is deleted.
167 let mut stream_watcher = write_request.stream();
168 write_request.delete();
// `take_unwrap` removes the callback from the watcher data.
169 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
170 let status = status_to_maybe_uv_error(stream_watcher, status);
171 cb(stream_watcher, status);
// Calls `uvll::accept` with this watcher's handle and `stream`'s handle
// (both cast to `*c_void`) and asserts that it returns 0.
175 pub fn accept(&mut self, stream: StreamWatcher) {
176 let self_handle = self.native_handle() as *c_void;
177 let stream_handle = stream.native_handle() as *c_void;
178 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
// Closes the handle, consuming the watcher. `cb` is stored as the close
// callback (asserting that none was set before) and `close_cb` is
// registered with `uvll::close`.
181 pub fn close(self, cb: NullCallback) {
// NOTE(review): `this` is bound on a line not visible in this view —
// presumably a mutable rebinding of `self`; confirm.
184 let data = this.get_watcher_data();
185 assert!(data.close_cb.is_none());
186 data.close_cb = Some(cb);
189 unsafe { uvll::close(self.native_handle(), close_cb); }
// Takes the stored close callback out of the watcher data, drops the
// watcher data and frees the native handle.
191 extern fn close_cb(handle: *uvll::uv_stream_t) {
192 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
193 let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
194 stream_watcher.drop_watcher_data();
195 unsafe { free_handle(handle as *c_void) }
// Conversion between `StreamWatcher` and its raw `*uvll::uv_stream_t`.
201 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
// Wraps the raw pointer without any further setup.
202 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
203 StreamWatcher(handle)
// Returns the wrapped raw pointer.
205 fn native_handle(&self) -> *uvll::uv_stream_t {
206 match self { &StreamWatcher(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_tcp_t` handle.
210 pub struct TcpWatcher(*uvll::uv_tcp_t);
// Empty impl: no `Watcher` methods are overridden here.
211 impl Watcher for TcpWatcher { }
// Allocates a `UV_TCP` handle, initialises it on `loop_` with
// `uvll::tcp_init`, and installs watcher data on the resulting watcher.
// Assertions fail if the allocation is null or `tcp_init` returns
// non-zero.
214 pub fn new(loop_: &Loop) -> TcpWatcher {
216 let handle = malloc_handle(UV_TCP);
217 assert!(handle.is_not_null());
218 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
219 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
220 watcher.install_watcher_data();
// Binds this handle to `address`. The address is converted to its native
// form for the duration of the call and dispatched to `tcp_bind` or
// `tcp_bind6` according to its family.
225 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
226 do socket_addr_as_uv_socket_addr(address) |addr| {
227 let result = unsafe {
229 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
230 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
// Catch-all arm: report the last uv error recorded for this watcher.
235 _ => Err(last_uv_error(self)),
// Starts a connection to `address`. `cb` is stored as the connect
// callback and invoked from `connect_cb` with the resulting status.
240 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
// Only one connect callback can be stored at a time.
242 assert!(self.get_watcher_data().connect_cb.is_none());
243 self.get_watcher_data().connect_cb = Some(cb);
// Only the raw request pointer is kept; `connect_cb` turns it back into
// a `ConnectRequest` and frees it.
245 let connect_handle = ConnectRequest::new().native_handle();
246 rtdebug!("connect_t: %x", connect_handle as uint);
247 do socket_addr_as_uv_socket_addr(address) |addr| {
// Dispatch on address family.
248 let result = match addr {
249 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
250 self.native_handle(), addr, connect_cb),
251 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
252 self.native_handle(), addr, connect_cb),
// A non-zero return from the connect call trips the assertion.
254 assert_eq!(0, result);
// Recovers the stream from the request, frees the request, then takes
// the stored callback out of the watcher data and calls it.
257 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
258 rtdebug!("connect_t: %x", req as uint);
259 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
260 let mut stream_watcher = connect_request.stream();
261 connect_request.delete();
262 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
263 let status = status_to_maybe_uv_error(stream_watcher, status);
264 cb(stream_watcher, status);
// Starts listening on this handle. `cb` is stored in the `connect_cb`
// slot of the watcher data (the same slot `connect` uses) and is invoked
// from `connection_cb`.
269 pub fn listen(&mut self, cb: ConnectionCallback) {
271 let data = self.get_watcher_data();
272 assert!(data.connect_cb.is_none());
273 data.connect_cb = Some(cb);
277 static BACKLOG: c_int = 128; // XXX should be configurable
278 // XXX: This can probably fail
279 assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
// Rebuilds the stream watcher from the raw handle and calls the stored
// callback with the status.
282 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
283 rtdebug!("connection_cb");
284 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
// The callback is borrowed with `get_ref`, not taken, so it remains
// stored in the watcher data.
285 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
286 let status = status_to_maybe_uv_error(stream_watcher, status);
287 (*cb)(stream_watcher, status);
// Returns a `StreamWatcher` over the same native handle, obtained by
// casting the raw pointer to `*uvll::uv_stream_t`.
291 pub fn as_stream(&self) -> StreamWatcher {
292 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
// Conversion between `TcpWatcher` and its raw `*uvll::uv_tcp_t`.
296 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
// Builds a `TcpWatcher` from a raw handle pointer.
297 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
// Returns the wrapped raw pointer.
300 fn native_handle(&self) -> *uvll::uv_tcp_t {
301 match self { &TcpWatcher(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_udp_t` handle.
305 pub struct UdpWatcher(*uvll::uv_udp_t);
// Empty impl: no `Watcher` methods are overridden here.
306 impl Watcher for UdpWatcher { }
// Allocates a `UV_UDP` handle, initialises it on `loop_` with
// `uvll::udp_init`, and installs watcher data on the resulting watcher.
// Assertions fail if the allocation is null or `udp_init` returns
// non-zero.
309 pub fn new(loop_: &Loop) -> UdpWatcher {
311 let handle = malloc_handle(UV_UDP);
312 assert!(handle.is_not_null());
313 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
314 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
315 watcher.install_watcher_data();
// Binds this handle to `address`. The address is converted to its native
// form for the duration of the call and dispatched to `udp_bind` or
// `udp_bind6` according to its family, with a flags argument of `0u32`.
320 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
321 do socket_addr_as_uv_socket_addr(address) |addr| {
322 let result = unsafe {
324 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
325 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
// Catch-all arm: report the last uv error recorded for this watcher.
330 _ => Err(last_uv_error(self)),
// Starts receiving datagrams.
//
// `alloc` and `cb` are stored in the watcher data, then the two `extern`
// functions defined below are registered with `uvll::udp_recv_start`.
335 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
337 let data = self.get_watcher_data();
338 data.alloc_cb = Some(alloc);
339 data.udp_recv_cb = Some(cb);
342 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
// Rebuilds the watcher from the raw handle and forwards the suggested
// size to the stored allocation callback, returning its `Buf`.
344 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
345 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
346 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
347 return (*alloc_cb)(suggested_size as uint);
// Rebuilds the watcher from the raw handle, converts the native sender
// address into a `SocketAddr`, and invokes the stored receive callback.
350 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
351 addr: *uvll::sockaddr, flags: c_uint) {
352 // When there's no data to read the recv callback can be a no-op.
353 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
354 // this we just drop back to kqueue and wait for the next callback.
359 rtdebug!("buf addr: %x", buf.base as uint);
360 rtdebug!("buf len: %d", buf.len as int);
361 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
// The callback is borrowed with `get_ref`, not taken, so it remains
// stored in the watcher data.
362 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
363 let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
// `sockaddr_to_UvSocketAddr` asserts that the address is IPv4 or IPv6.
364 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
365 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
// Stops receiving by calling `uvll::udp_recv_stop` on the native handle.
// The stored callbacks are not touched here.
369 pub fn recv_stop(&mut self) {
370 unsafe { uvll::udp_recv_stop(self.native_handle()); }
// Submits `buf` for sending to `address`. `cb` is stored in the watcher
// data and invoked from `send_cb` with the resulting status.
373 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
375 let data = self.get_watcher_data();
// Only one send callback can be stored at a time.
376 assert!(data.udp_send_cb.is_none());
377 data.udp_send_cb = Some(cb);
// A new request is allocated for this send; it is freed in `send_cb`.
380 let req = UdpSendRequest::new();
381 do socket_addr_as_uv_socket_addr(address) |addr| {
382 let result = unsafe {
// Dispatch on address family.
384 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
385 self.native_handle(), [buf], addr, send_cb),
386 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
387 self.native_handle(), [buf], addr, send_cb),
// A non-zero return from the send call trips the assertion.
390 assert_eq!(0, result);
// Recovers the UDP watcher from the request, frees the request, then
// takes the stored callback out of the watcher data and calls it.
393 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
394 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
395 let mut udp_watcher = send_request.handle();
396 send_request.delete();
397 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
398 let status = status_to_maybe_uv_error(udp_watcher, status);
399 cb(udp_watcher, status);
// Closes the handle, consuming the watcher. `cb` is stored as the close
// callback (asserting that none was set before) and `close_cb` is
// registered with `uvll::close`.
403 pub fn close(self, cb: NullCallback) {
// NOTE(review): `this` is bound on a line not visible in this view —
// presumably a mutable rebinding of `self`; confirm.
406 let data = this.get_watcher_data();
407 assert!(data.close_cb.is_none());
408 data.close_cb = Some(cb);
411 unsafe { uvll::close(self.native_handle(), close_cb); }
// Takes the stored close callback out of the watcher data, drops the
// watcher data and frees the native handle.
413 extern fn close_cb(handle: *uvll::uv_udp_t) {
414 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
415 let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
416 udp_watcher.drop_watcher_data();
417 unsafe { free_handle(handle as *c_void) }
// Conversion between `UdpWatcher` and its raw `*uvll::uv_udp_t`.
423 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
// Builds a `UdpWatcher` from a raw handle pointer.
424 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
// Returns the wrapped raw pointer.
427 fn native_handle(&self) -> *uvll::uv_udp_t {
428 match self { &UdpWatcher(ptr) => ptr }
432 // uv_connect_t is a subclass of uv_req_t
// Newtype wrapper around a raw `*uvll::uv_connect_t`; private to this
// module.
433 struct ConnectRequest(*uvll::uv_connect_t);
// Empty impl: no `Request` methods are overridden here.
434 impl Request for ConnectRequest { }
// Allocation, stream lookup and deallocation for connect requests.
436 impl ConnectRequest {
// Allocates a `UV_CONNECT` request and asserts that the pointer is
// non-null.
438 fn new() -> ConnectRequest {
439 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
440 assert!(connect_handle.is_not_null());
441 ConnectRequest(connect_handle as *uvll::uv_connect_t)
// Returns a `StreamWatcher` over the stream handle that uvll reports for
// this request.
444 fn stream(&self) -> StreamWatcher {
446 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
447 NativeHandle::from_native_handle(stream_handle)
// Frees the native request with `free_req`.
452 unsafe { free_req(self.native_handle() as *c_void) }
// Conversion between `ConnectRequest` and its raw `*uvll::uv_connect_t`.
456 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
// Wraps the raw pointer without any further setup.
457 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
458 ConnectRequest(handle)
// Returns the wrapped raw pointer.
460 fn native_handle(&self) -> *uvll::uv_connect_t {
461 match self { &ConnectRequest(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_write_t`.
465 pub struct WriteRequest(*uvll::uv_write_t);
// Empty impl: no `Request` methods are overridden here.
467 impl Request for WriteRequest { }
// Allocates a `UV_WRITE` request and asserts that the pointer is
// non-null.
470 pub fn new() -> WriteRequest {
471 let write_handle = unsafe { malloc_req(UV_WRITE) };
472 assert!(write_handle.is_not_null());
473 WriteRequest(write_handle as *uvll::uv_write_t)
// Returns a `StreamWatcher` over the stream handle that uvll reports for
// this write request.
476 pub fn stream(&self) -> StreamWatcher {
478 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
479 NativeHandle::from_native_handle(stream_handle)
// Consumes the request and frees the native memory with `free_req`.
483 pub fn delete(self) {
484 unsafe { free_req(self.native_handle() as *c_void) }
// Conversion between `WriteRequest` and its raw `*uvll::uv_write_t`.
488 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
// Builds a `WriteRequest` from a raw request pointer.
489 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
// Returns the wrapped raw pointer.
492 fn native_handle(&self) -> *uvll::uv_write_t {
493 match self { &WriteRequest(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_udp_send_t`.
497 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
// Empty impl: no `Request` methods are overridden here.
498 impl Request for UdpSendRequest { }
// Allocation, handle lookup and deallocation for UDP send requests.
500 impl UdpSendRequest {
// Allocates a `UV_UDP_SEND` request and asserts that the pointer is
// non-null.
501 pub fn new() -> UdpSendRequest {
502 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
503 assert!(send_handle.is_not_null());
504 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
// Returns a `UdpWatcher` over the UDP handle that uvll reports for this
// send request.
507 pub fn handle(&self) -> UdpWatcher {
508 let send_request_handle = unsafe {
509 uvll::get_udp_handle_from_send_req(self.native_handle())
511 NativeHandle::from_native_handle(send_request_handle)
// Consumes the request and frees the native memory with `free_req`.
514 pub fn delete(self) {
515 unsafe { free_req(self.native_handle() as *c_void) }
// Conversion between `UdpSendRequest` and its raw
// `*uvll::uv_udp_send_t`.
519 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
// Wraps the raw pointer without any further setup.
520 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
521 UdpSendRequest(handle)
// Returns the wrapped raw pointer.
523 fn native_handle(&self) -> *uvll::uv_udp_send_t {
524 match self { &UdpSendRequest(ptr) => ptr }
534 use unstable::run_in_bare_thread;
535 use rt::thread::Thread;
537 use rt::uv::{Loop, AllocCallback};
538 use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
// Connects over IPv4 to a test address with no listener and checks that
// the callback receives an error named "ECONNREFUSED", then closes the
// stream.
542 fn connect_close_ip4() {
543 do run_in_bare_thread() {
544 let mut loop_ = Loop::new();
545 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
546 // Connect to a port where nobody is listening
547 let addr = next_test_ip4();
548 do tcp_watcher.connect(addr) |stream_watcher, status| {
549 rtdebug!("tcp_watcher.connect!");
// An error status is required here; success would fail the test.
550 assert!(status.is_some());
551 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a callback that does nothing.
552 stream_watcher.close(||());
// Connects over IPv6 to a test address with no listener and checks that
// the callback receives an error named "ECONNREFUSED", then closes the
// stream.
560 fn connect_close_ip6() {
561 do run_in_bare_thread() {
562 let mut loop_ = Loop::new();
563 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
564 // Connect to a port where nobody is listening
565 let addr = next_test_ip6();
566 do tcp_watcher.connect(addr) |stream_watcher, status| {
567 rtdebug!("tcp_watcher.connect!");
// An error status is required here; success would fail the test.
568 assert!(status.is_some());
569 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a callback that does nothing.
570 stream_watcher.close(||());
// Creates a UDP watcher, binds it to an IPv4 test address and closes it.
578 fn udp_bind_close_ip4() {
579 do run_in_bare_thread() {
580 let mut loop_ = Loop::new();
581 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
582 let addr = next_test_ip4();
// NOTE(review): the `Result` returned by `bind` is discarded, so a
// failed bind would not fail this test.
583 udp_watcher.bind(addr);
584 udp_watcher.close(||());
// Creates a UDP watcher, binds it to an IPv6 test address and closes it.
591 fn udp_bind_close_ip6() {
592 do run_in_bare_thread() {
593 let mut loop_ = Loop::new();
594 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
595 let addr = next_test_ip6();
// NOTE(review): the `Result` returned by `bind` is discarded, so a
// failed bind would not fail this test.
596 udp_watcher.bind(addr);
597 udp_watcher.close(||());
// IPv4 TCP round trip: a server watcher binds and listens on a test
// address while a client thread connects and writes ten bytes. The
// server's read callback checks each received byte against a running
// counter.
605 do run_in_bare_thread() {
606 static MAX: int = 10;
607 let mut loop_ = Loop::new();
608 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
609 let addr = next_test_ip4();
// NOTE(review): the `Result` returned by `bind` is discarded here.
610 server_tcp_watcher.bind(addr);
612 rtdebug!("listening");
613 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
614 rtdebug!("listened!");
615 assert!(status.is_none());
616 let mut loop_ = loop_;
// Create a fresh TCP handle and accept the incoming connection into it.
617 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
618 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
619 server_stream_watcher.accept(client_tcp_watcher);
// The counter lives in a cell; the read callback takes it out and puts
// it back on each invocation.
620 let count_cell = Cell::new(0);
621 let server_stream_watcher = server_stream_watcher;
622 rtdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size.
623 let alloc: AllocCallback = |size| {
624 vec_to_uv_buf(vec::from_elem(size, 0u8))
626 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
628 rtdebug!("i'm reading!");
629 let buf = vec_from_uv_buf(buf);
630 let mut count = count_cell.take();
631 if status.is_none() {
632 rtdebug!("got %d bytes", nread);
633 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are checked.
634 for byte in buf.slice(0, nread as uint).iter() {
635 assert!(*byte == count as u8);
636 rtdebug!("%u", *byte as uint);
// NOTE(review): the branch containing the lines below is not visible in
// this view — presumably the error/end-of-stream path; confirm.
640 assert_eq!(count, MAX);
// Close the accepted stream first, then the listening stream.
641 do stream_watcher.close {
642 server_stream_watcher.close(||());
645 count_cell.put_back(count);
649 let client_thread = do Thread::start {
650 rtdebug!("starting client thread");
651 let mut loop_ = Loop::new();
652 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
653 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
654 rtdebug!("connecting");
655 assert!(status.is_none());
656 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
657 let buf = slice_to_uv_buf(msg);
// The vector behind `buf` is moved into a cell and only released in the
// close callback — presumably to keep it alive while the write is in
// flight; confirm.
658 let msg_cell = Cell::new(msg);
659 do stream_watcher.write(buf) |stream_watcher, status| {
661 assert!(status.is_none());
662 let msg_cell = Cell::new(msg_cell.take());
663 stream_watcher.close(||ignore(msg_cell.take()));
670 let mut loop_ = loop_;
673 client_thread.join();
// IPv6 TCP round trip: a server watcher binds and listens on a test
// address while a client thread connects and writes ten bytes. The
// server's read callback checks each received byte against a running
// counter.
679 do run_in_bare_thread() {
680 static MAX: int = 10;
681 let mut loop_ = Loop::new();
682 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
683 let addr = next_test_ip6();
// NOTE(review): the `Result` returned by `bind` is discarded here.
684 server_tcp_watcher.bind(addr);
686 rtdebug!("listening");
687 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
688 rtdebug!("listened!");
689 assert!(status.is_none());
690 let mut loop_ = loop_;
// Create a fresh TCP handle and accept the incoming connection into it.
691 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
692 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
693 server_stream_watcher.accept(client_tcp_watcher);
// The counter lives in a cell; the read callback takes it out and puts
// it back on each invocation.
694 let count_cell = Cell::new(0);
695 let server_stream_watcher = server_stream_watcher;
696 rtdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size.
697 let alloc: AllocCallback = |size| {
698 vec_to_uv_buf(vec::from_elem(size, 0u8))
700 do client_tcp_watcher.read_start(alloc)
701 |stream_watcher, nread, buf, status| {
703 rtdebug!("i'm reading!");
704 let buf = vec_from_uv_buf(buf);
705 let mut count = count_cell.take();
706 if status.is_none() {
707 rtdebug!("got %d bytes", nread);
708 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are checked.
709 let r = buf.slice(0, nread as uint);
710 for byte in r.iter() {
711 assert!(*byte == count as u8);
712 rtdebug!("%u", *byte as uint);
// NOTE(review): the branch containing the lines below is not visible in
// this view — presumably the error/end-of-stream path; confirm.
716 assert_eq!(count, MAX);
// Close the accepted stream first, then the listening stream.
717 do stream_watcher.close {
718 server_stream_watcher.close(||());
721 count_cell.put_back(count);
725 let client_thread = do Thread::start {
726 rtdebug!("starting client thread");
727 let mut loop_ = Loop::new();
728 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
729 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
730 rtdebug!("connecting");
731 assert!(status.is_none());
732 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
733 let buf = slice_to_uv_buf(msg);
// The vector behind `buf` is moved into a cell and only released in the
// close callback — presumably to keep it alive while the write is in
// flight; confirm.
734 let msg_cell = Cell::new(msg);
735 do stream_watcher.write(buf) |stream_watcher, status| {
737 assert!(status.is_none());
738 let msg_cell = Cell::new(msg_cell.take());
739 stream_watcher.close(||ignore(msg_cell.take()));
746 let mut loop_ = loop_;
749 client_thread.join();
// IPv4 UDP round trip: a server watcher binds to one test address and
// starts receiving; a client thread binds to a second test address and
// sends ten bytes to the server. The receive callback checks the status,
// flags, source address and payload bytes.
755 do run_in_bare_thread() {
756 static MAX: int = 10;
757 let mut loop_ = Loop::new();
758 let server_addr = next_test_ip4();
759 let client_addr = next_test_ip4();
761 let mut server = UdpWatcher::new(&loop_);
// Unlike the TCP tests, the bind result is checked here.
762 assert!(server.bind(server_addr).is_ok());
764 rtdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size.
765 let alloc: AllocCallback = |size| {
766 vec_to_uv_buf(vec::from_elem(size, 0u8))
769 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
771 rtdebug!("i'm reading!");
772 assert!(status.is_none());
773 assert_eq!(flags, 0);
// The datagram must come from the address the client bound to.
774 assert_eq!(src, client_addr);
776 let buf = vec_from_uv_buf(buf);
778 rtdebug!("got %d bytes", nread);
780 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are checked.
781 for &byte in buf.slice(0, nread as uint).iter() {
782 assert!(byte == count as u8);
783 rtdebug!("%u", byte as uint);
786 assert_eq!(count, MAX);
791 let thread = do Thread::start {
792 let mut loop_ = Loop::new();
793 let mut client = UdpWatcher::new(&loop_);
794 assert!(client.bind(client_addr).is_ok());
795 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
796 let buf = slice_to_uv_buf(msg);
797 do client.send(buf, server_addr) |client, status| {
799 assert!(status.is_none());
// IPv6 UDP round trip: a server watcher binds to one test address and
// starts receiving; a client thread binds to a second test address and
// sends ten bytes to the server. The receive callback checks the status,
// flags, source address and payload bytes.
815 do run_in_bare_thread() {
816 static MAX: int = 10;
817 let mut loop_ = Loop::new();
818 let server_addr = next_test_ip6();
819 let client_addr = next_test_ip6();
821 let mut server = UdpWatcher::new(&loop_);
// Unlike the TCP tests, the bind result is checked here.
822 assert!(server.bind(server_addr).is_ok());
824 rtdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size.
825 let alloc: AllocCallback = |size| {
826 vec_to_uv_buf(vec::from_elem(size, 0u8))
829 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
831 rtdebug!("i'm reading!");
832 assert!(status.is_none());
833 assert_eq!(flags, 0);
// The datagram must come from the address the client bound to.
834 assert_eq!(src, client_addr);
836 let buf = vec_from_uv_buf(buf);
838 rtdebug!("got %d bytes", nread);
840 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are checked.
841 for &byte in buf.slice(0, nread as uint).iter() {
842 assert!(byte == count as u8);
843 rtdebug!("%u", byte as uint);
846 assert_eq!(count, MAX);
851 let thread = do Thread::start {
852 let mut loop_ = Loop::new();
853 let mut client = UdpWatcher::new(&loop_);
854 assert!(client.bind(client_addr).is_ok());
855 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
856 let buf = slice_to_uv_buf(msg);
857 do client.send(buf, server_addr) |client, status| {
859 assert!(status.is_none());