1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
14 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
19 use std::rt::task::BlockedTask;
22 use homing::{HomingIO, HomeHandle};
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27 wait_until_woken_after, wakeup};
28 use timer::TimerWatcher;
29 use uvio::UvIoFactory;
32 ////////////////////////////////////////////////////////////////////////////////
33 /// Generic functions related to dealing with sockaddr things
34 ////////////////////////////////////////////////////////////////////////////////
/// Converts a `u16` from host byte order to big-endian (network) byte order
/// by delegating to `mem::to_be16`.
36 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
/// Converts a `u16` from big-endian (network) byte order back to host byte
/// order by delegating to `mem::from_be16`.
37 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
/// Decodes a raw `libc::sockaddr_storage` into an `ip::SocketAddr`.
///
/// The address family stored in `storage.ss_family` selects the decoding:
/// the storage is reinterpreted either as a `sockaddr_in` (producing an
/// `ip::Ipv4Addr`) or as a `sockaddr_in6` (producing an `ip::Ipv6Addr`).
/// Ports are passed through `ntohs` in both cases.
///
/// `len` is the number of valid bytes behind `storage`; it is asserted to be
/// at least the size of the structure being read. Any other family value
/// triggers `fail!("unknown family {}", n)`.
39 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
40 len: uint) -> ip::SocketAddr {
41 match storage.ss_family as c_int {
// The caller must have provided at least a full sockaddr_in.
43 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
44 let storage: &libc::sockaddr_in = unsafe {
45 cast::transmute(storage)
// Split s_addr into four octets, least-significant byte first.
47 let addr = storage.sin_addr.s_addr as u32;
48 let a = (addr >> 0) as u8;
49 let b = (addr >> 8) as u8;
50 let c = (addr >> 16) as u8;
51 let d = (addr >> 24) as u8;
53 ip: ip::Ipv4Addr(a, b, c, d),
54 port: ntohs(storage.sin_port),
// The caller must have provided at least a full sockaddr_in6.
58 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
59 let storage: &libc::sockaddr_in6 = unsafe {
60 cast::transmute(storage)
// Each of the eight address segments is byte-swapped with ntohs.
62 let a = ntohs(storage.sin6_addr.s6_addr[0]);
63 let b = ntohs(storage.sin6_addr.s6_addr[1]);
64 let c = ntohs(storage.sin6_addr.s6_addr[2]);
65 let d = ntohs(storage.sin6_addr.s6_addr[3]);
66 let e = ntohs(storage.sin6_addr.s6_addr[4]);
67 let f = ntohs(storage.sin6_addr.s6_addr[5]);
68 let g = ntohs(storage.sin6_addr.s6_addr[6]);
69 let h = ntohs(storage.sin6_addr.s6_addr[7]);
71 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72 port: ntohs(storage.sin6_port),
76 fail!("unknown family {}", n);
/// Encodes an `ip::SocketAddr` into a `libc::sockaddr_storage`.
///
/// Returns the storage (initialized with `mem::init` and then filled in
/// through a `sockaddr_in` or `sockaddr_in6` view of the same memory)
/// together with the size of the concrete structure that was written.
/// The port is stored through `htons`.
81 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
83 let mut storage: libc::sockaddr_storage = mem::init();
84 let len = match addr.ip {
85 ip::Ipv4Addr(a, b, c, d) => {
// View the generic storage as a sockaddr_in and fill in its fields.
86 let storage: &mut libc::sockaddr_in =
87 cast::transmute(&mut storage);
88 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
89 (*storage).sin_port = htons(addr.port);
90 (*storage).sin_addr = libc::in_addr {
91 s_addr: (d as u32 << 24) |
96 mem::size_of::<libc::sockaddr_in>()
98 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
// View the generic storage as a sockaddr_in6 and fill in its fields.
99 let storage: &mut libc::sockaddr_in6 =
100 cast::transmute(&mut storage);
101 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
102 storage.sin6_port = htons(addr.port);
103 storage.sin6_addr = libc::in6_addr {
115 mem::size_of::<libc::sockaddr_in6>()
118 return (storage, len);
/// Selects which libuv name-lookup function `socket_name` calls
/// (`TcpPeer`, `Tcp` and `Udp` are the variants it matches on).
122 enum SocketNameKind {
/// Looks up an address for `handle` using the libuv function chosen by `sk`:
/// `uv_tcp_getpeername` for `TcpPeer`, `uv_tcp_getsockname` for `Tcp` and
/// `uv_udp_getsockname` for `Udp`.
///
/// A status of 0 yields the address decoded by `sockaddr_to_addr`; any other
/// status is converted into an `IoError` through `uv_error_to_io_error`.
128 fn socket_name(sk: SocketNameKind,
129 handle: *c_void) -> Result<ip::SocketAddr, IoError> {
130 let getsockname = match sk {
131 TcpPeer => uvll::uv_tcp_getpeername,
132 Tcp => uvll::uv_tcp_getsockname,
133 Udp => uvll::uv_udp_getsockname,
136 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
137 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
// namelen starts as the capacity of the storage and is passed by mutable
// reference so the callee can update it.
138 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
140 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
142 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
144 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145 n => Err(uv_error_to_io_error(UvError(n)))
149 ////////////////////////////////////////////////////////////////////////////////
150 /// TCP implementation
151 ////////////////////////////////////////////////////////////////////////////////
/// A TCP stream backed by a libuv `uv_tcp_t` handle.
///
/// `stream` wraps the same handle for reads and writes, while the access
/// guards serialize concurrent readers and concurrent writers.
153 pub struct TcpWatcher {
154 handle: *uvll::uv_tcp_t,
155 stream: StreamWatcher,
159 // libuv can't support concurrent reads and concurrent writes of the same
160 // stream object, so we use these access guards in order to arbitrate among
161 // multiple concurrent reads and writes. Note that libuv *can* read and
162 // write simultaneously, it just can't read and read simultaneously.
164 write_access: Access,
/// TCP listener state produced by `bind` and consumed by `listen`.
///
/// `listen_cb` pushes every accepted stream (or accept error) into
/// `outgoing`, and `TcpAcceptor::accept` receives from `incoming`
/// (presumably the two ends of the channel created in `bind`).
167 pub struct TcpListener {
// Typed as a TCP handle: `bind` allocates it with `UV_TCP` and initializes it
// with `uv_tcp_init`, and `uv_handle` hands it out as `*uvll::uv_tcp_t`.
169 handle: *uvll::uv_tcp_t,
170 closing_task: Option<BlockedTask>,
171 outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
172 incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
/// Accepting side of a TCP server; owns the boxed `TcpListener` that
/// `listen` moved into it.
175 pub struct TcpAcceptor {
176 listener: ~TcpListener,
179 // TCP watchers (clients/streams)
/// Creates a watcher on `io`'s loop, homed on a handle obtained from
/// `io.make_handle()`, by delegating to `new_home`.
182 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
183 let handle = io.make_handle();
184 TcpWatcher::new_home(&io.loop_, handle)
/// Allocates a `UV_TCP` handle, initializes it on `loop_` with
/// `uv_tcp_init`, and builds a watcher around it with a fresh refcount and
/// fresh read/write access guards.
187 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
188 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
190 uvll::uv_tcp_init(loop_.handle, handle)
// The stream watcher shares the raw handle allocated above.
195 stream: StreamWatcher::new(handle),
196 refcount: Refcount::new(),
197 read_access: Access::new(),
198 write_access: Access::new(),
/// Opens a TCP connection to `address`, optionally bounded by `timeout`.
///
/// A fresh watcher is created and `uv_tcp_connect` is issued against it.
/// Once libuv has accepted the request, a timer may be armed with
/// `timer_cb`, and the calling task blocks in `wait_until_woken_after`
/// until `connect_cb` or `timer_cb` wakes it. Failures are reported as
/// `UvError`; the watcher is dropped before the error is returned.
///
/// NOTE(review): the unit of `timeout` is not stated in this function —
/// confirm against `TimerWatcher::start`.
202 pub fn connect(io: &mut UvIoFactory,
203 address: ip::SocketAddr,
204 timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
// Context shared with the callbacks: the blocked task to wake and the
// optional timeout timer.
207 task: Option<BlockedTask>,
208 timer: Option<~TimerWatcher>,
211 let tcp = TcpWatcher::new(io);
212 let (addr, _len) = addr_to_sockaddr(address);
213 let mut req = Request::new(uvll::UV_CONNECT);
214 let result = unsafe {
215 let addr_p = &addr as *libc::sockaddr_storage;
216 uvll::uv_tcp_connect(req.handle, tcp.handle,
217 addr_p as *libc::sockaddr,
220 return match result {
222 req.defuse(); // uv callback now owns this request
// The status starts at -1 and is overwritten by whichever callback fires.
223 let mut cx = Ctx { status: -1, task: None, timer: None };
226 let mut timer = TimerWatcher::new(io);
227 timer.start(timer_cb, t, 0);
228 cx.timer = Some(timer);
232 wait_until_woken_after(&mut cx.task, &io.loop_, || {
// Publish the context pointer to the callbacks only once the task is
// about to block.
233 let data = &cx as *_;
235 Some(ref mut timer) => unsafe { timer.set_data(data) },
240 // Make sure an erroneously fired callback doesn't have access
241 // to the context any more.
242 req.set_data(0 as *int);
244 // If we failed because of a timeout, drop the TcpWatcher as
245 // soon as possible because its data is now set to null and we
246 // want to cancel the callback ASAP.
249 n => { drop(tcp); Err(UvError(n)) }
// Timer expiry: record ECANCELED in the context and wake the blocked task.
255 extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
256 // Don't close the corresponding tcp request, just wake up the task
257 // and let RAII take care of the pending watcher.
258 assert_eq!(status, 0);
259 let cx: &mut Ctx = unsafe {
260 &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
262 cx.status = uvll::ECANCELED;
263 wakeup(&mut cx.task);
// Connect completion: re-wraps the request, bails out on cancellation or a
// cleared context, stops any timer, and wakes the task if it is still waiting.
266 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
267 // This callback can be invoked with ECANCELED if the watcher is
268 // closed by the timeout callback. In that case we just want to free
269 // the request and be along our merry way.
270 let req = Request::wrap(req);
271 if status == uvll::ECANCELED { return }
273 // Apparently on windows when the handle is closed this callback may
274 // not be invoked with ECANCELED but rather another error code.
275 // Either way, if the data is null, then our timeout has expired
276 // and there's nothing we can do.
277 let data = unsafe { uvll::get_data_for_req(req.handle) };
278 if data.is_null() { return }
280 let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) };
283 Some(ref mut t) => t.stop(),
286 // Note that the timer callback doesn't cancel the connect request
287 // (that's the job of uv_close()), so it's possible for this
288 // callback to get triggered after the timeout callback fires, but
289 // before the task wakes up. In that case, we did indeed
290 // successfully connect, but we don't need to wake someone up. We
291 // updated the status above (correctly so), and the task will pick
292 // up on this when it wakes up.
293 if cx.task.is_some() {
294 wakeup(&mut cx.task);
// Exposes the watcher's `home` handle to the homing machinery.
300 impl HomingIO for TcpWatcher {
301 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
304 impl rtio::RtioSocket for TcpWatcher {
/// Returns the local address of this stream via the free function
/// `socket_name` (`Tcp` kind), after firing the homing missile.
305 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
306 let _m = self.fire_homing_missile();
307 socket_name(Tcp, self.handle)
// Stream operations for TcpWatcher. Every method first fires the homing
// missile; reads and writes additionally take the matching access guard.
311 impl rtio::RtioTcpStream for TcpWatcher {
/// Reads into `buf` while holding the read access guard; libuv errors are
/// mapped to `IoError`.
312 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
313 let m = self.fire_homing_missile();
314 let _g = self.read_access.grant(m);
315 self.stream.read(buf).map_err(uv_error_to_io_error)
/// Writes `buf` while holding the write access guard; libuv errors are
/// mapped to `IoError`.
318 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
319 let m = self.fire_homing_missile();
320 let _g = self.write_access.grant(m);
321 self.stream.write(buf).map_err(uv_error_to_io_error)
/// Returns the remote address via `socket_name` with the `TcpPeer` kind.
324 fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
325 let _m = self.fire_homing_missile();
326 socket_name(TcpPeer, self.handle)
/// Calls `uv_tcp_nodelay` with an enable flag of 0.
329 fn control_congestion(&mut self) -> Result<(), IoError> {
330 let _m = self.fire_homing_missile();
331 status_to_io_result(unsafe {
332 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
/// Calls `uv_tcp_nodelay` with an enable flag of 1.
336 fn nodelay(&mut self) -> Result<(), IoError> {
337 let _m = self.fire_homing_missile();
338 status_to_io_result(unsafe {
339 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
/// Calls `uv_tcp_keepalive` with an enable flag of 1 and the given delay.
343 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
344 let _m = self.fire_homing_missile();
345 status_to_io_result(unsafe {
346 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
347 delay_in_seconds as c_uint)
/// Calls `uv_tcp_keepalive` with an enable flag of 0 and a delay of 0.
351 fn letdie(&mut self) -> Result<(), IoError> {
352 let _m = self.fire_homing_missile();
353 status_to_io_result(unsafe {
354 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
/// Returns a boxed watcher over the same raw handle that shares this
/// watcher's home, refcount and access guards (each obtained via `clone()`).
358 fn clone(&self) -> ~rtio::RtioTcpStream:Send {
361 stream: StreamWatcher::new(self.handle),
362 home: self.home.clone(),
363 refcount: self.refcount.clone(),
364 write_access: self.write_access.clone(),
365 read_access: self.read_access.clone(),
366 } as ~rtio::RtioTcpStream:Send
/// Issues `uv_shutdown` on the stream and blocks until `shutdown_cb` wakes
/// the task; the status recorded in the context becomes the result. A
/// non-zero return from `uv_shutdown` itself is reported immediately.
369 fn close_write(&mut self) -> Result<(), IoError> {
371 slot: Option<BlockedTask>,
374 let mut req = Request::new(uvll::UV_SHUTDOWN);
376 return match unsafe {
377 uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
380 req.defuse(); // uv callback now owns this request
381 let mut cx = Ctx { slot: None, status: 0 };
383 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
387 status_to_io_result(cx.status)
389 n => Err(uv_error_to_io_error(UvError(n)))
// Shutdown completion: re-wraps the request, fetches the context stored on
// it, and wakes the blocked task.
392 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
393 let req = Request::wrap(req);
394 assert!(status != uvll::ECANCELED);
395 let cx: &mut Ctx = unsafe { req.get_data() };
397 wakeup(&mut cx.slot);
// The raw handle is taken from the embedded stream watcher.
402 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
403 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
// Fires the homing missile and decrements the shared refcount; the guarded
// branch runs only when `decrement()` returns true.
406 impl Drop for TcpWatcher {
408 let _m = self.fire_homing_missile();
409 if self.refcount.decrement() {
415 // TCP listeners (unbound servers)
/// Creates a TCP listener bound to `address`.
///
/// Allocates a `UV_TCP` handle, initializes it on the factory's loop with
/// `uv_tcp_init`, builds the boxed `TcpListener` (homed on
/// `io.make_handle()`) alongside a fresh channel, and then calls
/// `uv_tcp_bind`. A bind status of 0 returns the result of `l.install()`.
418 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
419 -> Result<~TcpListener, UvError> {
420 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
422 uvll::uv_tcp_init(io.uv_loop(), handle)
424 let (tx, rx) = channel();
425 let l = ~TcpListener {
426 home: io.make_handle(),
432 let (addr, _len) = addr_to_sockaddr(address);
434 let addr_p = &addr as *libc::sockaddr_storage;
435 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
438 0 => Ok(l.install()),
// Exposes the listener's `home` handle to the homing machinery.
444 impl HomingIO for TcpListener {
445 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
// Hands out the listener's raw handle as a `uv_tcp_t` pointer.
448 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
449 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
452 impl rtio::RtioSocket for TcpListener {
/// Returns the listener's local address via the free function
/// `socket_name` (`Tcp` kind), after firing the homing missile.
453 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
454 let _m = self.fire_homing_missile();
455 socket_name(Tcp, self.handle)
459 impl rtio::RtioTcpListener for TcpListener {
/// Consumes the listener, wraps it in a `TcpAcceptor`, and starts listening
/// with `uv_listen` (backlog 128, callback `listen_cb`). A status of 0
/// returns the acceptor; any other status becomes an `IoError`.
460 fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
461 // create the acceptor object from ourselves
462 let mut acceptor = ~TcpAcceptor { listener: self };
464 let _m = acceptor.fire_homing_missile();
465 // FIXME: the 128 backlog should be configurable
466 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
467 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
468 n => Err(uv_error_to_io_error(UvError(n))),
/// libuv connection callback for a listening server handle.
///
/// Recovers the `TcpListener` from the handle, builds a client watcher
/// homed like the listener, accepts the connection into it with
/// `uv_accept` (asserted to return 0), and sends the boxed stream, or the
/// converted error for a failing `status`, through `tcp.outgoing`.
473 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
474 assert!(status != uvll::ECANCELED);
475 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
476 let msg = match status {
// The client is created on the same loop the server handle belongs to.
478 let loop_ = Loop::wrap(unsafe {
479 uvll::get_loop_for_uv_handle(server)
481 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
482 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
483 Ok(~client as ~rtio::RtioTcpStream:Send)
485 n => Err(uv_error_to_io_error(UvError(n)))
487 tcp.outgoing.send(msg);
// Drop starts by firing the homing missile.
490 impl Drop for TcpListener {
492 let _m = self.fire_homing_missile();
497 // TCP acceptors (bound servers)
// The acceptor has no home of its own; it forwards to its listener's home.
499 impl HomingIO for TcpAcceptor {
500 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
503 impl rtio::RtioSocket for TcpAcceptor {
/// Returns the local address of the underlying listener handle via the
/// free function `socket_name` (`Tcp` kind).
504 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
505 let _m = self.fire_homing_missile();
506 socket_name(Tcp, self.listener.handle)
510 impl rtio::RtioTcpAcceptor for TcpAcceptor {
/// Receives the next message from the listener's `incoming` channel, which
/// is either an accepted stream or the accept error sent by `listen_cb`.
511 fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
512 self.listener.incoming.recv()
/// Calls `uv_tcp_simultaneous_accepts` on the listener handle with 1.
515 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
516 let _m = self.fire_homing_missile();
517 status_to_io_result(unsafe {
518 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
/// Calls `uv_tcp_simultaneous_accepts` on the listener handle with 0.
522 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
523 let _m = self.fire_homing_missile();
524 status_to_io_result(unsafe {
525 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
530 ////////////////////////////////////////////////////////////////////////////////
531 /// UDP implementation
532 ////////////////////////////////////////////////////////////////////////////////
/// A UDP socket backed by a libuv `uv_udp_t` handle, with the same kind of
/// access guards that `TcpWatcher` uses.
534 pub struct UdpWatcher {
535 handle: *uvll::uv_udp_t,
538 // See above for what these fields are
541 write_access: Access,
/// Creates a UDP watcher bound to `address`.
///
/// Allocates a `UV_UDP` handle, initializes it on the factory's loop with
/// `uv_udp_init`, and binds it with `uv_udp_bind` (flags `0u32`). A non-zero
/// bind status is returned as `UvError`.
545 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
546 -> Result<UdpWatcher, UvError> {
547 let udp = UdpWatcher {
548 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
549 home: io.make_handle(),
550 refcount: Refcount::new(),
551 read_access: Access::new(),
552 write_access: Access::new(),
555 uvll::uv_udp_init(io.uv_loop(), udp.handle)
557 let (addr, _len) = addr_to_sockaddr(address);
558 let result = unsafe {
559 let addr_p = &addr as *libc::sockaddr_storage;
560 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
562 return match result {
564 n => Err(UvError(n)),
// Hands out the watcher's raw `uv_udp_t` handle.
569 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
570 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
// Exposes the watcher's `home` handle to the homing machinery.
573 impl HomingIO for UdpWatcher {
574 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
577 impl rtio::RtioSocket for UdpWatcher {
/// Returns the socket's local address via the free function `socket_name`
/// (`Udp` kind), after firing the homing missile.
578 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
579 let _m = self.fire_homing_missile();
580 socket_name(Udp, self.handle)
// Datagram operations for UdpWatcher. Every method except `clone` first
// fires the homing missile; `recvfrom`/`sendto` additionally take the read
// or write access guard.
584 impl rtio::RtioUdpSocket for UdpWatcher {
/// Receives one datagram into `buf`, returning the byte count and the
/// sender's address.
///
/// Starts receiving with `uv_udp_recv_start`, stores the context on the
/// handle, and blocks until `recv_cb` records a result and wakes the task.
/// Error results are converted to `IoError`.
585 fn recvfrom(&mut self, buf: &mut [u8])
586 -> Result<(uint, ip::SocketAddr), IoError>
589 task: Option<BlockedTask>,
591 result: Option<(ssize_t, Option<ip::SocketAddr>)>,
593 let loop_ = self.uv_loop();
594 let m = self.fire_homing_missile();
595 let _g = self.read_access.grant(m);
597 let a = match unsafe {
598 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
// The caller's slice is offered to libuv exactly once through alloc_cb.
603 buf: Some(slice_to_uv_buf(buf)),
606 let handle = self.handle;
607 wait_until_woken_after(&mut cx.task, &loop_, || {
608 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
610 match cx.result.take_unwrap() {
612 Err(uv_error_to_io_error(UvError(n as c_int))),
613 (n, addr) => Ok((n as uint, addr.unwrap()))
616 n => Err(uv_error_to_io_error(UvError(n)))
// Allocation callback: hands the buffer stored in the context to libuv and
// fails via `expect` if that buffer has already been taken.
620 extern fn alloc_cb(handle: *uvll::uv_udp_t,
621 _suggested_size: size_t,
625 cast::transmute(uvll::get_data_for_uv_handle(handle));
626 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
// Receive callback: stops receiving, decodes the sender address when one
// was supplied, stores `(nread, addr)` in the context and wakes the task.
630 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
631 addr: *libc::sockaddr, _flags: c_uint) {
632 assert!(nread != uvll::ECANCELED as ssize_t);
633 let cx: &mut Ctx = unsafe {
634 cast::transmute(uvll::get_data_for_uv_handle(handle))
637 // When there's no data to read the recv callback can be a no-op.
638 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
639 // this we just drop back to kqueue and wait for the next callback.
// Put the buffer back so a later alloc_cb can hand it out again.
641 cx.buf = Some(unsafe { *buf });
646 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
649 let cx: &mut Ctx = unsafe {
650 cast::transmute(uvll::get_data_for_uv_handle(handle))
652 let addr = if addr == ptr::null() {
// The address is decoded assuming a full sockaddr_storage is readable.
655 let len = mem::size_of::<libc::sockaddr_storage>();
656 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
658 cx.result = Some((nread, addr));
659 wakeup(&mut cx.task);
/// Sends `buf` as one datagram to `dst`.
///
/// Issues `uv_udp_send` with `send_cb` and, once libuv has accepted the
/// request, blocks until the callback wakes the task. Non-zero statuses are
/// converted to `IoError`.
663 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
664 struct Ctx { task: Option<BlockedTask>, result: c_int }
666 let m = self.fire_homing_missile();
667 let loop_ = self.uv_loop();
668 let _g = self.write_access.grant(m);
670 let mut req = Request::new(uvll::UV_UDP_SEND);
671 let buf = slice_to_uv_buf(buf);
672 let (addr, _len) = addr_to_sockaddr(dst);
673 let result = unsafe {
674 let addr_p = &addr as *libc::sockaddr_storage;
675 uvll::uv_udp_send(req.handle, self.handle, [buf],
676 addr_p as *libc::sockaddr, send_cb)
679 return match result {
681 req.defuse(); // uv callback now owns this request
682 let mut cx = Ctx { task: None, result: 0 };
683 wait_until_woken_after(&mut cx.task, &loop_, || {
688 n => Err(uv_error_to_io_error(UvError(n)))
691 n => Err(uv_error_to_io_error(UvError(n)))
// Send completion: re-wraps the request, fetches the context stored on it,
// and wakes the blocked task.
694 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
695 let req = Request::wrap(req);
696 assert!(status != uvll::ECANCELED);
697 let cx: &mut Ctx = unsafe { req.get_data() };
699 wakeup(&mut cx.task);
/// Calls `uv_udp_set_membership` with the textual form of `multi` to
/// request membership of that multicast group.
703 fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
704 let _m = self.fire_homing_missile();
705 status_to_io_result(unsafe {
706 multi.to_str().with_c_str(|m_addr| {
707 uvll::uv_udp_set_membership(self.handle,
/// Calls `uv_udp_set_membership` with the textual form of `multi` and
/// `UV_LEAVE_GROUP`.
714 fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
715 let _m = self.fire_homing_missile();
716 status_to_io_result(unsafe {
717 multi.to_str().with_c_str(|m_addr| {
718 uvll::uv_udp_set_membership(self.handle,
720 uvll::UV_LEAVE_GROUP)
/// Updates the multicast-loop setting through `uv_udp_set_multicast_loop`.
725 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
726 let _m = self.fire_homing_missile();
727 status_to_io_result(unsafe {
728 uvll::uv_udp_set_multicast_loop(self.handle,
/// Updates the multicast-loop setting through `uv_udp_set_multicast_loop`.
733 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
734 let _m = self.fire_homing_missile();
735 status_to_io_result(unsafe {
736 uvll::uv_udp_set_multicast_loop(self.handle,
/// Sets the multicast TTL through `uv_udp_set_multicast_ttl`.
741 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
742 let _m = self.fire_homing_missile();
743 status_to_io_result(unsafe {
744 uvll::uv_udp_set_multicast_ttl(self.handle,
/// Sets the TTL through `uv_udp_set_ttl`, passing `ttl` as a `c_int`.
749 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
750 let _m = self.fire_homing_missile();
751 status_to_io_result(unsafe {
752 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
/// Updates the broadcast setting through `uv_udp_set_broadcast`.
756 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
757 let _m = self.fire_homing_missile();
758 status_to_io_result(unsafe {
759 uvll::uv_udp_set_broadcast(self.handle,
/// Updates the broadcast setting through `uv_udp_set_broadcast`.
764 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
765 let _m = self.fire_homing_missile();
766 status_to_io_result(unsafe {
767 uvll::uv_udp_set_broadcast(self.handle,
/// Returns a boxed watcher that shares this watcher's home, refcount and
/// access guards (each obtained via `clone()`).
772 fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
775 home: self.home.clone(),
776 refcount: self.refcount.clone(),
777 write_access: self.write_access.clone(),
778 read_access: self.read_access.clone(),
779 } as ~rtio::RtioUdpSocket:Send
// The guarded branch runs only when `refcount.decrement()` returns true.
783 impl Drop for UdpWatcher {
785 // Send ourselves home to close this handle (blocking while doing so).
786 let _m = self.fire_homing_missile();
787 if self.refcount.decrement() {
795 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
797 use std::io::test::{next_test_ip4, next_test_ip6};
799 use super::{UdpWatcher, TcpWatcher, TcpListener};
800 use super::super::local_loop;
// Connects to a fresh IPv4 test address with no timeout; an error result
// must be named "ECONNREFUSED".
803 fn connect_close_ip4() {
804 match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
806 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
// Connects to a fresh IPv6 test address with no timeout; an error result
// must be named "ECONNREFUSED".
811 fn connect_close_ip6() {
812 match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
814 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
// Binds a UDP watcher to a fresh IPv4 test address and inspects the result.
819 fn udp_bind_close_ip4() {
820 match UdpWatcher::bind(local_loop(), next_test_ip4()) {
// Binds a UDP watcher to a fresh IPv6 test address and inspects the result.
827 fn udp_bind_close_ip6() {
828 match UdpWatcher::bind(local_loop(), next_test_ip6()) {
836 let (tx, rx) = channel();
837 let addr = next_test_ip4();
840 let w = match TcpListener::bind(local_loop(), addr) {
841 Ok(w) => w, Err(e) => fail!("{:?}", e)
843 let mut w = match w.listen() {
844 Ok(w) => w, Err(e) => fail!("{:?}", e),
849 let mut buf = [0u8, ..10];
850 match stream.read(buf) {
851 Ok(10) => {} e => fail!("{:?}", e),
853 for i in range(0, 10u8) {
854 assert_eq!(buf[i as uint], i + 1);
857 Err(e) => fail!("{:?}", e)
862 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
863 Ok(w) => w, Err(e) => fail!("{:?}", e)
865 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
866 Ok(()) => {}, Err(e) => fail!("{:?}", e)
872 let (tx, rx) = channel();
873 let addr = next_test_ip6();
876 let w = match TcpListener::bind(local_loop(), addr) {
877 Ok(w) => w, Err(e) => fail!("{:?}", e)
879 let mut w = match w.listen() {
880 Ok(w) => w, Err(e) => fail!("{:?}", e),
885 let mut buf = [0u8, ..10];
886 match stream.read(buf) {
887 Ok(10) => {} e => fail!("{:?}", e),
889 for i in range(0, 10u8) {
890 assert_eq!(buf[i as uint], i + 1);
893 Err(e) => fail!("{:?}", e)
898 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
899 Ok(w) => w, Err(e) => fail!("{:?}", e)
901 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
902 Ok(()) => {}, Err(e) => fail!("{:?}", e)
908 let (tx, rx) = channel();
909 let client = next_test_ip4();
910 let server = next_test_ip4();
913 match UdpWatcher::bind(local_loop(), server) {
916 let mut buf = [0u8, ..10];
917 match w.recvfrom(buf) {
918 Ok((10, addr)) => assert_eq!(addr, client),
919 e => fail!("{:?}", e),
921 for i in range(0, 10u8) {
922 assert_eq!(buf[i as uint], i + 1);
925 Err(e) => fail!("{:?}", e)
930 let mut w = match UdpWatcher::bind(local_loop(), client) {
931 Ok(w) => w, Err(e) => fail!("{:?}", e)
933 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
934 Ok(()) => {}, Err(e) => fail!("{:?}", e)
940 let (tx, rx) = channel();
941 let client = next_test_ip6();
942 let server = next_test_ip6();
945 match UdpWatcher::bind(local_loop(), server) {
948 let mut buf = [0u8, ..10];
949 match w.recvfrom(buf) {
950 Ok((10, addr)) => assert_eq!(addr, client),
951 e => fail!("{:?}", e),
953 for i in range(0, 10u8) {
954 assert_eq!(buf[i as uint], i + 1);
957 Err(e) => fail!("{:?}", e)
962 let mut w = match UdpWatcher::bind(local_loop(), client) {
963 Ok(w) => w, Err(e) => fail!("{:?}", e)
965 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
966 Ok(()) => {}, Err(e) => fail!("{:?}", e)
// The accepted stream is written 2048-byte buffers of 1s until at least MAX
// bytes have gone out; the connecting side reads until it has seen MAX bytes
// and asserts that every byte read equals 1.
971 fn test_read_read_read() {
972 let addr = next_test_ip4();
973 static MAX: uint = 5000;
974 let (tx, rx) = channel();
977 let listener = TcpListener::bind(local_loop(), addr).unwrap();
978 let mut acceptor = listener.listen().unwrap();
980 let mut stream = acceptor.accept().unwrap();
981 let buf = [1, .. 2048];
982 let mut total_bytes_written = 0;
983 while total_bytes_written < MAX {
984 assert!(stream.write(buf).is_ok());
985 uvdebug!("wrote bytes");
986 total_bytes_written += buf.len();
991 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
992 let mut buf = [0, .. 2048];
993 let mut total_bytes_read = 0;
994 while total_bytes_read < MAX {
995 let nread = stream.read(buf).unwrap();
996 total_bytes_read += nread;
997 for i in range(0u, nread) {
998 assert_eq!(buf[i], 1);
1001 uvdebug!("read {} bytes total", total_bytes_read);
// The client sends two one-byte datagrams ([1], then [2]); the server
// receives twice and checks the lengths, the source address and the
// payloads in that order.
1005 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1006 fn test_udp_twice() {
1007 let server_addr = next_test_ip4();
1008 let client_addr = next_test_ip4();
1009 let (tx, rx) = channel();
1012 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1014 assert!(client.sendto([1], server_addr).is_ok());
1015 assert!(client.sendto([2], server_addr).is_ok());
1018 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1022 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1023 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1024 assert_eq!(nread1, 1);
1025 assert_eq!(nread2, 1);
1026 assert_eq!(src1, client_addr);
1027 assert_eq!(src2, client_addr);
1028 assert_eq!(buf1[0], 1);
1029 assert_eq!(buf2[0], 2);
// Ping-pong between a "server" and a "client", each with separate in and out
// UDP sockets. The server sends 2048-byte buffers of 1s and expects a
// one-byte datagram back from the client; the client keeps receiving until
// it has counted MAX bytes, verifying the payload and the source address,
// and finally sends [0] to tell the server it is done.
1033 fn test_udp_many_read() {
1034 let server_out_addr = next_test_ip4();
1035 let server_in_addr = next_test_ip4();
1036 let client_out_addr = next_test_ip4();
1037 let client_in_addr = next_test_ip4();
1038 static MAX: uint = 500_000;
1040 let (tx1, rx1) = channel::<()>();
1041 let (tx2, rx2) = channel::<()>();
1044 let l = local_loop();
1045 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1046 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1047 let (tx, rx) = (tx2, rx1);
1050 let msg = [1, .. 2048];
1051 let mut total_bytes_sent = 0;
1055 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1056 total_bytes_sent += msg.len();
1057 // check if the client has received enough
1058 let res = server_in.recvfrom(buf);
1059 assert!(res.is_ok());
1060 let (nread, src) = res.unwrap();
1061 assert_eq!(nread, 1);
1062 assert_eq!(src, client_out_addr);
1064 assert!(total_bytes_sent >= MAX);
1067 let l = local_loop();
1068 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1069 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1070 let (tx, rx) = (tx1, rx2);
1073 let mut total_bytes_recv = 0;
1074 let mut buf = [0, .. 2048];
1075 while total_bytes_recv < MAX {
1077 assert!(client_out.sendto([1], server_in_addr).is_ok());
1079 let res = client_in.recvfrom(buf);
1080 assert!(res.is_ok());
1081 let (nread, src) = res.unwrap();
1082 assert_eq!(src, server_out_addr);
1083 total_bytes_recv += nread;
1084 for i in range(0u, nread) {
1085 assert_eq!(buf[i], 1);
1088 // tell the server we're done
1089 assert!(client_out.sendto([0], server_in_addr).is_ok());
// The connecting side writes the byte sequence 0..7 four times; the
// accepting side reads until `expected` bytes have arrived, checking that
// each byte equals its running position modulo 8.
1093 fn test_read_and_block() {
1094 let addr = next_test_ip4();
1095 let (tx, rx) = channel::<Receiver<()>>();
1099 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1100 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1101 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1103 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1104 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1108 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1109 let mut acceptor = listener.listen().unwrap();
1110 let (tx2, rx2) = channel();
1112 let mut stream = acceptor.accept().unwrap();
1113 let mut buf = [0, .. 2048];
1116 let mut current = 0;
1119 while current < expected {
1120 let nread = stream.read(buf).unwrap();
1121 for i in range(0u, nread) {
1122 let val = buf[i] as uint;
1123 assert_eq!(val, current % 8);
1128 let _ = tx2.send_opt(());
1131 // Make sure we had multiple reads
// The server accepts one connection and expects a single 8-byte read whose
// bytes are 0..7; the client retries `connect` until it succeeds and then
// writes those 8 bytes.
1136 fn test_simple_tcp_server_and_client_on_diff_threads() {
1137 let addr = next_test_ip4();
1140 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1141 let mut acceptor = listener.listen().unwrap();
1142 let mut stream = acceptor.accept().unwrap();
1143 let mut buf = [0, .. 2048];
1144 let nread = stream.read(buf).unwrap();
1145 assert_eq!(nread, 8);
1146 for i in range(0u, nread) {
1147 assert_eq!(buf[i], i as u8);
1151 let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1152 while stream.is_err() {
1153 stream = TcpWatcher::connect(local_loop(), addr, None);
1155 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
// Expected-failure test: binds a listener and starts listening, keeping the
// acceptor alive in `_w`.
1158 #[should_fail] #[test]
1159 fn tcp_listener_fail_cleanup() {
1160 let addr = next_test_ip4();
1161 let w = TcpListener::bind(local_loop(), addr).unwrap();
1162 let _w = w.listen().unwrap();
// Expected-failure test: a listener accepts and immediately drops one
// connection while a connected client stream is kept alive in `_w`.
1166 #[should_fail] #[test]
1167 fn tcp_stream_fail_cleanup() {
1168 let (tx, rx) = channel();
1169 let addr = next_test_ip4();
1172 let w = TcpListener::bind(local_loop(), addr).unwrap();
1173 let mut w = w.listen().unwrap();
1175 drop(w.accept().unwrap());
1178 let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
// Expected-failure test: binds a UDP watcher and keeps it alive in `_w`.
1182 #[should_fail] #[test]
1183 fn udp_listener_fail_cleanup() {
1184 let addr = next_test_ip4();
1185 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1189 #[should_fail] #[test]
1190 fn udp_fail_other_task() {
1191 let addr = next_test_ip4();
1192 let (tx, rx) = channel();
1194 // force the handle to be created on a different scheduler, failure in
1195 // the original task will force a homing operation back to this
1198 let w = UdpWatcher::bind(local_loop(), addr).unwrap();