1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::libc::{size_t, ssize_t, c_int, c_void, c_uint, c_char};
14 use std::rt::BlockedTask;
15 use std::rt::io::IoError;
16 use std::rt::io::net::ip::{Ipv4Addr, Ipv6Addr};
17 use std::rt::local::Local;
18 use std::rt::io::net::ip::{SocketAddr, IpAddr};
20 use std::rt::sched::{Scheduler, SchedHandle};
21 use std::rt::tube::Tube;
28 Loop, Request, UvError, Buf, NativeHandle,
30 uv_error_to_io_error, UvHandle, slice_to_uv_buf};
32 use stream::StreamWatcher;
34 ////////////////////////////////////////////////////////////////////////////////
35 /// Generic functions related to dealing with sockaddr things
36 ////////////////////////////////////////////////////////////////////////////////
/// A raw socket address pointer tagged with the address family it points at.
38 pub enum UvSocketAddr {
// Pointer to an IPv4 `sockaddr_in`.
39 UvIpv4SocketAddr(*sockaddr_in),
// Pointer to an IPv6 `sockaddr_in6`.
40 UvIpv6SocketAddr(*sockaddr_in6),
/// Wraps a raw `sockaddr` pointer in the `UvSocketAddr` variant that matches
/// its family, as reported by `is_ip4_addr` / `is_ip6_addr`.
43 pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
// The address must be classified as exactly one of IPv4 or IPv6.
45 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
46 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
// The pointer is only re-typed by the cast; nothing is copied.
48 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
49 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
/// Builds a temporary native socket address from `addr` for use by the
/// closure `f`, and frees it again afterwards.
// NOTE(review): the call to `f` itself is not visible in this listing;
// presumably it runs between the allocation and the free below - confirm.
55 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
// Select the allocator matching the address family.
56 let malloc = match addr.ip {
57 Ipv4Addr(*) => malloc_ip4_addr,
58 Ipv6Addr(*) => malloc_ip6_addr,
// Select the `UvSocketAddr` constructor matching the address family.
60 let wrap = match addr.ip {
61 Ipv4Addr(*) => UvIpv4SocketAddr,
62 Ipv6Addr(*) => UvIpv6SocketAddr,
// Select the deallocator matching the address family.
64 let free = match addr.ip {
65 Ipv4Addr(*) => free_ip4_addr,
66 Ipv6Addr(*) => free_ip6_addr,
// The native address is built from the textual IP and the port number.
69 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
// Release the native address allocated above.
73 unsafe { free(addr) };
/// Converts a native `UvSocketAddr` into a `SocketAddr` and hands it to `f`,
/// returning whatever `f` returns.
///
/// The IP is rendered to text with `uv_ip4_name` / `uv_ip6_name`, the port is
/// read with `ip4_port` / `ip6_port`, and the text is parsed back through
/// `FromStr`. The `unwrap` on the parse result fails the task if the rendered
/// text is not a valid address.
77 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
// Longest textual form of each family, NOT counting the trailing NUL.
78 let ip_size = match addr {
79 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
80 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
// One extra zeroed byte so the rendered name is always NUL terminated.
83 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
85 let buf_ptr = vec::raw::to_ptr(buf);
// The size handed to libuv has to count the terminating NUL as well.
// Passing only `ip_size` leaves no room for a maximum-length address
// (e.g. 255.255.255.255), even though the buffer has the extra byte.
87 UvIpv4SocketAddr(addr) =>
88 uvll::uv_ip4_name(addr, buf_ptr as *c_char, (ip_size + 1) as size_t),
89 UvIpv6SocketAddr(addr) =>
90 uvll::uv_ip6_name(addr, buf_ptr as *c_char, (ip_size + 1) as size_t),
// Read the port out of the native address.
95 let ip_port = unsafe {
96 let port = match addr {
97 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
98 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
// Drop the NUL padding that follows the rendered name before parsing it.
102 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
103 let ip_addr = FromStr::from_str(ip_str).unwrap();
105 // finally run the closure
106 f(SocketAddr { ip: ip_addr, port: ip_port })
/// Converts a native `UvSocketAddr` into an owned `SocketAddr`.
109 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
// Reuse the closure-based converter with the identity function as the closure.
111 uv_socket_addr_as_socket_addr(addr, util::id)
/// Round-trips an IPv4 test address through the native representation and
/// checks that it comes back unchanged.
116 fn test_ip4_conversion() {
118 let ip4 = rt::test::next_test_ip4();
119 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
/// Round-trips an IPv6 test address through the native representation and
/// checks that it comes back unchanged.
124 fn test_ip6_conversion() {
126 let ip6 = rt::test::next_test_ip6();
127 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
/// Selects which address query `socket_name` performs; `socket_name` matches
/// on the variants `TcpPeer`, `Tcp` and `Udp`.
130 enum SocketNameKind {
/// Queries the address associated with `handle` and converts it to a
/// `SocketAddr`.
///
/// `sk` selects the query: `tcp_getpeername` for `TcpPeer`, `tcp_getsockname`
/// for `Tcp` and `udp_getsockname` for `Udp`. A failing query is returned as
/// an `IoError` built by `uv_error_to_io_error`.
136 fn socket_name(sk: SocketNameKind, handle: *c_void) -> Result<SocketAddr, IoError> {
137 let getsockname = match sk {
138 TcpPeer => uvll::tcp_getpeername,
139 Tcp => uvll::tcp_getsockname,
140 Udp => uvll::udp_getsockname,
143 // Allocate a sockaddr_storage
144 // since we don't know if it's ipv4 or ipv6
145 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
148 getsockname(handle, r_addr as *uvll::sockaddr_storage)
// Release the temporary storage before bailing out: the free further down
// is never reached on this early-return path, so the allocation would leak.
unsafe { uvll::free_sockaddr_storage(r_addr); }
152 return Err(uv_error_to_io_error(UvError(r)));
// Interpret the filled-in storage according to the family it holds.
156 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
157 uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
159 uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
// The converted address is an owned value, so the storage can go now.
163 unsafe { uvll::free_sockaddr_storage(r_addr); }
169 ////////////////////////////////////////////////////////////////////////////////
170 /// TCP implementation
171 ////////////////////////////////////////////////////////////////////////////////
/// A TCP stream backed by a raw `uv_tcp_t` handle.
173 pub struct TcpWatcher {
// Raw TCP handle; `new` allocates it with `malloc_handle(UV_TCP)`.
174 handle: *uvll::uv_tcp_t,
// Stream wrapper built over the same handle; read/write/close go through it.
175 stream: StreamWatcher,
/// A TCP socket that has been bound to an address but is not yet accepting
/// connections.
179 pub struct TcpListener {
// Raw TCP handle. It is allocated with `malloc_handle(UV_TCP)` in `bind` and
// returned as `*uvll::uv_tcp_t` from `uv_handle`, so it is typed as a TCP
// handle here rather than as a pipe handle.
181 handle: *uvll::uv_tcp_t,
// Task blocked in `drop` until `listener_close_cb` resumes it.
182 priv closing_task: Option<BlockedTask>,
// Sending end used by `listen_cb` to publish accepted streams or errors.
183 priv outgoing: Tube<Result<~rtio::RtioTcpStream, IoError>>,
/// A listening TCP socket; created from a `TcpListener` by `listen`.
186 pub struct TcpAcceptor {
// The listener this acceptor was created from (owned).
187 listener: ~TcpListener,
// Clone of the listener's `outgoing` tube (see `listen`).
188 priv incoming: Tube<Result<~rtio::RtioTcpStream, IoError>>,
191 // TCP watchers (clients/streams)
/// Allocates a `UV_TCP` handle, initializes it on `loop_`, and wraps it in a
/// `TcpWatcher` whose `home` is the current scheduler's handle.
194 pub fn new(loop_: &Loop) -> TcpWatcher {
195 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
// Register the freshly allocated handle with the event loop.
197 uvll::uv_tcp_init(loop_.native_handle(), handle)
200 home: get_handle_to_current_scheduler!(),
// The stream watcher wraps the very same handle.
202 stream: StreamWatcher::new(handle),
/// Creates a `TcpWatcher` on `loop_` and starts connecting it to `address`,
/// descheduling the running task until `connect_cb` resumes it.
///
/// A non-zero status is returned as `Err(UvError(n))`.
206 pub fn connect(loop_: &mut Loop, address: SocketAddr)
207 -> Result<TcpWatcher, UvError>
// State shared between the blocked task and `connect_cb`.
209 struct Ctx { status: c_int, task: Option<BlockedTask> }
211 let tcp = TcpWatcher::new(loop_);
212 let ret = do socket_addr_as_uv_socket_addr(address) |addr| {
213 let req = Request::new(uvll::UV_CONNECT);
// Issue the connect request with the variant matching the address family.
214 let result = match addr {
215 UvIpv4SocketAddr(addr) => unsafe {
216 uvll::tcp_connect(req.handle, tcp.handle, addr,
219 UvIpv6SocketAddr(addr) => unsafe {
220 uvll::tcp_connect6(req.handle, tcp.handle, addr,
// Park the current task; the callback picks it up again through `cx.task`.
227 let mut cx = Ctx { status: 0, task: None };
228 let scheduler: ~Scheduler = Local::take();
229 do scheduler.deschedule_running_task_and_then |_, task| {
230 cx.task = Some(task);
234 n => Err(UvError(n)),
// Completion callback for the connect request.
246 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
247 let _req = Request::wrap(req);
// A cancelled request returns without touching the context or waking a task.
248 if status == uvll::ECANCELED { return }
// The request's data pointer is reinterpreted as the `Ctx` of the blocked task.
249 let cx: &mut Ctx = unsafe {
250 cast::transmute(uvll::get_data_for_req(req))
// Wake the task that was parked in `connect`.
253 let scheduler: ~Scheduler = Local::take();
254 scheduler.resume_blocked_task_immediately(cx.task.take_unwrap());
/// Exposes the watcher's home scheduler handle.
259 impl HomingIO for TcpWatcher {
260 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
/// Local-address lookup for a TCP stream.
263 impl rtio::RtioSocket for TcpWatcher {
// Returns the address reported by the `Tcp` query of the free `socket_name` function.
264 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
// NOTE(review): `fire_missiles` is defined elsewhere; presumably the guard
// keeps the task on the handle's home scheduler while in scope - confirm.
265 let _m = self.fire_missiles();
266 socket_name(Tcp, self.handle)
/// Stream operations for a TCP connection. Every method first takes the
/// `fire_missiles` guard (defined elsewhere; presumably homes the task to the
/// handle's scheduler - confirm) and maps libuv errors to `IoError`.
270 impl rtio::RtioTcpStream for TcpWatcher {
// Reads into `buf` through the underlying stream watcher.
271 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
272 let _m = self.fire_missiles();
273 self.stream.read(buf).map_err(uv_error_to_io_error)
// Writes `buf` through the underlying stream watcher.
276 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
277 let _m = self.fire_missiles();
278 self.stream.write(buf).map_err(uv_error_to_io_error)
// Returns the remote address of the connection (`TcpPeer` query).
281 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
282 let _m = self.fire_missiles();
283 socket_name(TcpPeer, self.handle)
// Calls `uv_tcp_nodelay` with 0, i.e. turns the no-delay option off.
286 fn control_congestion(&mut self) -> Result<(), IoError> {
287 let _m = self.fire_missiles();
288 status_to_io_result(unsafe {
289 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
// Calls `uv_tcp_nodelay` with 1, i.e. turns the no-delay option on.
293 fn nodelay(&mut self) -> Result<(), IoError> {
294 let _m = self.fire_missiles();
295 status_to_io_result(unsafe {
296 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
// Enables keep-alive with the given delay in seconds.
300 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
301 let _m = self.fire_missiles();
302 status_to_io_result(unsafe {
303 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
304 delay_in_seconds as c_uint)
// Disables keep-alive (enable flag 0, delay 0).
308 fn letdie(&mut self) -> Result<(), IoError> {
309 let _m = self.fire_missiles();
310 status_to_io_result(unsafe {
311 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
/// Closes the underlying stream when the watcher is dropped.
316 impl Drop for TcpWatcher {
318 let _m = self.fire_missiles();
// NOTE(review): the meaning of the `true` argument is defined by
// `StreamWatcher::close`, which is not visible here.
319 self.stream.close(true);
323 // TCP listeners (unbound servers)
/// Allocates and initializes a `UV_TCP` handle on `loop_` and binds it to
/// `address`, returning the boxed listener on success (status 0).
326 pub fn bind(loop_: &mut Loop, address: SocketAddr)
327 -> Result<~TcpListener, UvError>
329 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
331 uvll::uv_tcp_init(loop_.native_handle(), handle)
333 let l = ~TcpListener {
334 home: get_handle_to_current_scheduler!(),
// Tube later drained by the acceptor created in `listen`.
337 outgoing: Tube::new(),
// Bind with the call matching the address family.
339 let res = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
341 UvIpv4SocketAddr(addr) => uvll::tcp_bind(l.handle, addr),
342 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(l.handle, addr),
// NOTE(review): `install` comes from elsewhere (presumably `UvHandle`) and
// likely stores the listener pointer in the handle's data - confirm.
346 0 => Ok(l.install()),
/// Exposes the listener's home scheduler handle.
352 impl HomingIO for TcpListener {
353 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
/// Exposes the listener's raw handle as a `uv_tcp_t` pointer.
356 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
357 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
/// Local-address lookup for a bound listener.
360 impl rtio::RtioSocket for TcpListener {
// Returns the address reported by the `Tcp` query of the free `socket_name` function.
361 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
362 let _m = self.fire_missiles();
363 socket_name(Tcp, self.handle)
/// Turns a bound listener into an acceptor by starting to listen.
367 impl rtio::RtioTcpListener for TcpListener {
// Consumes the listener; on success returns the acceptor as a trait object,
// otherwise the libuv status converted to an `IoError`.
368 fn listen(mut ~self) -> Result<~rtio::RtioTcpAcceptor, IoError> {
369 // create the acceptor object from ourselves
// The acceptor receives on a clone of the tube `listen_cb` sends to.
370 let incoming = self.outgoing.clone();
371 let mut acceptor = ~TcpAcceptor {
376 let _m = acceptor.fire_missiles();
377 // XXX: the 128 backlog should be configurable
378 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
379 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor),
380 n => Err(uv_error_to_io_error(UvError(n))),
/// Connection callback passed to `uv_listen`: turns the status into either an
/// accepted stream or an `IoError` and sends it down the listener's `outgoing`
/// tube. A cancelled status returns without sending anything.
385 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
386 let msg = match status {
// Build a `Loop` wrapper around the loop the server handle belongs to.
388 let loop_ = NativeHandle::from_native_handle(unsafe {
389 uvll::get_loop_for_uv_handle(server)
// Accept the pending connection into a fresh client watcher; a failing
// accept fails the assertion.
391 let client = TcpWatcher::new(&loop_);
392 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
393 Ok(~client as ~rtio::RtioTcpStream)
395 uvll::ECANCELED => return,
396 n => Err(uv_error_to_io_error(UvError(n)))
// Recover the `TcpListener` associated with the server handle.
399 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
400 tcp.outgoing.send(msg);
/// Closes the listener's handle on drop, descheduling the running task until
/// `listener_close_cb` resumes it.
403 impl Drop for TcpListener {
405 let (_m, sched) = self.fire_missiles_sched();
407 do sched.deschedule_running_task_and_then |_, task| {
// Stash the blocked task where the close callback can find it, then start closing.
408 self.closing_task = Some(task);
409 unsafe { uvll::uv_close(self.handle, listener_close_cb) }
/// Close callback for a `TcpListener` handle: frees the handle and resumes the
/// task stored in `closing_task`.
414 extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
// Recover the listener before the handle memory is released.
415 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&handle) };
416 unsafe { uvll::free_handle(handle) }
418 let sched: ~Scheduler = Local::take();
419 sched.resume_blocked_task_immediately(tcp.closing_task.take_unwrap());
422 // TCP acceptors (bound servers)
/// The acceptor shares the home scheduler handle of the listener it owns.
424 impl HomingIO for TcpAcceptor {
425 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
/// Local-address lookup for an acceptor, answered from the listener's handle.
428 impl rtio::RtioSocket for TcpAcceptor {
429 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
430 let _m = self.fire_missiles();
431 socket_name(Tcp, self.listener.handle)
/// Accept-side operations of a listening TCP socket.
435 impl rtio::RtioTcpAcceptor for TcpAcceptor {
// NOTE(review): the body past the guard is not visible in this listing;
// presumably it receives the next result from `incoming` - confirm.
436 fn accept(&mut self) -> Result<~rtio::RtioTcpStream, IoError> {
437 let _m = self.fire_missiles();
// Calls `uv_tcp_simultaneous_accepts` with 1 (enable).
441 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
442 let _m = self.fire_missiles();
443 status_to_io_result(unsafe {
444 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
// Calls `uv_tcp_simultaneous_accepts` with 0 (disable).
448 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
449 let _m = self.fire_missiles();
450 status_to_io_result(unsafe {
451 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
456 ////////////////////////////////////////////////////////////////////////////////
457 /// UDP implementation
458 ////////////////////////////////////////////////////////////////////////////////
/// A UDP socket backed by a raw `uv_udp_t` handle.
460 pub struct UdpWatcher {
// Raw UDP handle; `bind` allocates it with `malloc_handle(UV_UDP)`.
461 handle: *uvll::uv_udp_t,
/// Allocates and initializes a `UV_UDP` handle on `loop_` and binds it to
/// `address`; a non-zero bind status is returned as `Err(UvError(n))`.
466 pub fn bind(loop_: &Loop, address: SocketAddr)
467 -> Result<UdpWatcher, UvError>
469 let udp = UdpWatcher {
470 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
471 home: get_handle_to_current_scheduler!(),
474 uvll::uv_udp_init(loop_.native_handle(), udp.handle)
// Bind with the call matching the address family; the flags argument is 0.
476 let result = socket_addr_as_uv_socket_addr(address, |addr| unsafe {
478 UvIpv4SocketAddr(addr) => uvll::udp_bind(udp.handle, addr, 0u32),
479 UvIpv6SocketAddr(addr) => uvll::udp_bind6(udp.handle, addr, 0u32),
484 n => Err(UvError(n)),
/// Exposes the watcher's home scheduler handle.
489 impl HomingIO for UdpWatcher {
490 fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
/// Local-address lookup for a UDP socket.
493 impl rtio::RtioSocket for UdpWatcher {
// Returns the address reported by the `Udp` query of the free `socket_name` function.
494 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
495 let _m = self.fire_missiles();
496 socket_name(Udp, self.handle)
/// Datagram operations for a UDP socket. Every method first takes the
/// `fire_missiles` guard (defined elsewhere; presumably homes the task to the
/// handle's scheduler - confirm) and maps libuv statuses to `IoError`.
500 impl rtio::RtioUdpSocket for UdpWatcher {
// Receives one datagram into `buf`, blocking the running task until
// `recv_cb` resumes it; yields the byte count and the sender's address.
501 fn recvfrom(&mut self, buf: &mut [u8])
502 -> Result<(uint, SocketAddr), IoError>
// Fields of the context shared with `alloc_cb` / `recv_cb`.
505 task: Option<BlockedTask>,
507 result: Option<(ssize_t, SocketAddr)>,
509 let _m = self.fire_missiles();
511 return match unsafe {
512 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
// The caller's slice is handed to libuv through `alloc_cb`.
517 buf: Some(slice_to_uv_buf(buf)),
// The handle's data pointer refers to `cx`, which lives on this stack frame
// while the task is descheduled below.
520 unsafe { uvll::set_data_for_uv_handle(self.handle, &cx) }
521 let scheduler: ~Scheduler = Local::take();
522 do scheduler.deschedule_running_task_and_then |_, task| {
523 cx.task = Some(task);
// `recv_cb` has filled in `cx.result` by the time the task resumes.
525 match cx.result.take_unwrap() {
527 Err(uv_error_to_io_error(UvError(n as c_int))),
528 (n, addr) => Ok((n as uint, addr))
531 n => Err(uv_error_to_io_error(UvError(n)))
// Hands libuv the caller's buffer; it is taken out of the context, so a
// second call fails with the message below.
534 extern fn alloc_cb(handle: *uvll::uv_udp_t,
535 _suggested_size: size_t) -> Buf {
536 let cx: &mut Ctx = unsafe {
537 cast::transmute(uvll::get_data_for_uv_handle(handle))
539 cx.buf.take().expect("alloc_cb called more than once")
// Receive callback: stops receiving, records the result and wakes the task.
542 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, _buf: Buf,
543 addr: *uvll::sockaddr, _flags: c_uint) {
545 // When there's no data to read the recv callback can be a no-op.
546 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
547 // this we just drop back to kqueue and wait for the next callback.
548 if nread == 0 { return }
549 if nread == uvll::ECANCELED as ssize_t { return }
552 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
555 let cx: &mut Ctx = unsafe {
556 cast::transmute(uvll::get_data_for_uv_handle(handle))
// Convert the sender's raw address into an owned `SocketAddr`.
558 let addr = sockaddr_to_UvSocketAddr(addr);
559 let addr = uv_socket_addr_to_socket_addr(addr);
560 cx.result = Some((nread, addr));
562 let sched: ~Scheduler = Local::take();
563 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
// Sends `buf` to `dst`, blocking the running task until `send_cb` resumes it.
567 fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
// State shared with `send_cb`.
568 struct Ctx { task: Option<BlockedTask>, result: c_int }
570 let _m = self.fire_missiles();
572 let req = Request::new(uvll::UV_UDP_SEND);
573 let buf = slice_to_uv_buf(buf);
// Issue the send with the call matching the destination's address family.
574 let result = socket_addr_as_uv_socket_addr(dst, |dst| unsafe {
576 UvIpv4SocketAddr(dst) =>
577 uvll::udp_send(req.handle, self.handle, [buf], dst, send_cb),
578 UvIpv6SocketAddr(dst) =>
579 uvll::udp_send6(req.handle, self.handle, [buf], dst, send_cb),
583 return match result {
585 let mut cx = Ctx { task: None, result: 0 };
589 let sched: ~Scheduler = Local::take();
590 do sched.deschedule_running_task_and_then |_, task| {
591 cx.task = Some(task);
596 n => Err(uv_error_to_io_error(UvError(n)))
599 n => Err(uv_error_to_io_error(UvError(n)))
// Send-completion callback: wakes the task parked in `sendto`.
602 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
603 let req = Request::wrap(req);
604 let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
607 let sched: ~Scheduler = Local::take();
608 sched.resume_blocked_task_immediately(cx.task.take_unwrap());
// Changes membership for the multicast group `multi`, passed to libuv as a
// C string; the membership flag itself is on a line not visible here.
612 fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
613 let _m = self.fire_missiles();
614 status_to_io_result(unsafe {
615 do multi.to_str().with_c_str |m_addr| {
616 uvll::uv_udp_set_membership(self.handle,
// Leaves the multicast group `multi` (`UV_LEAVE_GROUP`).
623 fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
624 let _m = self.fire_missiles();
625 status_to_io_result(unsafe {
626 do multi.to_str().with_c_str |m_addr| {
627 uvll::uv_udp_set_membership(self.handle,
629 uvll::UV_LEAVE_GROUP)
// Sets the multicast-loop option; the value passed is on a line not visible here.
634 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
635 let _m = self.fire_missiles();
636 status_to_io_result(unsafe {
637 uvll::uv_udp_set_multicast_loop(self.handle,
// Sets the multicast-loop option; the value passed is on a line not visible here.
642 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
643 let _m = self.fire_missiles();
644 status_to_io_result(unsafe {
645 uvll::uv_udp_set_multicast_loop(self.handle,
// Sets the multicast TTL through `uv_udp_set_multicast_ttl`.
650 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
651 let _m = self.fire_missiles();
652 status_to_io_result(unsafe {
653 uvll::uv_udp_set_multicast_ttl(self.handle,
// Sets the TTL through `uv_udp_set_ttl`.
658 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
659 let _m = self.fire_missiles();
660 status_to_io_result(unsafe {
661 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
// Sets the broadcast option; the value passed is on a line not visible here.
665 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
666 let _m = self.fire_missiles();
667 status_to_io_result(unsafe {
668 uvll::uv_udp_set_broadcast(self.handle,
// Sets the broadcast option; the value passed is on a line not visible here.
673 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
674 let _m = self.fire_missiles();
675 status_to_io_result(unsafe {
676 uvll::uv_udp_set_broadcast(self.handle,
/// Closes the UDP handle on drop, descheduling the running task until
/// `close_cb` resumes it.
682 impl Drop for UdpWatcher {
684 // Send ourselves home to close this handle (blocking while doing so).
685 let (_m, sched) = self.fire_missiles_sched();
// `slot` (declared on a line not visible here) is reached by `close_cb`
// through the handle's data pointer.
688 uvll::set_data_for_uv_handle(self.handle, &slot);
689 uvll::uv_close(self.handle, close_cb);
691 do sched.deschedule_running_task_and_then |_, task| {
// Close callback: takes the blocked task out of the slot and resumes it.
695 extern fn close_cb(handle: *uvll::uv_handle_t) {
696 let slot: &mut Option<BlockedTask> = unsafe {
697 cast::transmute(uvll::get_data_for_uv_handle(handle))
699 let sched: ~Scheduler = Local::take();
700 sched.resume_blocked_task_immediately(slot.take_unwrap());
705 ////////////////////////////////////////////////////////////////////////////////
706 /// UV request support
707 ////////////////////////////////////////////////////////////////////////////////
712 use std::util::ignore;
715 use std::unstable::run_in_bare_thread;
716 use std::rt::thread::Thread;
717 use std::rt::test::*;
718 use super::super::{Loop, AllocCallback};
719 use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
/// Connects over IPv4 to a port with no listener and expects `ECONNREFUSED`.
// NOTE(review): this calls `connect` as a method taking a callback, whereas
// the `connect(loop_, address)` visible earlier in this file returns a
// `Result` - confirm this test still matches the current API.
722 fn connect_close_ip4() {
723 do run_in_bare_thread() {
724 let mut loop_ = Loop::new();
725 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
726 // Connect to a port where nobody is listening
727 let addr = next_test_ip4();
728 do tcp_watcher.connect(addr) |stream_watcher, status| {
729 uvdebug!("tcp_watcher.connect!");
// The connection attempt must fail, and specifically with ECONNREFUSED.
730 assert!(status.is_some());
731 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
732 stream_watcher.close(||());
/// Connects over IPv6 to a port with no listener and expects `ECONNREFUSED`.
// NOTE(review): this calls `connect` as a method taking a callback, whereas
// the `connect(loop_, address)` visible earlier in this file returns a
// `Result` - confirm this test still matches the current API.
740 fn connect_close_ip6() {
741 do run_in_bare_thread() {
742 let mut loop_ = Loop::new();
743 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
744 // Connect to a port where nobody is listening
745 let addr = next_test_ip6();
746 do tcp_watcher.connect(addr) |stream_watcher, status| {
747 uvdebug!("tcp_watcher.connect!");
// The connection attempt must fail, and specifically with ECONNREFUSED.
748 assert!(status.is_some());
749 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
750 stream_watcher.close(||());
/// Binds a UDP watcher to an IPv4 test address and closes it again.
// NOTE(review): uses `UdpWatcher::new` and a `bind(addr)` method; only
// `bind(loop_, address)` is visible earlier in this file - confirm.
758 fn udp_bind_close_ip4() {
759 do run_in_bare_thread() {
760 let mut loop_ = Loop::new();
761 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
762 let addr = next_test_ip4();
763 udp_watcher.bind(addr);
764 udp_watcher.close(||());
/// Binds a UDP watcher to an IPv6 test address and closes it again.
// NOTE(review): uses `UdpWatcher::new` and a `bind(addr)` method; only
// `bind(loop_, address)` is visible earlier in this file - confirm.
771 fn udp_bind_close_ip6() {
772 do run_in_bare_thread() {
773 let mut loop_ = Loop::new();
774 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
775 let addr = next_test_ip6();
776 udp_watcher.bind(addr);
777 udp_watcher.close(||());
// Body of an IPv4 TCP listen/read test (its `fn` header is not visible in
// this listing). A server accepts one client and checks the bytes it reads
// against a running `count`; a client thread connects and writes 0..9.
785 do run_in_bare_thread() {
786 static MAX: int = 10;
787 let mut loop_ = Loop::new();
788 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
789 let addr = next_test_ip4();
790 server_tcp_watcher.bind(addr);
792 uvdebug!("listening");
793 let mut stream = server_tcp_watcher.as_stream();
794 let res = do stream.listen |mut server_stream_watcher, status| {
795 uvdebug!("listened!");
796 assert!(status.is_none());
// Accept the incoming connection into a fresh client watcher.
797 let mut loop_ = loop_;
798 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
799 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
800 server_stream_watcher.accept(client_tcp_watcher);
// The running byte counter lives in a `Cell` so the read callback can update it.
801 let count_cell = Cell::new(0);
802 let server_stream_watcher = server_stream_watcher;
803 uvdebug!("starting read");
// Read buffers are zero-filled vectors of the requested size.
804 let alloc: AllocCallback = |size| {
805 vec_to_uv_buf(vec::from_elem(size, 0u8))
807 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
809 uvdebug!("i'm reading!");
810 let buf = vec_from_uv_buf(buf);
811 let mut count = count_cell.take();
812 if status.is_none() {
813 uvdebug!("got {} bytes", nread);
814 let buf = buf.unwrap();
// Each received byte is compared against the running counter.
815 for byte in buf.slice(0, nread as uint).iter() {
816 assert!(*byte == count as u8);
817 uvdebug!("{}", *byte as uint);
// On the other branch the counter must equal MAX before both sides are closed.
821 assert_eq!(count, MAX);
822 do stream_watcher.close {
823 server_stream_watcher.close(||());
826 count_cell.put_back(count);
830 assert!(res.is_ok());
// Client side: connect from a separate thread and write the ten bytes 0..9.
832 let client_thread = do Thread::start {
833 uvdebug!("starting client thread");
834 let mut loop_ = Loop::new();
835 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
836 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
837 uvdebug!("connecting");
838 assert!(status.is_none());
839 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
840 let buf = slice_to_uv_buf(msg);
// The message vector is moved into a `Cell` and only dropped in the close callback.
841 let msg_cell = Cell::new(msg);
842 do stream_watcher.write(buf) |stream_watcher, status| {
844 assert!(status.is_none());
845 let msg_cell = Cell::new(msg_cell.take());
846 stream_watcher.close(||ignore(msg_cell.take()));
853 let mut loop_ = loop_;
856 client_thread.join();
// Body of an IPv6 TCP listen/read test (its `fn` header is not visible in
// this listing). A server accepts one client and checks the bytes it reads
// against a running `count`; a client thread connects and writes 0..9.
862 do run_in_bare_thread() {
863 static MAX: int = 10;
864 let mut loop_ = Loop::new();
865 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
866 let addr = next_test_ip6();
867 server_tcp_watcher.bind(addr);
869 uvdebug!("listening");
870 let mut stream = server_tcp_watcher.as_stream();
871 let res = do stream.listen |mut server_stream_watcher, status| {
872 uvdebug!("listened!");
873 assert!(status.is_none());
// Accept the incoming connection into a fresh client watcher.
874 let mut loop_ = loop_;
875 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
876 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
877 server_stream_watcher.accept(client_tcp_watcher);
// The running byte counter lives in a `Cell` so the read callback can update it.
878 let count_cell = Cell::new(0);
879 let server_stream_watcher = server_stream_watcher;
880 uvdebug!("starting read");
// Read buffers are zero-filled vectors of the requested size.
881 let alloc: AllocCallback = |size| {
882 vec_to_uv_buf(vec::from_elem(size, 0u8))
884 do client_tcp_watcher.read_start(alloc)
885 |stream_watcher, nread, buf, status| {
887 uvdebug!("i'm reading!");
888 let buf = vec_from_uv_buf(buf);
889 let mut count = count_cell.take();
890 if status.is_none() {
891 uvdebug!("got {} bytes", nread);
892 let buf = buf.unwrap();
// Each received byte is compared against the running counter.
893 let r = buf.slice(0, nread as uint);
894 for byte in r.iter() {
895 assert!(*byte == count as u8);
896 uvdebug!("{}", *byte as uint);
// On the other branch the counter must equal MAX before both sides are closed.
900 assert_eq!(count, MAX);
901 do stream_watcher.close {
902 server_stream_watcher.close(||());
905 count_cell.put_back(count);
908 assert!(res.is_ok());
// Client side: connect from a separate thread and write the ten bytes 0..9.
910 let client_thread = do Thread::start {
911 uvdebug!("starting client thread");
912 let mut loop_ = Loop::new();
913 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
914 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
915 uvdebug!("connecting");
916 assert!(status.is_none());
917 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
918 let buf = slice_to_uv_buf(msg);
// The message vector is moved into a `Cell` and only dropped in the close callback.
919 let msg_cell = Cell::new(msg);
920 do stream_watcher.write(buf) |stream_watcher, status| {
922 assert!(status.is_none());
923 let msg_cell = Cell::new(msg_cell.take());
924 stream_watcher.close(||ignore(msg_cell.take()));
931 let mut loop_ = loop_;
934 client_thread.join();
// Body of an IPv4 UDP receive test (its `fn` header is not visible in this
// listing). A server bound to `server_addr` checks a datagram sent by a
// client thread bound to `client_addr`.
940 do run_in_bare_thread() {
941 static MAX: int = 10;
942 let mut loop_ = Loop::new();
943 let server_addr = next_test_ip4();
944 let client_addr = next_test_ip4();
946 let mut server = UdpWatcher::new(&loop_);
947 assert!(server.bind(server_addr).is_ok());
949 uvdebug!("starting read");
// Receive buffers are zero-filled vectors of the requested size.
950 let alloc: AllocCallback = |size| {
951 vec_to_uv_buf(vec::from_elem(size, 0u8))
954 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
956 uvdebug!("i'm reading!");
// The datagram must arrive without error, with no flags, from the client.
957 assert!(status.is_none());
958 assert_eq!(flags, 0);
959 assert_eq!(src, client_addr);
961 let buf = vec_from_uv_buf(buf);
963 uvdebug!("got {} bytes", nread);
965 let buf = buf.unwrap();
// Each received byte is compared against `count` (declared on a line not visible here).
966 for &byte in buf.slice(0, nread as uint).iter() {
967 assert!(byte == count as u8);
968 uvdebug!("{}", byte as uint);
971 assert_eq!(count, MAX);
// Client side: bind to `client_addr` and send the ten bytes 0..9 to the server.
976 let thread = do Thread::start {
977 let mut loop_ = Loop::new();
978 let mut client = UdpWatcher::new(&loop_);
979 assert!(client.bind(client_addr).is_ok());
980 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
981 let buf = slice_to_uv_buf(msg);
982 do client.send(buf, server_addr) |client, status| {
984 assert!(status.is_none());
1000 do run_in_bare_thread() {
1001 static MAX: int = 10;
1002 let mut loop_ = Loop::new();
1003 let server_addr = next_test_ip6();
1004 let client_addr = next_test_ip6();
1006 let mut server = UdpWatcher::new(&loop_);
1007 assert!(server.bind(server_addr).is_ok());
1009 uvdebug!("starting read");
1010 let alloc: AllocCallback = |size| {
1011 vec_to_uv_buf(vec::from_elem(size, 0u8))
1014 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
1016 uvdebug!("i'm reading!");
1017 assert!(status.is_none());
1018 assert_eq!(flags, 0);
1019 assert_eq!(src, client_addr);
1021 let buf = vec_from_uv_buf(buf);
1023 uvdebug!("got {} bytes", nread);
1025 let buf = buf.unwrap();
1026 for &byte in buf.slice(0, nread as uint).iter() {
1027 assert!(byte == count as u8);
1028 uvdebug!("{}", byte as uint);
1031 assert_eq!(count, MAX);
1036 let thread = do Thread::start {
1037 let mut loop_ = Loop::new();
1038 let mut client = UdpWatcher::new(&loop_);
1039 assert!(client.bind(client_addr).is_ok());
1040 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1041 let buf = slice_to_uv_buf(msg);
1042 do client.send(buf, server_addr) |client, status| {
1043 uvdebug!("writing");
1044 assert!(status.is_none());