1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
14 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
19 use std::rt::task::BlockedTask;
22 use homing::{HomingIO, HomeHandle};
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27 wait_until_woken_after, wakeup};
28 use uvio::UvIoFactory;
31 ////////////////////////////////////////////////////////////////////////////////
32 /// Generic helpers for converting between Rust socket addresses and C sockaddr structures
33 ////////////////////////////////////////////////////////////////////////////////
35 pub fn htons(u: u16) -> u16 { mem::to_be16(u as i16) as u16 }
36 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u as i16) as u16 }
/// Converts a C `sockaddr_storage` into an `ip::SocketAddr`.
///
/// `len` is the number of valid bytes behind `storage`; it is asserted to be
/// at least the size of the concrete sockaddr type the storage is
/// reinterpreted as. The task fails with "unknown family" when the address
/// family is not one of the handled ones.
38 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
39 len: uint) -> ip::SocketAddr {
40 match storage.ss_family as c_int {
// IPv4 case: view the generic storage as a `sockaddr_in`.
42 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
43 let storage: &libc::sockaddr_in = unsafe {
44 cast::transmute(storage)
// Split the 32-bit address into four octets, least significant byte first.
// NOTE(review): using the low byte as the first octet presumably relies on
// a little-endian host -- confirm behaviour on big-endian targets.
46 let addr = storage.sin_addr.s_addr as u32;
47 let a = (addr >> 0) as u8;
48 let b = (addr >> 8) as u8;
49 let c = (addr >> 16) as u8;
50 let d = (addr >> 24) as u8;
52 ip: ip::Ipv4Addr(a, b, c, d),
// The port is converted back to host byte order.
53 port: ntohs(storage.sin_port),
// IPv6 case: view the generic storage as a `sockaddr_in6`.
57 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
58 let storage: &libc::sockaddr_in6 = unsafe {
59 cast::transmute(storage)
// Each of the eight 16-bit groups is converted to host byte order.
61 let a = ntohs(storage.sin6_addr.s6_addr[0]);
62 let b = ntohs(storage.sin6_addr.s6_addr[1]);
63 let c = ntohs(storage.sin6_addr.s6_addr[2]);
64 let d = ntohs(storage.sin6_addr.s6_addr[3]);
65 let e = ntohs(storage.sin6_addr.s6_addr[4]);
66 let f = ntohs(storage.sin6_addr.s6_addr[5]);
67 let g = ntohs(storage.sin6_addr.s6_addr[6]);
68 let h = ntohs(storage.sin6_addr.s6_addr[7]);
70 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
71 port: ntohs(storage.sin6_port),
// Any other address family is treated as a fatal error.
75 fail!("unknown family {}", n);
/// Converts an `ip::SocketAddr` into a C `sockaddr_storage`.
///
/// Returns the storage together with the size in bytes of the concrete
/// sockaddr type written into it: `sockaddr_in` for IPv4 addresses and
/// `sockaddr_in6` for IPv6 addresses.
80 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
// The storage starts out as whatever `mem::init()` produces and is then
// filled in through a reference of the family-specific type.
82 let mut storage: libc::sockaddr_storage = mem::init();
83 let len = match addr.ip {
84 ip::Ipv4Addr(a, b, c, d) => {
85 let storage: &mut libc::sockaddr_in =
86 cast::transmute(&mut storage);
87 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
// The port is stored in big-endian (network) byte order.
88 (*storage).sin_port = htons(addr.port);
89 (*storage).sin_addr = libc::in_addr {
// The last octet `d` ends up in the most significant byte of `s_addr`.
90 s_addr: (d as u32 << 24) |
95 mem::size_of::<libc::sockaddr_in>()
97 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
98 let storage: &mut libc::sockaddr_in6 =
99 cast::transmute(&mut storage);
100 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
101 storage.sin6_port = htons(addr.port);
102 storage.sin6_addr = libc::in6_addr {
114 mem::size_of::<libc::sockaddr_in6>()
117 return (storage, len);
/// Selects which libuv name-query function `socket_name` calls.
121 enum SocketNameKind {
/// Queries libuv for an address associated with `handle`.
///
/// `sk` picks the query: `TcpPeer` uses `uv_tcp_getpeername`, `Tcp` uses
/// `uv_tcp_getsockname`, and `Udp` uses `uv_udp_getsockname`. A zero status
/// is converted into a `SocketAddr`; any other status becomes an `IoError`.
127 fn socket_name(sk: SocketNameKind,
128 handle: *c_void) -> Result<ip::SocketAddr, IoError> {
// Note that this local keeps the name `getsockname` even when it holds the
// peer-name function.
129 let getsockname = match sk {
130 TcpPeer => uvll::uv_tcp_getpeername,
131 Tcp => uvll::uv_tcp_getsockname,
132 Udp => uvll::uv_udp_getsockname,
135 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
// `namelen` starts as the full size of the buffer and is passed by mutable
// reference (presumably so libuv can report the actual length -- confirm).
137 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
139 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
141 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
143 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
144 n => Err(uv_error_to_io_error(UvError(n)))
148 ////////////////////////////////////////////////////////////////////////////////
149 /// TCP implementation
150 ////////////////////////////////////////////////////////////////////////////////
/// A TCP stream backed by a libuv TCP handle.
152 pub struct TcpWatcher {
// Raw libuv TCP handle.
153 handle: *uvll::uv_tcp_t,
// Stream wrapper through which `read` and `write` are performed.
154 stream: StreamWatcher,
158 // libuv cannot service two reads, or two writes, on the same stream object
159 // at once, so these access guards arbitrate among concurrent readers and
160 // among concurrent writers. Note that libuv *can* read and write
161 // simultaneously; it just can't perform two reads (or two writes) at once.
163 write_access: Access,
166 pub struct TcpListener {
168 handle: *uvll::uv_pipe_t,
169 closing_task: Option<BlockedTask>,
170 outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
171 incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
/// The accepting side of a TCP server, created by `TcpListener::listen`.
174 pub struct TcpAcceptor {
// The listener this acceptor was built from; accepted streams are received
// through its `incoming` channel.
175 listener: ~TcpListener,
178 // TCP watchers (clients/streams)
/// Creates a TCP watcher on the factory's event loop, using a home handle
/// freshly made by `io.make_handle()`.
181 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
182 let handle = io.make_handle();
183 TcpWatcher::new_home(&io.loop_, handle)
/// Allocates a libuv TCP handle, initializes it on `loop_`, and wraps it in a
/// `TcpWatcher`.
186 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
187 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
189 uvll::uv_tcp_init(loop_.handle, handle)
// The stream wrapper is built over the same raw handle; the refcount and
// both access guards start out fresh.
194 stream: StreamWatcher::new(handle),
195 refcount: Refcount::new(),
196 read_access: Access::new(),
197 write_access: Access::new(),
/// Opens a TCP connection to `address`.
///
/// Issues a libuv connect request and waits until `connect_cb` wakes the
/// blocked task. A non-zero status from `uv_tcp_connect` itself is returned
/// as a `UvError`.
201 pub fn connect(io: &mut UvIoFactory, address: ip::SocketAddr)
202 -> Result<TcpWatcher, UvError>
// State shared with `connect_cb`: the completion status and the task to wake.
204 struct Ctx { status: c_int, task: Option<BlockedTask> }
206 let tcp = TcpWatcher::new(io);
207 let (addr, _len) = addr_to_sockaddr(address);
208 let mut req = Request::new(uvll::UV_CONNECT);
209 let result = unsafe {
210 let addr_p = &addr as *libc::sockaddr_storage;
211 uvll::uv_tcp_connect(req.handle, tcp.handle,
212 addr_p as *libc::sockaddr,
215 return match result {
217 req.defuse(); // uv callback now owns this request
218 let mut cx = Ctx { status: 0, task: None };
219 wait_until_woken_after(&mut cx.task, &io.loop_, || {
224 n => Err(UvError(n)),
// Completion callback for the connect request: recovers the `Ctx` attached
// to the request and wakes the waiting task. Cancellation is not expected.
230 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
231 let req = Request::wrap(req);
232 assert!(status != uvll::ECANCELED);
233 let cx: &mut Ctx = unsafe { req.get_data() };
235 wakeup(&mut cx.task);
// Exposes the watcher's home handle to the homing machinery.
240 impl HomingIO for TcpWatcher {
241 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
244 impl rtio::RtioSocket for TcpWatcher {
/// Returns the address reported by `uv_tcp_getsockname` for this stream,
/// after homing.
245 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
246 let _m = self.fire_homing_missile();
247 socket_name(Tcp, self.handle)
// Stream operations for a TCP watcher. Except where noted, each method first
// fires a homing missile and then calls into libuv.
251 impl rtio::RtioTcpStream for TcpWatcher {
/// Reads into `buf` through the underlying stream while holding the read
/// access guard; libuv errors are converted to `IoError`.
252 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
253 let m = self.fire_homing_missile();
254 let _g = self.read_access.grant(m);
255 self.stream.read(buf).map_err(uv_error_to_io_error)
/// Writes `buf` through the underlying stream while holding the write
/// access guard; libuv errors are converted to `IoError`.
258 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
259 let m = self.fire_homing_missile();
260 let _g = self.write_access.grant(m);
261 self.stream.write(buf).map_err(uv_error_to_io_error)
/// Returns the address reported by `uv_tcp_getpeername`.
264 fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
265 let _m = self.fire_homing_missile();
266 socket_name(TcpPeer, self.handle)
/// Calls `uv_tcp_nodelay` with 0, i.e. the opposite setting of `nodelay`.
269 fn control_congestion(&mut self) -> Result<(), IoError> {
270 let _m = self.fire_homing_missile();
271 status_to_io_result(unsafe {
272 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
/// Calls `uv_tcp_nodelay` with 1.
276 fn nodelay(&mut self) -> Result<(), IoError> {
277 let _m = self.fire_homing_missile();
278 status_to_io_result(unsafe {
279 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
/// Calls `uv_tcp_keepalive` with the enable flag set to 1 and the given
/// delay in seconds.
283 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
284 let _m = self.fire_homing_missile();
285 status_to_io_result(unsafe {
286 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
287 delay_in_seconds as c_uint)
/// Calls `uv_tcp_keepalive` with the enable flag set to 0.
291 fn letdie(&mut self) -> Result<(), IoError> {
292 let _m = self.fire_homing_missile();
293 status_to_io_result(unsafe {
294 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
/// Produces another watcher over the same raw handle: a new stream wrapper is
/// built, and the home, refcount, and both access guards are cloned.
298 fn clone(&self) -> ~rtio::RtioTcpStream:Send {
301 stream: StreamWatcher::new(self.handle),
302 home: self.home.clone(),
303 refcount: self.refcount.clone(),
304 write_access: self.write_access.clone(),
305 read_access: self.read_access.clone(),
306 } as ~rtio::RtioTcpStream:Send
/// Issues a libuv shutdown request on the handle and waits until
/// `shutdown_cb` wakes the blocked task, then reports the recorded status.
// NOTE(review): unlike the other libuv-calling methods in this impl, no
// homing missile is fired here before the handle is used -- confirm this is
// intentional.
309 fn close_write(&mut self) -> Result<(), IoError> {
311 slot: Option<BlockedTask>,
314 let mut req = Request::new(uvll::UV_SHUTDOWN);
316 return match unsafe {
317 uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
320 req.defuse(); // uv callback now owns this request
321 let mut cx = Ctx { slot: None, status: 0 };
323 wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
327 status_to_io_result(cx.status)
329 n => Err(uv_error_to_io_error(UvError(n)))
// Completion callback for the shutdown request: recovers the `Ctx` attached
// to the request and wakes the waiting task. Cancellation is not expected.
332 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
333 let req = Request::wrap(req);
334 assert!(status != uvll::ECANCELED);
335 let cx: &mut Ctx = unsafe { req.get_data() };
337 wakeup(&mut cx.slot);
// The raw handle is taken from the stream wrapper rather than from the
// watcher's own `handle` field.
342 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
343 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
// On drop the watcher goes home and decrements the refcount it shares with
// its clones, acting on the result of `decrement()`.
// NOTE(review): presumably the handle is closed only when the last clone is
// dropped -- confirm.
346 impl Drop for TcpWatcher {
348 let _m = self.fire_homing_missile();
349 if self.refcount.decrement() {
355 // TCP listeners (sockets bound to an address but not yet listening)
/// Creates a TCP handle on the factory's event loop and binds it to
/// `address`.
///
/// When `uv_tcp_bind` reports a zero status, the listener is returned via
/// `install()`.
358 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
359 -> Result<~TcpListener, UvError> {
360 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
362 uvll::uv_tcp_init(io.uv_loop(), handle)
// Channel over which `listen_cb` hands accepted connections to `accept`.
364 let (tx, rx) = channel();
365 let l = ~TcpListener {
366 home: io.make_handle(),
372 let (addr, _len) = addr_to_sockaddr(address);
374 let addr_p = &addr as *libc::sockaddr_storage;
375 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
378 0 => Ok(l.install()),
// Exposes the listener's home handle to the homing machinery.
384 impl HomingIO for TcpListener {
385 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
// Returns the listener's raw handle as a libuv TCP handle.
388 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
389 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
392 impl rtio::RtioSocket for TcpListener {
/// Returns the address reported by `uv_tcp_getsockname` for this listener,
/// after homing.
393 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
394 let _m = self.fire_homing_missile();
395 socket_name(Tcp, self.handle)
399 impl rtio::RtioTcpListener for TcpListener {
/// Consumes the listener, wraps it in a `TcpAcceptor`, and starts listening
/// with `uv_listen` using `listen_cb` as the connection callback. A non-zero
/// status is returned as an `IoError`.
400 fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
401 // create the acceptor object from ourselves
402 let mut acceptor = ~TcpAcceptor { listener: self };
404 let _m = acceptor.fire_homing_missile();
405 // FIXME: the 128 backlog should be configurable
406 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
407 0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
408 n => Err(uv_error_to_io_error(UvError(n))),
/// libuv connection callback for a listening TCP handle.
///
/// Builds a client `TcpWatcher` homed where the listener is homed, accepts
/// the pending connection into it (asserting that `uv_accept` succeeds), and
/// sends the resulting stream, or the error for a non-zero `status`, over
/// the listener's `outgoing` channel.
413 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
414 assert!(status != uvll::ECANCELED);
// Recover the `TcpListener` associated with the libuv handle.
415 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
416 let msg = match status {
418 let loop_ = Loop::wrap(unsafe {
419 uvll::get_loop_for_uv_handle(server)
421 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
422 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
423 Ok(~client as ~rtio::RtioTcpStream:Send)
425 n => Err(uv_error_to_io_error(UvError(n)))
427 tcp.outgoing.send(msg);
// On drop the listener first goes home before doing its cleanup.
430 impl Drop for TcpListener {
432 let _m = self.fire_homing_missile();
437 // TCP acceptors (bound sockets that are listening for connections)
// The acceptor has no home of its own; it delegates to its listener's home.
439 impl HomingIO for TcpAcceptor {
440 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
443 impl rtio::RtioSocket for TcpAcceptor {
/// Returns the address reported by `uv_tcp_getsockname` for the underlying
/// listener handle, after homing.
444 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
445 let _m = self.fire_homing_missile();
446 socket_name(Tcp, self.listener.handle)
450 impl rtio::RtioTcpAcceptor for TcpAcceptor {
/// Receives the next message that `listen_cb` sent on the listener's channel:
/// either an accepted stream or the error reported for that connection.
451 fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
452 self.listener.incoming.recv()
/// Calls `uv_tcp_simultaneous_accepts` with 1 on the listener handle.
455 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
456 let _m = self.fire_homing_missile();
457 status_to_io_result(unsafe {
458 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
/// Calls `uv_tcp_simultaneous_accepts` with 0 on the listener handle.
462 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
463 let _m = self.fire_homing_missile();
464 status_to_io_result(unsafe {
465 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
470 ////////////////////////////////////////////////////////////////////////////////
471 /// UDP implementation
472 ////////////////////////////////////////////////////////////////////////////////
/// A UDP socket backed by a libuv UDP handle.
474 pub struct UdpWatcher {
// Raw libuv UDP handle.
475 handle: *uvll::uv_udp_t,
478 // See the comment on TcpWatcher's access guards for what these fields are
481 write_access: Access,
/// Creates a UDP handle on the factory's event loop and binds it to
/// `address`.
///
/// A non-zero status from `uv_udp_bind` is returned as a `UvError`.
485 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
486 -> Result<UdpWatcher, UvError> {
// The watcher is built first so that its handle can be initialized and bound.
487 let udp = UdpWatcher {
488 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
489 home: io.make_handle(),
490 refcount: Refcount::new(),
491 read_access: Access::new(),
492 write_access: Access::new(),
495 uvll::uv_udp_init(io.uv_loop(), udp.handle)
497 let (addr, _len) = addr_to_sockaddr(address);
498 let result = unsafe {
499 let addr_p = &addr as *libc::sockaddr_storage;
// The bind is requested with a flags argument of 0.
500 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
502 return match result {
504 n => Err(UvError(n)),
// Returns the watcher's raw libuv UDP handle.
509 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
510 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
// Exposes the watcher's home handle to the homing machinery.
513 impl HomingIO for UdpWatcher {
514 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
517 impl rtio::RtioSocket for UdpWatcher {
/// Returns the address reported by `uv_udp_getsockname` for this socket,
/// after homing.
518 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
519 let _m = self.fire_homing_missile();
520 socket_name(Udp, self.handle)
// Datagram operations for a UDP watcher. Every method except `clone` fires a
// homing missile before calling into libuv.
524 impl rtio::RtioUdpSocket for UdpWatcher {
/// Receives one datagram into `buf`, returning the byte count and the
/// sender's address.
///
/// Starts a libuv receive with `alloc_cb`/`recv_cb`, attaches a context to
/// the handle, and waits until `recv_cb` wakes the blocked task. The read
/// access guard is held for the duration.
525 fn recvfrom(&mut self, buf: &mut [u8])
526 -> Result<(uint, ip::SocketAddr), IoError>
// Context fields shared with the callbacks: the task to wake and the
// receive result (byte count or status, plus optional source address).
529 task: Option<BlockedTask>,
531 result: Option<(ssize_t, Option<ip::SocketAddr>)>,
533 let loop_ = self.uv_loop();
534 let m = self.fire_homing_missile();
535 let _g = self.read_access.grant(m);
537 let a = match unsafe {
538 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
// The caller's slice is what `alloc_cb` hands to libuv as the read buffer.
543 buf: Some(slice_to_uv_buf(buf)),
546 let handle = self.handle;
547 wait_until_woken_after(&mut cx.task, &loop_, || {
// Make the context reachable from the callbacks through the handle's data.
548 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
550 match cx.result.take_unwrap() {
552 Err(uv_error_to_io_error(UvError(n as c_int))),
553 (n, addr) => Ok((n as uint, addr.unwrap()))
556 n => Err(uv_error_to_io_error(UvError(n)))
// Allocation callback: gives libuv the buffer stored in the context. The
// buffer is taken out of the context, so a second call without it being put
// back fails with the message below.
560 extern fn alloc_cb(handle: *uvll::uv_udp_t,
561 _suggested_size: size_t,
565 cast::transmute(uvll::get_data_for_uv_handle(handle));
566 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
// Receive callback: either puts the buffer back and keeps waiting, or stops
// receiving, records the result and source address, and wakes the task.
570 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
571 addr: *libc::sockaddr, _flags: c_uint) {
572 assert!(nread != uvll::ECANCELED as ssize_t);
573 let cx: &mut Ctx = unsafe {
574 cast::transmute(uvll::get_data_for_uv_handle(handle))
577 // When there's no data to read the recv callback can be a no-op.
578 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
579 // this we just drop back to the event loop and wait for the next callback.
// Return the buffer to the context so the next `alloc_cb` can hand it out.
581 cx.buf = Some(unsafe { *buf });
586 assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
// The context is looked up from the handle a second time here.
589 let cx: &mut Ctx = unsafe {
590 cast::transmute(uvll::get_data_for_uv_handle(handle))
592 let addr = if addr == ptr::null() {
// NOTE(review): the full size of `sockaddr_storage` is passed as the length
// even though `addr` is a `*libc::sockaddr` -- confirm libuv guarantees a
// buffer of that size.
595 let len = mem::size_of::<libc::sockaddr_storage>();
596 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
598 cx.result = Some((nread, addr));
599 wakeup(&mut cx.task);
/// Sends `buf` as one datagram to `dst`.
///
/// Issues a libuv send request and waits until `send_cb` wakes the blocked
/// task. The write access guard is held for the duration.
603 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
// State shared with `send_cb`: the task to wake and the send status.
604 struct Ctx { task: Option<BlockedTask>, result: c_int }
606 let m = self.fire_homing_missile();
607 let loop_ = self.uv_loop();
608 let _g = self.write_access.grant(m);
610 let mut req = Request::new(uvll::UV_UDP_SEND);
611 let buf = slice_to_uv_buf(buf);
612 let (addr, _len) = addr_to_sockaddr(dst);
613 let result = unsafe {
614 let addr_p = &addr as *libc::sockaddr_storage;
615 uvll::uv_udp_send(req.handle, self.handle, [buf],
616 addr_p as *libc::sockaddr, send_cb)
619 return match result {
621 req.defuse(); // uv callback now owns this request
622 let mut cx = Ctx { task: None, result: 0 };
623 wait_until_woken_after(&mut cx.task, &loop_, || {
628 n => Err(uv_error_to_io_error(UvError(n)))
631 n => Err(uv_error_to_io_error(UvError(n)))
// Completion callback for the send request: recovers the `Ctx` attached to
// the request and wakes the waiting task. Cancellation is not expected.
634 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
635 let req = Request::wrap(req);
636 assert!(status != uvll::ECANCELED);
637 let cx: &mut Ctx = unsafe { req.get_data() };
639 wakeup(&mut cx.task);
/// Calls `uv_udp_set_membership` for the group `multi`, passed to libuv in
/// its textual form as a C string.
643 fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
644 let _m = self.fire_homing_missile();
645 status_to_io_result(unsafe {
646 multi.to_str().with_c_str(|m_addr| {
647 uvll::uv_udp_set_membership(self.handle,
/// Calls `uv_udp_set_membership` with `UV_LEAVE_GROUP` for the group `multi`,
/// passed to libuv in its textual form as a C string.
654 fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
655 let _m = self.fire_homing_missile();
656 status_to_io_result(unsafe {
657 multi.to_str().with_c_str(|m_addr| {
658 uvll::uv_udp_set_membership(self.handle,
660 uvll::UV_LEAVE_GROUP)
/// Calls `uv_udp_set_multicast_loop` on the handle (presumably turning
/// multicast loopback on -- confirm the flag value).
665 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
666 let _m = self.fire_homing_missile();
667 status_to_io_result(unsafe {
668 uvll::uv_udp_set_multicast_loop(self.handle,
/// Calls `uv_udp_set_multicast_loop` on the handle (presumably turning
/// multicast loopback off -- confirm the flag value).
673 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
674 let _m = self.fire_homing_missile();
675 status_to_io_result(unsafe {
676 uvll::uv_udp_set_multicast_loop(self.handle,
/// Calls `uv_udp_set_multicast_ttl` on the handle.
681 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
682 let _m = self.fire_homing_missile();
683 status_to_io_result(unsafe {
684 uvll::uv_udp_set_multicast_ttl(self.handle,
/// Calls `uv_udp_set_ttl` with `ttl` cast to a C int.
689 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
690 let _m = self.fire_homing_missile();
691 status_to_io_result(unsafe {
692 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
/// Calls `uv_udp_set_broadcast` on the handle (presumably enabling
/// broadcast -- confirm the flag value).
696 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
697 let _m = self.fire_homing_missile();
698 status_to_io_result(unsafe {
699 uvll::uv_udp_set_broadcast(self.handle,
/// Calls `uv_udp_set_broadcast` on the handle (presumably disabling
/// broadcast -- confirm the flag value).
704 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
705 let _m = self.fire_homing_missile();
706 status_to_io_result(unsafe {
707 uvll::uv_udp_set_broadcast(self.handle,
/// Produces another watcher whose home, refcount, and both access guards are
/// clones of this one's.
712 fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
715 home: self.home.clone(),
716 refcount: self.refcount.clone(),
717 write_access: self.write_access.clone(),
718 read_access: self.read_access.clone(),
719 } as ~rtio::RtioUdpSocket:Send
// On drop the watcher goes home and decrements the refcount it shares with
// its clones, acting on the result of `decrement()`.
723 impl Drop for UdpWatcher {
725 // Send ourselves home to close this handle (blocking while doing so).
726 let _m = self.fire_homing_missile();
727 if self.refcount.decrement() {
735 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
737 use std::io::test::{next_test_ip4, next_test_ip6};
739 use super::{UdpWatcher, TcpWatcher, TcpListener};
740 use super::super::local_loop;
// Connects to an IPv4 test address; if the connection attempt fails, the
// error must be ECONNREFUSED.
743 fn connect_close_ip4() {
744 match TcpWatcher::connect(local_loop(), next_test_ip4()) {
746 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
// Connects to an IPv6 test address; if the connection attempt fails, the
// error must be ECONNREFUSED.
751 fn connect_close_ip6() {
752 match TcpWatcher::connect(local_loop(), next_test_ip6()) {
754 Err(e) => assert_eq!(e.name(), ~"ECONNREFUSED"),
// Exercises `UdpWatcher::bind` on an IPv4 test address.
759 fn udp_bind_close_ip4() {
760 match UdpWatcher::bind(local_loop(), next_test_ip4()) {
// Exercises `UdpWatcher::bind` on an IPv6 test address.
767 fn udp_bind_close_ip6() {
768 match UdpWatcher::bind(local_loop(), next_test_ip6()) {
776 let (tx, rx) = channel();
777 let addr = next_test_ip4();
780 let w = match TcpListener::bind(local_loop(), addr) {
781 Ok(w) => w, Err(e) => fail!("{:?}", e)
783 let mut w = match w.listen() {
784 Ok(w) => w, Err(e) => fail!("{:?}", e),
789 let mut buf = [0u8, ..10];
790 match stream.read(buf) {
791 Ok(10) => {} e => fail!("{:?}", e),
793 for i in range(0, 10u8) {
794 assert_eq!(buf[i as uint], i + 1);
797 Err(e) => fail!("{:?}", e)
802 let mut w = match TcpWatcher::connect(local_loop(), addr) {
803 Ok(w) => w, Err(e) => fail!("{:?}", e)
805 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
806 Ok(()) => {}, Err(e) => fail!("{:?}", e)
812 let (tx, rx) = channel();
813 let addr = next_test_ip6();
816 let w = match TcpListener::bind(local_loop(), addr) {
817 Ok(w) => w, Err(e) => fail!("{:?}", e)
819 let mut w = match w.listen() {
820 Ok(w) => w, Err(e) => fail!("{:?}", e),
825 let mut buf = [0u8, ..10];
826 match stream.read(buf) {
827 Ok(10) => {} e => fail!("{:?}", e),
829 for i in range(0, 10u8) {
830 assert_eq!(buf[i as uint], i + 1);
833 Err(e) => fail!("{:?}", e)
838 let mut w = match TcpWatcher::connect(local_loop(), addr) {
839 Ok(w) => w, Err(e) => fail!("{:?}", e)
841 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
842 Ok(()) => {}, Err(e) => fail!("{:?}", e)
848 let (tx, rx) = channel();
849 let client = next_test_ip4();
850 let server = next_test_ip4();
853 match UdpWatcher::bind(local_loop(), server) {
856 let mut buf = [0u8, ..10];
857 match w.recvfrom(buf) {
858 Ok((10, addr)) => assert_eq!(addr, client),
859 e => fail!("{:?}", e),
861 for i in range(0, 10u8) {
862 assert_eq!(buf[i as uint], i + 1);
865 Err(e) => fail!("{:?}", e)
870 let mut w = match UdpWatcher::bind(local_loop(), client) {
871 Ok(w) => w, Err(e) => fail!("{:?}", e)
873 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
874 Ok(()) => {}, Err(e) => fail!("{:?}", e)
880 let (tx, rx) = channel();
881 let client = next_test_ip6();
882 let server = next_test_ip6();
885 match UdpWatcher::bind(local_loop(), server) {
888 let mut buf = [0u8, ..10];
889 match w.recvfrom(buf) {
890 Ok((10, addr)) => assert_eq!(addr, client),
891 e => fail!("{:?}", e),
893 for i in range(0, 10u8) {
894 assert_eq!(buf[i as uint], i + 1);
897 Err(e) => fail!("{:?}", e)
902 let mut w = match UdpWatcher::bind(local_loop(), client) {
903 Ok(w) => w, Err(e) => fail!("{:?}", e)
905 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
906 Ok(()) => {}, Err(e) => fail!("{:?}", e)
// Streams at least MAX bytes of value 1 over TCP in 2048-byte writes and
// checks on the reading side that every byte received equals 1.
911 fn test_read_read_read() {
912 let addr = next_test_ip4();
913 static MAX: uint = 5000;
914 let (tx, rx) = channel();
// Server side: accept one connection and keep writing until MAX is reached.
917 let listener = TcpListener::bind(local_loop(), addr).unwrap();
918 let mut acceptor = listener.listen().unwrap();
920 let mut stream = acceptor.accept().unwrap();
921 let buf = [1, .. 2048];
922 let mut total_bytes_written = 0;
923 while total_bytes_written < MAX {
924 assert!(stream.write(buf).is_ok());
925 uvdebug!("wrote bytes");
926 total_bytes_written += buf.len();
// Client side: read until MAX bytes have arrived, verifying the contents.
931 let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
932 let mut buf = [0, .. 2048];
933 let mut total_bytes_read = 0;
934 while total_bytes_read < MAX {
935 let nread = stream.read(buf).unwrap();
936 total_bytes_read += nread;
937 for i in range(0u, nread) {
938 assert_eq!(buf[i], 1);
941 uvdebug!("read {} bytes total", total_bytes_read);
// Sends two one-byte datagrams from a client socket and checks that the
// server receives them in order, each from the client's address.
945 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
946 fn test_udp_twice() {
947 let server_addr = next_test_ip4();
948 let client_addr = next_test_ip4();
949 let (tx, rx) = channel();
952 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
954 assert!(client.sendto([1], server_addr).is_ok());
955 assert!(client.sendto([2], server_addr).is_ok());
958 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
962 let (nread1, src1) = server.recvfrom(buf1).unwrap();
963 let (nread2, src2) = server.recvfrom(buf2).unwrap();
964 assert_eq!(nread1, 1);
965 assert_eq!(nread2, 1);
966 assert_eq!(src1, client_addr);
967 assert_eq!(src2, client_addr);
968 assert_eq!(buf1[0], 1);
969 assert_eq!(buf2[0], 2);
// Exchanges many datagrams between a "server" pair and a "client" pair of
// UDP sockets: the server sends 2048-byte messages of 1s, and the client
// replies with one-byte datagrams until it has received MAX bytes.
973 fn test_udp_many_read() {
974 let server_out_addr = next_test_ip4();
975 let server_in_addr = next_test_ip4();
976 let client_out_addr = next_test_ip4();
977 let client_in_addr = next_test_ip4();
978 static MAX: uint = 500_000;
980 let (tx1, rx1) = channel::<()>();
981 let (tx2, rx2) = channel::<()>();
// Server side: one socket for sending data, one for receiving replies.
984 let l = local_loop();
985 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
986 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
987 let (tx, rx) = (tx2, rx1);
990 let msg = [1, .. 2048];
991 let mut total_bytes_sent = 0;
995 assert!(server_out.sendto(msg, client_in_addr).is_ok());
996 total_bytes_sent += msg.len();
997 // check if the client has received enough
998 let res = server_in.recvfrom(buf);
999 assert!(res.is_ok());
1000 let (nread, src) = res.unwrap();
1001 assert_eq!(nread, 1);
1002 assert_eq!(src, client_out_addr);
1004 assert!(total_bytes_sent >= MAX);
// Client side: one socket for sending replies, one for receiving data.
1007 let l = local_loop();
1008 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1009 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1010 let (tx, rx) = (tx1, rx2);
1013 let mut total_bytes_recv = 0;
1014 let mut buf = [0, .. 2048];
1015 while total_bytes_recv < MAX {
1017 assert!(client_out.sendto([1], server_in_addr).is_ok());
1019 let res = client_in.recvfrom(buf);
1020 assert!(res.is_ok());
1021 let (nread, src) = res.unwrap();
1022 assert_eq!(src, server_out_addr);
1023 total_bytes_recv += nread;
1024 for i in range(0u, nread) {
1025 assert_eq!(buf[i], 1);
1028 // tell the server we're done
1029 assert!(client_out.sendto([0], server_in_addr).is_ok());
// A client writes the bytes 0..7 four times; the server reads them back and
// checks that each byte equals its running position modulo 8.
1033 fn test_read_and_block() {
1034 let addr = next_test_ip4();
1035 let (tx, rx) = channel::<Receiver<()>>();
// Client side: four writes of the same eight-byte pattern.
1039 let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
1040 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1041 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1043 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1044 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
// Server side: accept the connection and verify the received pattern.
1048 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1049 let mut acceptor = listener.listen().unwrap();
1050 let (tx2, rx2) = channel();
1052 let mut stream = acceptor.accept().unwrap();
1053 let mut buf = [0, .. 2048];
1056 let mut current = 0;
1059 while current < expected {
1060 let nread = stream.read(buf).unwrap();
1061 for i in range(0u, nread) {
1062 let val = buf[i] as uint;
1063 assert_eq!(val, current % 8);
1071 // Make sure we had multiple reads
// A server accepts one connection and expects a single eight-byte read
// containing 0..7; the client retries connecting until it succeeds and then
// writes those bytes.
1076 fn test_simple_tcp_server_and_client_on_diff_threads() {
1077 let addr = next_test_ip4();
1080 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1081 let mut acceptor = listener.listen().unwrap();
1082 let mut stream = acceptor.accept().unwrap();
1083 let mut buf = [0, .. 2048];
1084 let nread = stream.read(buf).unwrap();
1085 assert_eq!(nread, 8);
1086 for i in range(0u, nread) {
1087 assert_eq!(buf[i], i as u8);
// Keep retrying the connection until it succeeds.
1091 let mut stream = TcpWatcher::connect(local_loop(), addr);
1092 while stream.is_err() {
1093 stream = TcpWatcher::connect(local_loop(), addr);
1095 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
// Marked #[should_fail]: the task is expected to fail while it holds a
// listening TCP acceptor.
1098 #[should_fail] #[test]
1099 fn tcp_listener_fail_cleanup() {
1100 let addr = next_test_ip4();
1101 let w = TcpListener::bind(local_loop(), addr).unwrap();
1102 let _w = w.listen().unwrap();
// Marked #[should_fail]: the task is expected to fail while it holds a
// connected TCP stream; the accepting side drops its end of the connection.
1106 #[should_fail] #[test]
1107 fn tcp_stream_fail_cleanup() {
1108 let (tx, rx) = channel();
1109 let addr = next_test_ip4();
1112 let w = TcpListener::bind(local_loop(), addr).unwrap();
1113 let mut w = w.listen().unwrap();
1115 drop(w.accept().unwrap());
1118 let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
// Marked #[should_fail]: the task is expected to fail while it holds a bound
// UDP socket.
1122 #[should_fail] #[test]
1123 fn udp_listener_fail_cleanup() {
1124 let addr = next_test_ip4();
1125 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1129 #[should_fail] #[test]
1130 fn udp_fail_other_task() {
1131 let addr = next_test_ip4();
1132 let (tx, rx) = channel();
1134 // force the handle to be created on a different scheduler, failure in
1135 // the original task will force a homing operation back to this
1138 let w = UdpWatcher::bind(local_loop(), addr).unwrap();