1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
14 use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
18 use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback,
19 UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle,
20 status_to_maybe_uv_error, empty_buf};
22 pub struct UvAddrInfo(*uvll::addrinfo);
24 pub enum UvSocketAddr {
25 UvIpv4SocketAddr(*sockaddr_in),
26 UvIpv6SocketAddr(*sockaddr_in6),
29 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
31 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
32 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
34 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
35 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
41 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
42 let malloc = match addr.ip {
43 Ipv4Addr(*) => malloc_ip4_addr,
44 Ipv6Addr(*) => malloc_ip6_addr,
46 let wrap = match addr.ip {
47 Ipv4Addr(*) => UvIpv4SocketAddr,
48 Ipv6Addr(*) => UvIpv6SocketAddr,
50 let free = match addr.ip {
51 Ipv4Addr(*) => free_ip4_addr,
52 Ipv6Addr(*) => free_ip6_addr,
55 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
59 unsafe { free(addr) };
63 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
64 let ip_size = match addr {
65 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
66 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
69 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
71 let buf_ptr = vec::raw::to_ptr(buf);
73 UvIpv4SocketAddr(addr) =>
74 uvll::uv_ip4_name(addr, buf_ptr as *c_char, ip_size as size_t),
75 UvIpv6SocketAddr(addr) =>
76 uvll::uv_ip6_name(addr, buf_ptr as *c_char, ip_size as size_t),
81 let ip_port = unsafe {
82 let port = match addr {
83 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
84 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
88 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
89 let ip_addr = FromStr::from_str(ip_str).unwrap();
91 // finally run the closure
92 f(SocketAddr { ip: ip_addr, port: ip_port })
95 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
97 uv_socket_addr_as_socket_addr(addr, util::id)
102 fn test_ip4_conversion() {
104 let ip4 = rt::test::next_test_ip4();
105 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
110 fn test_ip6_conversion() {
112 let ip6 = rt::test::next_test_ip6();
113 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
116 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
118 pub struct StreamWatcher(*uvll::uv_stream_t);
119 impl Watcher for StreamWatcher { }
122 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
124 match uvll::uv_read_start(self.native_handle(), alloc_cb, read_cb) {
126 let data = self.get_watcher_data();
127 data.alloc_cb = Some(alloc);
128 data.read_cb = Some(cb);
131 cb(*self, 0, empty_buf(), Some(UvError(n)))
136 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
137 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
138 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
139 return (*alloc_cb)(suggested_size as uint);
142 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
143 uvdebug!("buf addr: {}", buf.base);
144 uvdebug!("buf len: {}", buf.len);
145 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
146 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
147 let status = status_to_maybe_uv_error(nread as c_int);
148 (*cb)(stream_watcher, nread as int, buf, status);
152 pub fn read_stop(&mut self) {
153 // It would be nice to drop the alloc and read callbacks here,
154 // but read_stop may be called from inside one of them and we
155 // would end up freeing the in-use environment
156 let handle = self.native_handle();
157 unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
160 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
161 let req = WriteRequest::new();
163 match uvll::uv_write(req.native_handle(), self.native_handle(),
166 let data = self.get_watcher_data();
167 assert!(data.write_cb.is_none());
168 data.write_cb = Some(cb);
172 cb(*self, Some(UvError(n)))
177 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
178 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
179 let mut stream_watcher = write_request.stream();
180 write_request.delete();
181 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
182 let status = status_to_maybe_uv_error(status);
183 cb(stream_watcher, status);
188 pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
190 let data = self.get_watcher_data();
191 assert!(data.connect_cb.is_none());
192 data.connect_cb = Some(cb);
196 static BACKLOG: c_int = 128; // XXX should be configurable
197 match uvll::uv_listen(self.native_handle(), BACKLOG, connection_cb) {
203 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
204 uvdebug!("connection_cb");
205 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
206 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
207 let status = status_to_maybe_uv_error(status);
208 (*cb)(stream_watcher, status);
212 pub fn accept(&mut self, stream: StreamWatcher) {
213 let self_handle = self.native_handle() as *c_void;
214 let stream_handle = stream.native_handle() as *c_void;
215 assert_eq!(0, unsafe { uvll::uv_accept(self_handle, stream_handle) } );
219 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
220 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
221 StreamWatcher(handle)
223 fn native_handle(&self) -> *uvll::uv_stream_t {
224 match self { &StreamWatcher(ptr) => ptr }
228 pub struct TcpWatcher(*uvll::uv_tcp_t);
229 impl Watcher for TcpWatcher { }
232 pub fn new(loop_: &Loop) -> TcpWatcher {
234 let handle = malloc_handle(UV_TCP);
235 assert!(handle.is_not_null());
236 assert_eq!(0, uvll::uv_tcp_init(loop_.native_handle(), handle));
237 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
238 watcher.install_watcher_data();
243 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
244 do socket_addr_as_uv_socket_addr(address) |addr| {
245 let result = unsafe {
247 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
248 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
253 _ => Err(UvError(result)),
258 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
260 assert!(self.get_watcher_data().connect_cb.is_none());
261 self.get_watcher_data().connect_cb = Some(cb);
263 let connect_handle = ConnectRequest::new().native_handle();
264 uvdebug!("connect_t: {}", connect_handle);
265 do socket_addr_as_uv_socket_addr(address) |addr| {
266 let result = match addr {
267 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
268 self.native_handle(), addr, connect_cb),
269 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
270 self.native_handle(), addr, connect_cb),
272 assert_eq!(0, result);
275 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
276 uvdebug!("connect_t: {}", req);
277 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
278 let mut stream_watcher = connect_request.stream();
279 connect_request.delete();
280 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
281 let status = status_to_maybe_uv_error(status);
282 cb(stream_watcher, status);
287 pub fn as_stream(&self) -> StreamWatcher {
288 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
292 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
293 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
296 fn native_handle(&self) -> *uvll::uv_tcp_t {
297 match self { &TcpWatcher(ptr) => ptr }
301 pub struct UdpWatcher(*uvll::uv_udp_t);
302 impl Watcher for UdpWatcher { }
305 pub fn new(loop_: &Loop) -> UdpWatcher {
307 let handle = malloc_handle(UV_UDP);
308 assert!(handle.is_not_null());
309 assert_eq!(0, uvll::uv_udp_init(loop_.native_handle(), handle));
310 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
311 watcher.install_watcher_data();
316 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
317 do socket_addr_as_uv_socket_addr(address) |addr| {
318 let result = unsafe {
320 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
321 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
326 _ => Err(UvError(result)),
331 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
333 let data = self.get_watcher_data();
334 data.alloc_cb = Some(alloc);
335 data.udp_recv_cb = Some(cb);
338 unsafe { uvll::uv_udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
340 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
341 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
342 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
343 return (*alloc_cb)(suggested_size as uint);
346 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
347 addr: *uvll::sockaddr, flags: c_uint) {
348 // When there's no data to read the recv callback can be a no-op.
349 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
350 // this we just drop back to kqueue and wait for the next callback.
355 uvdebug!("buf addr: {}", buf.base);
356 uvdebug!("buf len: {}", buf.len);
357 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
358 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
359 let status = status_to_maybe_uv_error(nread as c_int);
360 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
361 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
365 pub fn recv_stop(&mut self) {
366 unsafe { uvll::uv_udp_recv_stop(self.native_handle()); }
369 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
371 let data = self.get_watcher_data();
372 assert!(data.udp_send_cb.is_none());
373 data.udp_send_cb = Some(cb);
376 let req = UdpSendRequest::new();
377 do socket_addr_as_uv_socket_addr(address) |addr| {
378 let result = unsafe {
380 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
381 self.native_handle(), [buf], addr, send_cb),
382 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
383 self.native_handle(), [buf], addr, send_cb),
386 assert_eq!(0, result);
389 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
390 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
391 let mut udp_watcher = send_request.handle();
392 send_request.delete();
393 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
394 let status = status_to_maybe_uv_error(status);
395 cb(udp_watcher, status);
400 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
401 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
404 fn native_handle(&self) -> *uvll::uv_udp_t {
405 match self { &UdpWatcher(ptr) => ptr }
409 // uv_connect_t is a subclass of uv_req_t
410 pub struct ConnectRequest(*uvll::uv_connect_t);
411 impl Request for ConnectRequest { }
413 impl ConnectRequest {
415 pub fn new() -> ConnectRequest {
416 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
417 assert!(connect_handle.is_not_null());
418 ConnectRequest(connect_handle as *uvll::uv_connect_t)
421 fn stream(&self) -> StreamWatcher {
423 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
424 NativeHandle::from_native_handle(stream_handle)
429 unsafe { free_req(self.native_handle() as *c_void) }
433 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
434 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
435 ConnectRequest(handle)
437 fn native_handle(&self) -> *uvll::uv_connect_t {
438 match self { &ConnectRequest(ptr) => ptr }
442 pub struct WriteRequest(*uvll::uv_write_t);
444 impl Request for WriteRequest { }
447 pub fn new() -> WriteRequest {
448 let write_handle = unsafe { malloc_req(UV_WRITE) };
449 assert!(write_handle.is_not_null());
450 WriteRequest(write_handle as *uvll::uv_write_t)
453 pub fn stream(&self) -> StreamWatcher {
455 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
456 NativeHandle::from_native_handle(stream_handle)
460 pub fn delete(self) {
461 unsafe { free_req(self.native_handle() as *c_void) }
465 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
466 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
469 fn native_handle(&self) -> *uvll::uv_write_t {
470 match self { &WriteRequest(ptr) => ptr }
474 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
475 impl Request for UdpSendRequest { }
477 impl UdpSendRequest {
478 pub fn new() -> UdpSendRequest {
479 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
480 assert!(send_handle.is_not_null());
481 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
484 pub fn handle(&self) -> UdpWatcher {
485 let send_request_handle = unsafe {
486 uvll::get_udp_handle_from_send_req(self.native_handle())
488 NativeHandle::from_native_handle(send_request_handle)
491 pub fn delete(self) {
492 unsafe { free_req(self.native_handle() as *c_void) }
496 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
497 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
498 UdpSendRequest(handle)
500 fn native_handle(&self) -> *uvll::uv_udp_send_t {
501 match self { &UdpSendRequest(ptr) => ptr }
508 use std::util::ignore;
511 use std::unstable::run_in_bare_thread;
512 use std::rt::thread::Thread;
513 use std::rt::test::*;
514 use super::super::{Loop, AllocCallback};
515 use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
518 fn connect_close_ip4() {
519 do run_in_bare_thread() {
520 let mut loop_ = Loop::new();
521 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
522 // Connect to a port where nobody is listening
523 let addr = next_test_ip4();
524 do tcp_watcher.connect(addr) |stream_watcher, status| {
525 uvdebug!("tcp_watcher.connect!");
526 assert!(status.is_some());
527 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
528 stream_watcher.close(||());
536 fn connect_close_ip6() {
537 do run_in_bare_thread() {
538 let mut loop_ = Loop::new();
539 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
540 // Connect to a port where nobody is listening
541 let addr = next_test_ip6();
542 do tcp_watcher.connect(addr) |stream_watcher, status| {
543 uvdebug!("tcp_watcher.connect!");
544 assert!(status.is_some());
545 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
546 stream_watcher.close(||());
554 fn udp_bind_close_ip4() {
555 do run_in_bare_thread() {
556 let mut loop_ = Loop::new();
557 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
558 let addr = next_test_ip4();
559 udp_watcher.bind(addr);
560 udp_watcher.close(||());
567 fn udp_bind_close_ip6() {
568 do run_in_bare_thread() {
569 let mut loop_ = Loop::new();
570 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
571 let addr = next_test_ip6();
572 udp_watcher.bind(addr);
573 udp_watcher.close(||());
581 do run_in_bare_thread() {
582 static MAX: int = 10;
583 let mut loop_ = Loop::new();
584 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
585 let addr = next_test_ip4();
586 server_tcp_watcher.bind(addr);
588 uvdebug!("listening");
589 let mut stream = server_tcp_watcher.as_stream();
590 let res = do stream.listen |mut server_stream_watcher, status| {
591 uvdebug!("listened!");
592 assert!(status.is_none());
593 let mut loop_ = loop_;
594 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
595 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
596 server_stream_watcher.accept(client_tcp_watcher);
597 let count_cell = Cell::new(0);
598 let server_stream_watcher = server_stream_watcher;
599 uvdebug!("starting read");
600 let alloc: AllocCallback = |size| {
601 vec_to_uv_buf(vec::from_elem(size, 0u8))
603 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
605 uvdebug!("i'm reading!");
606 let buf = vec_from_uv_buf(buf);
607 let mut count = count_cell.take();
608 if status.is_none() {
609 uvdebug!("got {} bytes", nread);
610 let buf = buf.unwrap();
611 for byte in buf.slice(0, nread as uint).iter() {
612 assert!(*byte == count as u8);
613 uvdebug!("{}", *byte as uint);
617 assert_eq!(count, MAX);
618 do stream_watcher.close {
619 server_stream_watcher.close(||());
622 count_cell.put_back(count);
626 assert!(res.is_ok());
628 let client_thread = do Thread::start {
629 uvdebug!("starting client thread");
630 let mut loop_ = Loop::new();
631 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
632 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
633 uvdebug!("connecting");
634 assert!(status.is_none());
635 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
636 let buf = slice_to_uv_buf(msg);
637 let msg_cell = Cell::new(msg);
638 do stream_watcher.write(buf) |stream_watcher, status| {
640 assert!(status.is_none());
641 let msg_cell = Cell::new(msg_cell.take());
642 stream_watcher.close(||ignore(msg_cell.take()));
649 let mut loop_ = loop_;
652 client_thread.join();
658 do run_in_bare_thread() {
659 static MAX: int = 10;
660 let mut loop_ = Loop::new();
661 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
662 let addr = next_test_ip6();
663 server_tcp_watcher.bind(addr);
665 uvdebug!("listening");
666 let mut stream = server_tcp_watcher.as_stream();
667 let res = do stream.listen |mut server_stream_watcher, status| {
668 uvdebug!("listened!");
669 assert!(status.is_none());
670 let mut loop_ = loop_;
671 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
672 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
673 server_stream_watcher.accept(client_tcp_watcher);
674 let count_cell = Cell::new(0);
675 let server_stream_watcher = server_stream_watcher;
676 uvdebug!("starting read");
677 let alloc: AllocCallback = |size| {
678 vec_to_uv_buf(vec::from_elem(size, 0u8))
680 do client_tcp_watcher.read_start(alloc)
681 |stream_watcher, nread, buf, status| {
683 uvdebug!("i'm reading!");
684 let buf = vec_from_uv_buf(buf);
685 let mut count = count_cell.take();
686 if status.is_none() {
687 uvdebug!("got {} bytes", nread);
688 let buf = buf.unwrap();
689 let r = buf.slice(0, nread as uint);
690 for byte in r.iter() {
691 assert!(*byte == count as u8);
692 uvdebug!("{}", *byte as uint);
696 assert_eq!(count, MAX);
697 do stream_watcher.close {
698 server_stream_watcher.close(||());
701 count_cell.put_back(count);
704 assert!(res.is_ok());
706 let client_thread = do Thread::start {
707 uvdebug!("starting client thread");
708 let mut loop_ = Loop::new();
709 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
710 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
711 uvdebug!("connecting");
712 assert!(status.is_none());
713 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
714 let buf = slice_to_uv_buf(msg);
715 let msg_cell = Cell::new(msg);
716 do stream_watcher.write(buf) |stream_watcher, status| {
718 assert!(status.is_none());
719 let msg_cell = Cell::new(msg_cell.take());
720 stream_watcher.close(||ignore(msg_cell.take()));
727 let mut loop_ = loop_;
730 client_thread.join();
736 do run_in_bare_thread() {
737 static MAX: int = 10;
738 let mut loop_ = Loop::new();
739 let server_addr = next_test_ip4();
740 let client_addr = next_test_ip4();
742 let mut server = UdpWatcher::new(&loop_);
743 assert!(server.bind(server_addr).is_ok());
745 uvdebug!("starting read");
746 let alloc: AllocCallback = |size| {
747 vec_to_uv_buf(vec::from_elem(size, 0u8))
750 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
752 uvdebug!("i'm reading!");
753 assert!(status.is_none());
754 assert_eq!(flags, 0);
755 assert_eq!(src, client_addr);
757 let buf = vec_from_uv_buf(buf);
759 uvdebug!("got {} bytes", nread);
761 let buf = buf.unwrap();
762 for &byte in buf.slice(0, nread as uint).iter() {
763 assert!(byte == count as u8);
764 uvdebug!("{}", byte as uint);
767 assert_eq!(count, MAX);
772 let thread = do Thread::start {
773 let mut loop_ = Loop::new();
774 let mut client = UdpWatcher::new(&loop_);
775 assert!(client.bind(client_addr).is_ok());
776 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
777 let buf = slice_to_uv_buf(msg);
778 do client.send(buf, server_addr) |client, status| {
780 assert!(status.is_none());
796 do run_in_bare_thread() {
797 static MAX: int = 10;
798 let mut loop_ = Loop::new();
799 let server_addr = next_test_ip6();
800 let client_addr = next_test_ip6();
802 let mut server = UdpWatcher::new(&loop_);
803 assert!(server.bind(server_addr).is_ok());
805 uvdebug!("starting read");
806 let alloc: AllocCallback = |size| {
807 vec_to_uv_buf(vec::from_elem(size, 0u8))
810 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
812 uvdebug!("i'm reading!");
813 assert!(status.is_none());
814 assert_eq!(flags, 0);
815 assert_eq!(src, client_addr);
817 let buf = vec_from_uv_buf(buf);
819 uvdebug!("got {} bytes", nread);
821 let buf = buf.unwrap();
822 for &byte in buf.slice(0, nread as uint).iter() {
823 assert!(byte == count as u8);
824 uvdebug!("{}", byte as uint);
827 assert_eq!(count, MAX);
832 let thread = do Thread::start {
833 let mut loop_ = Loop::new();
834 let mut client = UdpWatcher::new(&loop_);
835 assert!(client.bind(client_addr).is_ok());
836 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
837 let buf = slice_to_uv_buf(msg);
838 do client.send(buf, server_addr) |client, status| {
840 assert!(status.is_none());