1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint};
14 use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
18 use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
19 UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
20 status_to_maybe_uv_error, empty_buf};
// Newtype wrapper around a raw `*uvll::addrinfo` pointer.
22 pub struct UvAddrInfo(*uvll::addrinfo);
// A raw native socket address pointer tagged by concrete type: one variant
// carries a `*sockaddr_in`, the other a `*sockaddr_in6`.
24 pub enum UvSocketAddr {
25 UvIpv4SocketAddr(*sockaddr_in),
26 UvIpv6SocketAddr(*sockaddr_in6),
// Classifies a generic `*uvll::sockaddr` with `is_ip4_addr` / `is_ip6_addr`
// and wraps the pointer, cast to the matching concrete pointer type, in the
// corresponding `UvSocketAddr` variant.
29 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
// The two assertions together require exactly one predicate to hold:
// at least one of them is true...
31 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
// ...and they are never both true.
32 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
// Guarded arms: the pointer is only reinterpreted, nothing is copied here.
34 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
// Builds a temporary native socket address from `addr` for use by `f`.
// The variant of `addr.ip` selects three things up front: the allocator,
// the `UvSocketAddr` variant constructor, and the deallocator.
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
42 let malloc = match addr.ip {
43 Ipv4Addr(*) => malloc_ip4_addr,
44 Ipv6Addr(*) => malloc_ip6_addr,
46 let wrap = match addr.ip {
47 Ipv4Addr(*) => UvIpv4SocketAddr,
48 Ipv6Addr(*) => UvIpv6SocketAddr,
50 let free = match addr.ip {
51 Ipv4Addr(*) => free_ip4_addr,
52 Ipv6Addr(*) => free_ip6_addr,
// Allocate the native address from the textual IP and the port. This
// shadows the `addr` parameter with the value returned by `malloc`.
55 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the native address with the deallocator chosen above.
// NOTE(review): presumably `f` is called with `wrap(addr)` before this
// point and its result is returned afterwards -- confirm.
59 unsafe { free(addr) };
// Converts a native `UvSocketAddr` into a `SocketAddr` and hands it to `f`,
// returning whatever `f` returns.
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Length of the longest expected textual address per variant, excluding the
// terminating NUL: 4*3+3 = 15 and 8*4+7 = 39.
// NOTE(review): 39 leaves no room for the mixed IPv4-in-IPv6 textual
// notation (up to 45 characters) -- confirm `ip6_name` never produces it.
64 let ip_size = match addr {
65 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// Zero-filled scratch buffer with one extra byte for the terminator.
69 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
71 let buf_ptr = vec::raw::to_ptr(buf);
// The buffer pointer and size are passed to uvll; presumably the textual
// address is written into `buf` -- confirm against uvll.
73 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
74 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
// Read the port through the accessor matching the variant.
79 let ip_port = unsafe {
80 let port = match addr {
81 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
82 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Strip trailing NUL bytes before parsing.
// NOTE(review): `ip_name` is bound outside the lines shown; presumably it
// is the filled scratch buffer -- confirm.
86 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
// `unwrap` fails here if the text does not parse.
87 let ip_addr = FromStr::from_str(ip_str).unwrap();
89 // finally run the closure
90 f(SocketAddr { ip: ip_addr, port: ip_port })
// Convenience form of `uv_socket_addr_as_socket_addr`: passes `util::id` as
// the closure, so the converted `SocketAddr` itself is the result.
93 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
95 uv_socket_addr_as_socket_addr(addr, util::id)
// Round-trip check: converting a test IPv4 `SocketAddr` to its native form
// and back must yield a value equal to the original.
100 fn test_ip4_conversion() {
102 let ip4 = rt::test::next_test_ip4();
103 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
// Round-trip check: converting a test IPv6 `SocketAddr` to its native form
// and back must yield a value equal to the original.
108 fn test_ip6_conversion() {
110 let ip6 = rt::test::next_test_ip6();
111 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
114 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// Newtype wrapper around a raw `*uvll::uv_stream_t` handle.
116 pub struct StreamWatcher(*uvll::uv_stream_t);
// Empty impl: no `Watcher` methods are overridden here.
117 impl Watcher for StreamWatcher { }
// Starts reading on this stream.
//   alloc - stored in the watcher data and called to obtain a buffer.
//   cb    - stored in the watcher data and called with each read result.
// The native call is given the two `extern fn` trampolines below, which look
// the stored closures up again from the raw handle.
120 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
122 match uvll::read_start(self.native_handle(), alloc_cb, read_cb) {
// NOTE(review): presumably the success arm -- confirm against the arm
// pattern, which is not among the lines shown.
124 let data = self.get_watcher_data();
125 data.alloc_cb = Some(alloc);
126 data.read_cb = Some(cb);
// Failure is reported by calling `cb` directly with zero bytes, an empty
// buffer and the error code wrapped in `UvError`.
129 cb(*self, 0, empty_buf(), Some(UvError(n)))
// Trampoline: rebuilds the watcher from the raw handle and forwards the
// suggested size to the stored allocation closure.
134 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
135 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
// `get_ref` borrows the closure, so it stays installed for later calls.
136 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
137 return (*alloc_cb)(suggested_size as uint);
// Trampoline: rebuilds the watcher from the raw handle and forwards the
// read result to the stored read closure.
140 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
141 uvdebug!("buf addr: {}", buf.base);
142 uvdebug!("buf len: {}", buf.len);
143 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
144 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
// `nread` doubles as the status value: it is narrowed to `c_int` and
// converted into an optional error.
145 let status = status_to_maybe_uv_error(nread as c_int);
146 (*cb)(stream_watcher, nread as int, buf, status);
// Stops reading on this stream. The stored callbacks are deliberately left
// in place (see the comment below). A non-zero return from
// `uvll::read_stop` fails the assertion.
150 pub fn read_stop(&mut self) {
151 // It would be nice to drop the alloc and read callbacks here,
152 // but read_stop may be called from inside one of them and we
153 // would end up freeing the in-use environment
154 let handle = self.native_handle();
155 unsafe { assert_eq!(uvll::read_stop(handle), 0); }
// Writes `buf` to this stream using a freshly allocated `WriteRequest`.
//   cb - called with the stream and an optional error once the outcome is
//        known.
158 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
159 let req = WriteRequest::new();
161 match uvll::write(req.native_handle(), self.native_handle(),
164 let data = self.get_watcher_data();
// Only one write callback can be pending: the slot must be empty before
// `cb` is stored in it.
165 assert!(data.write_cb.is_none());
166 data.write_cb = Some(cb);
// Failure is reported by calling `cb` directly with the error code
// wrapped in `UvError`.
170 cb(*self, Some(UvError(n)))
// Completion trampoline passed to `uvll::write`.
175 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
176 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
// Fetch the stream first; the request is freed on the next line.
177 let mut stream_watcher = write_request.stream();
178 write_request.delete();
// `take_unwrap` empties the slot, so a later `write` passes its
// `is_none` assertion.
179 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
180 let status = status_to_maybe_uv_error(status);
181 cb(stream_watcher, status);
// Starts listening on this stream.
//   cb - stored as the connect callback and invoked from `connection_cb`.
// Returns `Result<(), UvError>`; the mapping from the native return code is
// on lines not shown here.
186 pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
188 let data = self.get_watcher_data();
// The connect-callback slot must be empty before `cb` is stored in it.
189 assert!(data.connect_cb.is_none());
190 data.connect_cb = Some(cb);
// Fixed backlog value handed to `uvll::listen`.
194 static BACKLOG: c_int = 128; // XXX should be configurable
195 match uvll::listen(self.native_handle(), BACKLOG, connection_cb) {
// Trampoline passed to `uvll::listen`.
201 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
202 uvdebug!("connection_cb");
203 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
// `get_ref` borrows the closure rather than taking it, so it remains
// installed after this call.
204 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
205 let status = status_to_maybe_uv_error(status);
206 (*cb)(stream_watcher, status);
// Calls `uvll::accept` with this stream's handle and `stream`'s handle, both
// cast to `*c_void`. A non-zero return fails the assertion.
210 pub fn accept(&mut self, stream: StreamWatcher) {
211 let self_handle = self.native_handle() as *c_void;
212 let stream_handle = stream.native_handle() as *c_void;
213 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
// Conversions between `StreamWatcher` and the raw pointer it wraps.
217 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
// Wraps the raw pointer without any check.
218 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
219 StreamWatcher(handle)
// Returns the wrapped raw pointer.
221 fn native_handle(&self) -> *uvll::uv_stream_t {
222 match self { &StreamWatcher(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_tcp_t` handle.
226 pub struct TcpWatcher(*uvll::uv_tcp_t);
// Empty impl: no `Watcher` methods are overridden here.
227 impl Watcher for TcpWatcher { }
// Creates a TCP watcher on `loop_`: allocates a `UV_TCP` handle, initialises
// it with `uvll::tcp_init`, and installs the watcher data.
// A null allocation or a non-zero init result fails an assertion.
230 pub fn new(loop_: &Loop) -> TcpWatcher {
232 let handle = malloc_handle(UV_TCP);
233 assert!(handle.is_not_null());
234 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
235 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
236 watcher.install_watcher_data();
// Binds this TCP handle to `address`. The address is converted to its
// native form for the duration of the call and dispatched to `tcp_bind` or
// `tcp_bind6` according to its variant.
241 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
242 do socket_addr_as_uv_socket_addr(address) |addr| {
243 let result = unsafe {
245 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
246 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
// Catch-all arm: the native result code is returned wrapped in `UvError`.
251 _ => Err(UvError(result)),
// Starts a TCP connect to `address`.
//   cb - stored as the connect callback and invoked from `connect_cb` with
//        the stream and an optional error.
// A non-zero result from the native connect call fails an assertion instead
// of being reported through `cb`.
256 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
// The connect-callback slot must be empty before `cb` is stored in it.
258 assert!(self.get_watcher_data().connect_cb.is_none());
259 self.get_watcher_data().connect_cb = Some(cb);
// Only the raw request pointer is kept; the request is freed in
// `connect_cb`.
261 let connect_handle = ConnectRequest::new().native_handle();
262 uvdebug!("connect_t: {}", connect_handle);
263 do socket_addr_as_uv_socket_addr(address) |addr| {
264 let result = match addr {
265 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
266 self.native_handle(), addr, connect_cb),
267 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
268 self.native_handle(), addr, connect_cb),
270 assert_eq!(0, result);
// Completion trampoline passed to the native connect call.
273 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
274 uvdebug!("connect_t: {}", req);
275 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
// Fetch the stream first; the request is freed on the next line.
276 let mut stream_watcher = connect_request.stream();
277 connect_request.delete();
// `take_unwrap` empties the slot while handing the closure over.
278 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
279 let status = status_to_maybe_uv_error(status);
280 cb(stream_watcher, status);
// Views this TCP handle as a `StreamWatcher` by casting the raw pointer to
// `*uvll::uv_stream_t`; both watchers wrap the same address.
285 pub fn as_stream(&self) -> StreamWatcher {
286 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
// Conversions between `TcpWatcher` and the raw pointer it wraps.
290 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
291 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
// Returns the wrapped raw pointer.
294 fn native_handle(&self) -> *uvll::uv_tcp_t {
295 match self { &TcpWatcher(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_udp_t` handle.
299 pub struct UdpWatcher(*uvll::uv_udp_t);
// Empty impl: no `Watcher` methods are overridden here.
300 impl Watcher for UdpWatcher { }
// Creates a UDP watcher on `loop_`: allocates a `UV_UDP` handle, initialises
// it with `uvll::udp_init`, and installs the watcher data.
// A null allocation or a non-zero init result fails an assertion.
303 pub fn new(loop_: &Loop) -> UdpWatcher {
305 let handle = malloc_handle(UV_UDP);
306 assert!(handle.is_not_null());
307 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
308 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
309 watcher.install_watcher_data();
// Binds this UDP handle to `address`. The address is converted to its
// native form for the duration of the call and dispatched to `udp_bind` or
// `udp_bind6` according to its variant, with a final argument of `0u32`.
314 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
315 do socket_addr_as_uv_socket_addr(address) |addr| {
316 let result = unsafe {
318 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
319 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
// Catch-all arm: the native result code is returned wrapped in `UvError`.
324 _ => Err(UvError(result)),
// Starts receiving datagrams on this handle.
//   alloc - stored in the watcher data and called to obtain a buffer.
//   cb    - stored in the watcher data and called with each received
//           datagram, its source address, flags and an optional error.
329 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
331 let data = self.get_watcher_data();
332 data.alloc_cb = Some(alloc);
333 data.udp_recv_cb = Some(cb);
// The return value of `udp_recv_start` is discarded here, so a failure
// to start is not reported to the caller.
336 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
// Trampoline: rebuilds the watcher from the raw handle and forwards the
// suggested size to the stored allocation closure.
338 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
339 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
340 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
341 return (*alloc_cb)(suggested_size as uint);
// Trampoline: rebuilds the watcher from the raw handle, converts the
// source address, and forwards everything to the stored receive closure.
344 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
345 addr: *uvll::sockaddr, flags: c_uint) {
346 // When there's no data to read the recv callback can be a no-op.
347 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
348 // this we just drop back to kqueue and wait for the next callback.
353 uvdebug!("buf addr: {}", buf.base);
354 uvdebug!("buf len: {}", buf.len);
355 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
// `get_ref` borrows the closure, so it stays installed for later calls.
356 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
// `nread` doubles as the status value: it is narrowed to `c_int` and
// converted into an optional error.
357 let status = status_to_maybe_uv_error(nread as c_int);
// `sockaddr_to_UvSocketAddr` asserts that `addr` is IPv4 or IPv6.
358 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
359 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
// Stops receiving on this handle. Unlike `StreamWatcher::read_stop`, the
// return value of the native call is discarded rather than asserted on.
363 pub fn recv_stop(&mut self) {
364 unsafe { uvll::udp_recv_stop(self.native_handle()); }
// Sends `buf` as a datagram to `address` using a freshly allocated
// `UdpSendRequest`.
//   cb - stored as the send callback and invoked from `send_cb` with the
//        watcher and an optional error.
// A non-zero result from the native send call fails an assertion instead of
// being reported through `cb`.
367 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
369 let data = self.get_watcher_data();
// Only one send callback can be pending: the slot must be empty before
// `cb` is stored in it.
370 assert!(data.udp_send_cb.is_none());
371 data.udp_send_cb = Some(cb);
374 let req = UdpSendRequest::new();
375 do socket_addr_as_uv_socket_addr(address) |addr| {
376 let result = unsafe {
// `buf` is passed as a one-element array.
378 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
379 self.native_handle(), [buf], addr, send_cb),
380 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
381 self.native_handle(), [buf], addr, send_cb),
384 assert_eq!(0, result);
// Completion trampoline passed to the native send call.
387 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
388 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
// Fetch the owning handle first; the request is freed on the next line.
389 let mut udp_watcher = send_request.handle();
390 send_request.delete();
// `take_unwrap` empties the slot, so a later `send` passes its
// `is_none` assertion.
391 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
392 let status = status_to_maybe_uv_error(status);
393 cb(udp_watcher, status);
// Conversions between `UdpWatcher` and the raw pointer it wraps.
398 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
399 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
// Returns the wrapped raw pointer.
402 fn native_handle(&self) -> *uvll::uv_udp_t {
403 match self { &UdpWatcher(ptr) => ptr }
407 // uv_connect_t is a subclass of uv_req_t
// Newtype wrapper around a raw `*uvll::uv_connect_t` request.
408 pub struct ConnectRequest(*uvll::uv_connect_t);
// Empty impl: no `Request` methods are overridden here.
409 impl Request for ConnectRequest { }
// Allocation, stream lookup and release for connect requests.
411 impl ConnectRequest {
// Allocates a `UV_CONNECT` request with `malloc_req`; a null result fails
// the assertion.
413 pub fn new() -> ConnectRequest {
414 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
415 assert!(connect_handle.is_not_null());
416 ConnectRequest(connect_handle as *uvll::uv_connect_t)
// Returns a watcher for the stream handle that uvll reports for this request.
419 fn stream(&self) -> StreamWatcher {
421 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
422 NativeHandle::from_native_handle(stream_handle)
// Releases the request memory with `free_req`.
427 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `ConnectRequest` and the raw pointer it wraps.
431 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
// Wraps the raw pointer without any check.
432 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
433 ConnectRequest(handle)
// Returns the wrapped raw pointer.
435 fn native_handle(&self) -> *uvll::uv_connect_t {
436 match self { &ConnectRequest(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_write_t` request.
440 pub struct WriteRequest(*uvll::uv_write_t);
// Empty impl: no `Request` methods are overridden here.
442 impl Request for WriteRequest { }
// Allocates a `UV_WRITE` request with `malloc_req`; a null result fails the
// assertion.
445 pub fn new() -> WriteRequest {
446 let write_handle = unsafe { malloc_req(UV_WRITE) };
447 assert!(write_handle.is_not_null());
448 WriteRequest(write_handle as *uvll::uv_write_t)
// Returns a watcher for the stream handle that uvll reports for this write
// request.
451 pub fn stream(&self) -> StreamWatcher {
453 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
454 NativeHandle::from_native_handle(stream_handle)
// Consumes the wrapper and releases the request memory with `free_req`.
458 pub fn delete(self) {
459 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `WriteRequest` and the raw pointer it wraps.
463 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
464 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
// Returns the wrapped raw pointer.
467 fn native_handle(&self) -> *uvll::uv_write_t {
468 match self { &WriteRequest(ptr) => ptr }
// Newtype wrapper around a raw `*uvll::uv_udp_send_t` request.
472 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
// Empty impl: no `Request` methods are overridden here.
473 impl Request for UdpSendRequest { }
// Allocation, handle lookup and release for UDP send requests.
475 impl UdpSendRequest {
// Allocates a `UV_UDP_SEND` request with `malloc_req`; a null result fails
// the assertion.
476 pub fn new() -> UdpSendRequest {
477 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
478 assert!(send_handle.is_not_null());
479 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
// Returns a watcher for the UDP handle that uvll reports for this request.
482 pub fn handle(&self) -> UdpWatcher {
483 let send_request_handle = unsafe {
484 uvll::get_udp_handle_from_send_req(self.native_handle())
486 NativeHandle::from_native_handle(send_request_handle)
// Consumes the wrapper and releases the request memory with `free_req`.
489 pub fn delete(self) {
490 unsafe { free_req(self.native_handle() as *c_void) }
// Conversions between `UdpSendRequest` and the raw pointer it wraps.
494 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
// Wraps the raw pointer without any check.
495 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
496 UdpSendRequest(handle)
// Returns the wrapped raw pointer.
498 fn native_handle(&self) -> *uvll::uv_udp_send_t {
499 match self { &UdpSendRequest(ptr) => ptr }
506 use std::util::ignore;
509 use std::unstable::run_in_bare_thread;
510 use std::rt::thread::Thread;
511 use std::rt::test::*;
512 use super::super::{Loop, AllocCallback};
513 use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
// Test: a TCP connect to an unused IPv4 test address must report an error
// named "ECONNREFUSED"; the stream is then closed.
516 fn connect_close_ip4() {
517 do run_in_bare_thread() {
518 let mut loop_ = Loop::new();
519 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
520 // Connect to a port where nobody is listening
521 let addr = next_test_ip4();
522 do tcp_watcher.connect(addr) |stream_watcher, status| {
523 uvdebug!("tcp_watcher.connect!");
// The connect is expected to fail, so an error must be present.
524 assert!(status.is_some());
525 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a no-op completion closure.
526 stream_watcher.close(||());
// Test: a TCP connect to an unused IPv6 test address must report an error
// named "ECONNREFUSED"; the stream is then closed.
534 fn connect_close_ip6() {
535 do run_in_bare_thread() {
536 let mut loop_ = Loop::new();
537 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
538 // Connect to a port where nobody is listening
539 let addr = next_test_ip6();
540 do tcp_watcher.connect(addr) |stream_watcher, status| {
541 uvdebug!("tcp_watcher.connect!");
// The connect is expected to fail, so an error must be present.
542 assert!(status.is_some());
543 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
// Close with a no-op completion closure.
544 stream_watcher.close(||());
// Test: create a UDP watcher, bind it to an IPv4 test address, and close it.
552 fn udp_bind_close_ip4() {
553 do run_in_bare_thread() {
554 let mut loop_ = Loop::new();
555 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
556 let addr = next_test_ip4();
// NOTE(review): the `Result` returned by `bind` is ignored here, so a
// failed bind would not fail this test -- confirm that is intended.
557 udp_watcher.bind(addr);
// Close with a no-op completion closure.
558 udp_watcher.close(||());
// Test: create a UDP watcher, bind it to an IPv6 test address, and close it.
565 fn udp_bind_close_ip6() {
566 do run_in_bare_thread() {
567 let mut loop_ = Loop::new();
568 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
569 let addr = next_test_ip6();
// NOTE(review): the `Result` returned by `bind` is ignored here, so a
// failed bind would not fail this test -- confirm that is intended.
570 udp_watcher.bind(addr);
// Close with a no-op completion closure.
571 udp_watcher.close(||());
// Test body (IPv4): a server watcher listens and reads from the accepted
// connection while a client on a separate thread connects and writes the
// bytes 0 through 9. The server checks each byte against a running counter.
579 do run_in_bare_thread() {
// Expected counter value at the point it is asserted on below.
580 static MAX: int = 10;
581 let mut loop_ = Loop::new();
582 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
583 let addr = next_test_ip4();
// NOTE(review): the `Result` returned by `bind` is ignored here.
584 server_tcp_watcher.bind(addr);
586 uvdebug!("listening");
587 let mut stream = server_tcp_watcher.as_stream();
588 let res = do stream.listen |mut server_stream_watcher, status| {
589 uvdebug!("listened!");
590 assert!(status.is_none());
591 let mut loop_ = loop_;
// Create a fresh TCP handle and accept the pending connection into it.
592 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
593 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
594 server_stream_watcher.accept(client_tcp_watcher);
// The counter lives in a `Cell` so the read closure can take it out and
// put it back on every invocation.
595 let count_cell = Cell::new(0);
596 let server_stream_watcher = server_stream_watcher;
597 uvdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size,
// converted to a uv buffer.
598 let alloc: AllocCallback = |size| {
599 vec_to_uv_buf(vec::from_elem(size, 0u8))
601 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
603 uvdebug!("i'm reading!");
604 let buf = vec_from_uv_buf(buf);
605 let mut count = count_cell.take();
606 if status.is_none() {
607 uvdebug!("got {} bytes", nread);
608 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are inspected.
// NOTE(review): presumably `count` is incremented per byte on a
// line not shown here -- confirm.
609 for byte in buf.slice(0, nread as uint).iter() {
610 assert!(*byte == count as u8);
611 uvdebug!("{}", *byte as uint);
615 assert_eq!(count, MAX);
// Close the accepted stream, then the listening stream.
616 do stream_watcher.close {
617 server_stream_watcher.close(||());
620 count_cell.put_back(count);
624 assert!(res.is_ok());
626 let client_thread = do Thread::start {
627 uvdebug!("starting client thread");
628 let mut loop_ = Loop::new();
629 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
630 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
631 uvdebug!("connecting");
632 assert!(status.is_none());
633 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
634 let buf = slice_to_uv_buf(msg);
// `msg` is moved into a `Cell` and only released inside the close
// callback below, after the write has completed.
635 let msg_cell = Cell::new(msg);
636 do stream_watcher.write(buf) |stream_watcher, status| {
638 assert!(status.is_none());
639 let msg_cell = Cell::new(msg_cell.take());
640 stream_watcher.close(||ignore(msg_cell.take()));
647 let mut loop_ = loop_;
650 client_thread.join();
// Test body (IPv6): a server watcher listens and reads from the accepted
// connection while a client on a separate thread connects and writes the
// bytes 0 through 9. The server checks each byte against a running counter.
656 do run_in_bare_thread() {
// Expected counter value at the point it is asserted on below.
657 static MAX: int = 10;
658 let mut loop_ = Loop::new();
659 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
660 let addr = next_test_ip6();
// NOTE(review): the `Result` returned by `bind` is ignored here.
661 server_tcp_watcher.bind(addr);
663 uvdebug!("listening");
664 let mut stream = server_tcp_watcher.as_stream();
665 let res = do stream.listen |mut server_stream_watcher, status| {
666 uvdebug!("listened!");
667 assert!(status.is_none());
668 let mut loop_ = loop_;
// Create a fresh TCP handle and accept the pending connection into it.
669 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
670 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
671 server_stream_watcher.accept(client_tcp_watcher);
// The counter lives in a `Cell` so the read closure can take it out and
// put it back on every invocation.
672 let count_cell = Cell::new(0);
673 let server_stream_watcher = server_stream_watcher;
674 uvdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size,
// converted to a uv buffer.
675 let alloc: AllocCallback = |size| {
676 vec_to_uv_buf(vec::from_elem(size, 0u8))
678 do client_tcp_watcher.read_start(alloc)
679 |stream_watcher, nread, buf, status| {
681 uvdebug!("i'm reading!");
682 let buf = vec_from_uv_buf(buf);
683 let mut count = count_cell.take();
684 if status.is_none() {
685 uvdebug!("got {} bytes", nread);
686 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are inspected.
// NOTE(review): presumably `count` is incremented per byte on a
// line not shown here -- confirm.
687 let r = buf.slice(0, nread as uint);
688 for byte in r.iter() {
689 assert!(*byte == count as u8);
690 uvdebug!("{}", *byte as uint);
694 assert_eq!(count, MAX);
// Close the accepted stream, then the listening stream.
695 do stream_watcher.close {
696 server_stream_watcher.close(||());
699 count_cell.put_back(count);
702 assert!(res.is_ok());
704 let client_thread = do Thread::start {
705 uvdebug!("starting client thread");
706 let mut loop_ = Loop::new();
707 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
708 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
709 uvdebug!("connecting");
710 assert!(status.is_none());
711 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
712 let buf = slice_to_uv_buf(msg);
// `msg` is moved into a `Cell` and only released inside the close
// callback below, after the write has completed.
713 let msg_cell = Cell::new(msg);
714 do stream_watcher.write(buf) |stream_watcher, status| {
716 assert!(status.is_none());
717 let msg_cell = Cell::new(msg_cell.take());
718 stream_watcher.close(||ignore(msg_cell.take()));
725 let mut loop_ = loop_;
728 client_thread.join();
// Test body (IPv4): a UDP server bound to `server_addr` receives a datagram
// that a client, bound to `client_addr` on a separate thread, sends to it.
// The server checks the source address, the flags and each payload byte.
734 do run_in_bare_thread() {
// Expected counter value at the point it is asserted on below.
735 static MAX: int = 10;
736 let mut loop_ = Loop::new();
737 let server_addr = next_test_ip4();
738 let client_addr = next_test_ip4();
740 let mut server = UdpWatcher::new(&loop_);
741 assert!(server.bind(server_addr).is_ok());
743 uvdebug!("starting read");
// Allocation callback: a zero-filled vector of the requested size,
// converted to a uv buffer.
744 let alloc: AllocCallback = |size| {
745 vec_to_uv_buf(vec::from_elem(size, 0u8))
748 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
750 uvdebug!("i'm reading!");
751 assert!(status.is_none());
752 assert_eq!(flags, 0);
// The datagram must come from the address the client bound to.
753 assert_eq!(src, client_addr);
755 let buf = vec_from_uv_buf(buf);
757 uvdebug!("got {} bytes", nread);
759 let buf = buf.unwrap();
// Only the first `nread` bytes of the buffer are inspected.
// NOTE(review): `count` is declared and updated on lines not shown here
// -- confirm it starts at 0 and is incremented per byte.
760 for &byte in buf.slice(0, nread as uint).iter() {
761 assert!(byte == count as u8);
762 uvdebug!("{}", byte as uint);
765 assert_eq!(count, MAX);
770 let thread = do Thread::start {
771 let mut loop_ = Loop::new();
772 let mut client = UdpWatcher::new(&loop_);
773 assert!(client.bind(client_addr).is_ok());
774 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
775 let buf = slice_to_uv_buf(msg);
776 do client.send(buf, server_addr) |client, status| {
778 assert!(status.is_none());
794 do run_in_bare_thread() {
795 static MAX: int = 10;
796 let mut loop_ = Loop::new();
797 let server_addr = next_test_ip6();
798 let client_addr = next_test_ip6();
800 let mut server = UdpWatcher::new(&loop_);
801 assert!(server.bind(server_addr).is_ok());
803 uvdebug!("starting read");
804 let alloc: AllocCallback = |size| {
805 vec_to_uv_buf(vec::from_elem(size, 0u8))
808 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
810 uvdebug!("i'm reading!");
811 assert!(status.is_none());
812 assert_eq!(flags, 0);
813 assert_eq!(src, client_addr);
815 let buf = vec_from_uv_buf(buf);
817 uvdebug!("got {} bytes", nread);
819 let buf = buf.unwrap();
820 for &byte in buf.slice(0, nread as uint).iter() {
821 assert!(byte == count as u8);
822 uvdebug!("{}", byte as uint);
825 assert_eq!(count, MAX);
830 let thread = do Thread::start {
831 let mut loop_ = Loop::new();
832 let mut client = UdpWatcher::new(&loop_);
833 assert!(client.bind(client_addr).is_ok());
834 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
835 let buf = slice_to_uv_buf(msg);
836 do client.send(buf, server_addr) |client, status| {
838 assert!(status.is_none());