2 use crate::io::prelude::*;
3 use crate::io::{ErrorKind, IoSlice, IoSliceMut};
4 use crate::net::test::{next_test_ip4, next_test_ip6};
6 use crate::sync::mpsc::channel;
8 use crate::time::{Duration, Instant};
10 fn each_ip(f: &mut dyn FnMut(SocketAddr)) {
19 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
26 match TcpListener::bind("1.1.1.1:9999") {
28 Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
34 match TcpStream::connect("0.0.0.0:1") {
37 e.kind() == ErrorKind::ConnectionRefused
38 || e.kind() == ErrorKind::InvalidInput
39 || e.kind() == ErrorKind::AddrInUse
40 || e.kind() == ErrorKind::AddrNotAvailable,
// Test: a listener bound to the next IPv4 test address can be reached by
// connecting to the host name "localhost" on the same port.
49 fn listen_localhost() {
50 let socket_addr = next_test_ip4();
51 let listener = t!(TcpListener::bind(&socket_addr));
// Client side runs on its own thread: connect by host name and send one byte.
53 let _t = thread::spawn(move || {
54 let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
55 t!(stream.write(&[144]));
// Server side: accept the connection and check the first byte received.
58 let mut stream = t!(listener.accept()).0;
60 t!(stream.read(&mut buf));
61 assert!(buf[0] == 144);
// Test: a client can reach the listener through the literal loopback address
// that matches the address family of `addr`.
// NOTE(review): the line that introduces `addr` is not visible in this excerpt.
65 fn connect_loopback() {
67 let acceptor = t!(TcpListener::bind(&addr));
// Client thread: choose "127.0.0.1" for IPv4 or "::1" for IPv6, connect on the
// listener's port and send one byte.
69 let _t = thread::spawn(move || {
70 let host = match addr {
71 SocketAddr::V4(..) => "127.0.0.1",
72 SocketAddr::V6(..) => "::1",
74 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
75 t!(stream.write(&[66]));
// Accept the connection and verify the byte sent by the client.
78 let mut stream = t!(acceptor.accept()).0;
80 t!(stream.read(&mut buf));
81 assert!(buf[0] == 66);
88 let acceptor = t!(TcpListener::bind(&addr));
90 let (tx, rx) = channel();
91 let _t = thread::spawn(move || {
92 let mut stream = t!(TcpStream::connect(&addr));
93 t!(stream.write(&[99]));
94 tx.send(t!(stream.local_addr())).unwrap();
97 let (mut stream, addr) = t!(acceptor.accept());
99 t!(stream.read(&mut buf));
100 assert!(buf[0] == 99);
101 assert_eq!(addr, t!(rx.recv()));
107 each_ip(&mut |addr| {
108 let acceptor = t!(TcpListener::bind(&addr));
110 let _t = thread::spawn(move || {
111 let _stream = t!(TcpStream::connect(&addr));
115 let mut stream = t!(acceptor.accept()).0;
117 let nread = t!(stream.read(&mut buf));
118 assert_eq!(nread, 0);
119 let nread = t!(stream.read(&mut buf));
120 assert_eq!(nread, 0);
126 each_ip(&mut |addr| {
127 let acceptor = t!(TcpListener::bind(&addr));
129 let (tx, rx) = channel();
130 let _t = thread::spawn(move || {
131 drop(t!(TcpStream::connect(&addr)));
132 tx.send(()).unwrap();
135 let mut stream = t!(acceptor.accept()).0;
138 match stream.write(&buf) {
142 e.kind() == ErrorKind::ConnectionReset
143 || e.kind() == ErrorKind::BrokenPipe
144 || e.kind() == ErrorKind::ConnectionAborted,
// Test: each of up to `max` accepted connections delivers the byte 99 that
// the client side writes.
// NOTE(review): the definition of `max` and any loop around the client's
// connect are not visible in this excerpt.
153 fn multiple_connect_serial() {
154 each_ip(&mut |addr| {
156 let acceptor = t!(TcpListener::bind(&addr));
// Client thread: connect to the listener and write a single 99 byte.
158 let _t = thread::spawn(move || {
160 let mut stream = t!(TcpStream::connect(&addr));
161 t!(stream.write(&[99]));
// Server side: stop after `max` incoming connections, checking each one.
165 for stream in acceptor.incoming().take(max) {
166 let mut stream = t!(stream);
168 t!(stream.read(&mut buf));
169 assert_eq!(buf[0], 99);
// Test: MAX connections are accepted, each one is handled on its own thread,
// and the connection with accept index `i` must deliver the byte `i`.
175 fn multiple_connect_interleaved_greedy_schedule() {
176 const MAX: usize = 10;
177 each_ip(&mut |addr| {
178 let acceptor = t!(TcpListener::bind(&addr));
// Acceptor thread: takes ownership of the listener and handles MAX connections.
180 let _t = thread::spawn(move || {
181 let acceptor = acceptor;
182 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
183 // Start another thread to handle the connection
184 let _t = thread::spawn(move || {
185 let mut stream = t!(stream);
187 t!(stream.read(&mut buf));
// The byte received must equal this connection's index in accept order.
188 assert!(buf[0] == i as u8);
// Client helper: each call spawns a thread that connects, recurses to open the
// next connection, and only afterwards writes its own index `i`.
// NOTE(review): the recursion's stopping condition is not visible in this excerpt.
196 fn connect(i: usize, addr: SocketAddr) {
201 let t = thread::spawn(move || {
202 let mut stream = t!(TcpStream::connect(&addr));
203 // Connect again before writing
204 connect(i + 1, addr);
205 t!(stream.write(&[i as u8]));
// Wait for the spawned thread; a panic inside it fails the test here.
207 t.join().ok().expect("thread panicked");
// Test: MAX connections are accepted, each one is handled on its own thread,
// and every connection must deliver the byte 99.
212 fn multiple_connect_interleaved_lazy_schedule() {
213 const MAX: usize = 10;
214 each_ip(&mut |addr| {
215 let acceptor = t!(TcpListener::bind(&addr));
// Acceptor thread: the listener moves into the closure and serves MAX connections.
217 let _t = thread::spawn(move || {
218 for stream in acceptor.incoming().take(MAX) {
219 // Start another thread to handle the connection
220 let _t = thread::spawn(move || {
221 let mut stream = t!(stream);
223 t!(stream.read(&mut buf));
224 assert!(buf[0] == 99);
// Client helper: each call spawns a thread that connects, recurses to open the
// next connection, and only afterwards writes the byte 99.
// NOTE(review): the recursion's stopping condition is not visible in this excerpt.
232 fn connect(i: usize, addr: SocketAddr) {
237 let t = thread::spawn(move || {
238 let mut stream = t!(TcpStream::connect(&addr));
239 connect(i + 1, addr);
240 t!(stream.write(&[99]));
// Wait for the spawned thread; a panic inside it fails the test here.
242 t.join().ok().expect("thread panicked");
// Test: the listener reports the bound address as its local address, and a
// client connected to it reports that same address as its peer.
247 fn socket_and_peer_name() {
248 each_ip(&mut |addr| {
249 let listener = t!(TcpListener::bind(&addr));
250 let so_name = t!(listener.local_addr());
251 assert_eq!(addr, so_name);
// The listener moves into a helper thread that performs the accept.
252 let _t = thread::spawn(move || {
253 t!(listener.accept());
// Connect from this thread and compare the peer address with `addr`.
256 let stream = t!(TcpStream::connect(&addr));
257 assert_eq!(addr, t!(stream.peer_addr()));
263 each_ip(&mut |addr| {
264 let (tx, rx) = channel();
265 let srv = t!(TcpListener::bind(&addr));
266 let _t = thread::spawn(move || {
267 let mut cl = t!(srv.accept()).0;
268 cl.write(&[10]).unwrap();
271 tx.send(()).unwrap();
274 let mut c = t!(TcpStream::connect(&addr));
276 assert_eq!(c.read(&mut b).unwrap(), 1);
284 each_ip(&mut |addr| {
285 let srv = t!(TcpListener::bind(&addr));
286 let mut s1 = t!(TcpStream::connect(&addr));
287 let mut s2 = t!(srv.accept()).0;
289 let len = s1.write(&[10, 11, 12]).unwrap();
295 let len = t!(s2.read_vectored(&mut [
296 IoSliceMut::new(&mut a),
297 IoSliceMut::new(&mut b),
298 IoSliceMut::new(&mut c)
302 // some implementations don't support readv, so we may only fill the first buffer
303 assert!(len == 1 || c == [11, 12, 0]);
// Test: a vectored write of three buffers on one end of a connection is read
// back on the other end.
// NOTE(review): the definitions of `a`, `b` and `c`, and the condition that
// selects between the two final assertions, are not visible in this excerpt.
308 fn write_vectored() {
309 each_ip(&mut |addr| {
310 let srv = t!(TcpListener::bind(&addr));
// Both ends live on this thread: connect first, then accept.
311 let mut s1 = t!(TcpStream::connect(&addr));
312 let mut s2 = t!(srv.accept()).0;
317 t!(s1.write_vectored(&[IoSlice::new(&a), IoSlice::new(&b), IoSlice::new(&c)]));
// The receive buffer is zero-filled, so bytes that were not written stay 0.
319 let mut buf = [0; 4];
320 let len = t!(s2.read(&mut buf));
321 // some implementations don't support writev, so we may only write the first buffer
323 assert_eq!(buf, [10, 0, 0, 0]);
326 assert_eq!(buf, [10, 11, 12, 0]);
333 each_ip(&mut |addr| {
334 let listener1 = t!(TcpListener::bind(&addr));
335 match TcpListener::bind(&addr) {
336 Ok(listener2) => panic!(
337 "This system (perhaps due to options set by TcpListener::bind) \
338 permits double binding: {:?} and {:?}",
343 e.kind() == ErrorKind::ConnectionRefused
344 || e.kind() == ErrorKind::Uncategorized
345 || e.kind() == ErrorKind::AddrInUse,
346 "unknown error: {} {:?}",
// Test: basic use of a stream together with a handle obtained from `try_clone`.
// NOTE(review): the statements that use `s2`, `rx1` and `rx2` are not visible
// in this excerpt, so the exact hand-off between the threads is not shown.
356 fn tcp_clone_smoke() {
357 each_ip(&mut |addr| {
358 let acceptor = t!(TcpListener::bind(&addr));
// Client thread: connect and expect to read exactly one byte with value 1.
360 let _t = thread::spawn(move || {
361 let mut s = t!(TcpStream::connect(&addr));
362 let mut buf = [0, 0];
363 assert_eq!(s.read(&mut buf).unwrap(), 1);
364 assert_eq!(buf[0], 1);
// Server side: accept the connection and create a second handle to it.
368 let mut s1 = t!(acceptor.accept()).0;
369 let s2 = t!(s1.try_clone());
// Two unit channels are used for signalling between threads.
371 let (tx1, rx1) = channel();
372 let (tx2, rx2) = channel();
373 let _t = thread::spawn(move || {
377 tx2.send(()).unwrap();
379 tx1.send(()).unwrap();
// The original handle must be able to read exactly one byte.
380 let mut buf = [0, 0];
381 assert_eq!(s1.read(&mut buf).unwrap(), 1);
// Test: a stream and its clone are both read from, each signalling on a shared
// channel once its read has completed.
// NOTE(review): the client's writes and the receiving ends' usage are not
// visible in this excerpt.
387 fn tcp_clone_two_read() {
388 each_ip(&mut |addr| {
389 let acceptor = t!(TcpListener::bind(&addr));
// `tx1` and `tx2` are two senders for the same channel.
390 let (tx1, rx) = channel();
391 let tx2 = tx1.clone();
393 let _t = thread::spawn(move || {
394 let mut s = t!(TcpStream::connect(&addr));
401 let mut s1 = t!(acceptor.accept()).0;
402 let s2 = t!(s1.try_clone());
// A second channel; this `rx` shadows the earlier binding of the same name.
404 let (done, rx) = channel();
// Reader thread: read through the clone, then signal on both channels.
405 let _t = thread::spawn(move || {
407 let mut buf = [0, 0];
408 t!(s2.read(&mut buf));
409 tx2.send(()).unwrap();
410 done.send(()).unwrap();
// Read through the original handle as well and signal via `tx1`.
412 let mut buf = [0, 0];
413 t!(s1.read(&mut buf));
414 tx1.send(()).unwrap();
// Test: exercises a stream and a handle obtained from `try_clone` while the
// peer performs two reads.
// NOTE(review): the write calls themselves are not visible in this excerpt;
// presumably one is issued through each handle — confirm against the full file.
421 fn tcp_clone_two_write() {
422 each_ip(&mut |addr| {
423 let acceptor = t!(TcpListener::bind(&addr));
// Client thread: connect and perform two reads into the same buffer.
425 let _t = thread::spawn(move || {
426 let mut s = t!(TcpStream::connect(&addr));
427 let mut buf = [0, 1];
428 t!(s.read(&mut buf));
429 t!(s.read(&mut buf));
// Server side: accept the connection and create a second handle to it.
432 let mut s1 = t!(acceptor.accept()).0;
433 let s2 = t!(s1.try_clone());
// Helper thread reports completion through `done`.
435 let (done, rx) = channel();
436 let _t = thread::spawn(move || {
439 done.send(()).unwrap();
// Test: after shutting down the write half of a stream, further writes fail
// while reading from the peer still works. Ignored on the SGX target.
448 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
449 #[cfg_attr(target_env = "sgx", ignore)]
450 fn shutdown_smoke() {
451 each_ip(&mut |addr| {
452 let a = t!(TcpListener::bind(&addr));
// Server thread: accept the connection; its read is expected to return 0.
453 let _t = thread::spawn(move || {
454 let mut c = t!(a.accept()).0;
456 assert_eq!(c.read(&mut b).unwrap(), 0);
// Client side: close the write half, then check that writing is rejected.
460 let mut s = t!(TcpStream::connect(&addr));
461 t!(s.shutdown(Shutdown::Write));
462 assert!(s.write(&[1]).is_err());
// The read half is still open and must yield exactly one byte.
// NOTE(review): the statement that sends this byte is not visible here.
464 assert_eq!(t!(s.read(&mut b)), 1);
// Test: once both directions of a stream are shut down, writes fail and reads
// return 0 on the stream itself, on a clone made earlier and on a clone made
// afterwards. Ignored on the SGX target.
470 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
471 #[cfg_attr(target_env = "sgx", ignore)]
472 fn close_readwrite_smoke() {
473 each_ip(&mut |addr| {
474 let a = t!(TcpListener::bind(&addr));
// NOTE(review): how `tx` and `rx` are used is not visible in this excerpt.
475 let (tx, rx) = channel::<()>();
// Helper thread accepts the connection and binds the result to `_s`.
476 let _t = thread::spawn(move || {
477 let _s = t!(a.accept());
// `s2` is cloned before any shutdown so it can serve as the "previous handle".
482 let mut s = t!(TcpStream::connect(&addr));
483 let mut s2 = t!(s.try_clone());
485 // closing should prevent reads/writes
486 t!(s.shutdown(Shutdown::Write));
487 assert!(s.write(&[0]).is_err());
488 t!(s.shutdown(Shutdown::Read));
489 assert_eq!(s.read(&mut b).unwrap(), 0);
491 // closing should affect previous handles
492 assert!(s2.write(&[0]).is_err());
493 assert_eq!(s2.read(&mut b).unwrap(), 0);
495 // closing should affect new handles
496 let mut s3 = t!(s.try_clone());
497 assert!(s3.write(&[0]).is_err());
498 assert_eq!(s3.read(&mut b).unwrap(), 0);
500 // make sure these don't die
// Results are discarded on purpose: the calls only have to return without panicking.
501 let _ = s2.shutdown(Shutdown::Read);
502 let _ = s2.shutdown(Shutdown::Write);
503 let _ = s3.shutdown(Shutdown::Read);
504 let _ = s3.shutdown(Shutdown::Write);
// Test: shutting down the read half of a stream makes a read on a clone of
// that stream, issued from another thread, return 0. Ignored on the SGX target.
510 #[cfg_attr(target_env = "sgx", ignore)]
511 fn close_read_wakes_up() {
512 each_ip(&mut |addr| {
513 let a = t!(TcpListener::bind(&addr));
// NOTE(review): how `tx1` and this `rx` are used is not visible in this excerpt.
514 let (tx1, rx) = channel::<()>();
// Helper thread accepts the connection and binds the result to `_s`.
515 let _t = thread::spawn(move || {
516 let _s = t!(a.accept());
520 let s = t!(TcpStream::connect(&addr));
521 let s2 = t!(s.try_clone());
// This `rx` shadows the earlier one; the child signals on `tx` after its read.
522 let (tx, rx) = channel();
// Child thread: read through the clone and expect 0 bytes.
523 let _t = thread::spawn(move || {
525 assert_eq!(t!(s2.read(&mut [0])), 0);
526 tx.send(()).unwrap();
528 // this should wake up the child thread
529 t!(s.shutdown(Shutdown::Read));
531 // this test will never finish if the child doesn't wake up
// Test: `try_clone` can be called on a stream while another clone of it is
// being read from on a different thread.
538 fn clone_while_reading() {
539 each_ip(&mut |addr| {
540 let accept = t!(TcpListener::bind(&addr));
542 // Enqueue a thread to write to a socket
// `tx`/`rx` carry a signal from this thread; `txdone`/`rxdone` collect
// completion notices from the two worker threads.
543 let (tx, rx) = channel();
544 let (txdone, rxdone) = channel();
545 let txdone2 = txdone.clone();
// NOTE(review): the write and the use of `rx` inside this thread are not
// visible in this excerpt.
546 let _t = thread::spawn(move || {
547 let mut tcp = t!(TcpStream::connect(&addr));
550 txdone2.send(()).unwrap();
553 // Spawn off a reading clone
554 let tcp = t!(accept.accept()).0;
555 let tcp2 = t!(tcp.try_clone());
556 let txdone3 = txdone.clone();
557 let _t = thread::spawn(move || {
559 t!(tcp2.read(&mut [0]));
560 txdone3.send(()).unwrap();
563 // Try to ensure that the reading clone is indeed reading
568 // clone the handle again while it's reading, then let it finish the
570 let _ = t!(tcp.try_clone());
571 tx.send(()).unwrap();
// Two receives: one completion notice from each worker thread.
572 rxdone.recv().unwrap();
573 rxdone.recv().unwrap();
// Test: a listener and a handle obtained from `try_clone` are used while two
// client threads connect.
// NOTE(review): the accept calls on `a` and `a2` are not visible in this excerpt.
578 fn clone_accept_smoke() {
579 each_ip(&mut |addr| {
580 let a = t!(TcpListener::bind(&addr));
581 let a2 = t!(a.try_clone());
// Two client threads each attempt one connection; the result is ignored.
583 let _t = thread::spawn(move || {
584 let _ = TcpStream::connect(&addr);
586 let _t = thread::spawn(move || {
587 let _ = TcpStream::connect(&addr);
// Test: a listener and its clone accept from two different threads while two
// client threads connect.
// NOTE(review): the receiving side of the channel is not visible in this excerpt.
596 fn clone_accept_concurrent() {
597 each_ip(&mut |addr| {
598 let a = t!(TcpListener::bind(&addr));
599 let a2 = t!(a.try_clone());
// Both acceptor threads report their accepted connection on the same channel.
601 let (tx, rx) = channel();
602 let tx2 = tx.clone();
// One acceptor thread per listener handle.
604 let _t = thread::spawn(move || {
605 tx.send(t!(a.accept())).unwrap();
607 let _t = thread::spawn(move || {
608 tx2.send(t!(a2.accept())).unwrap();
// Two client threads each attempt one connection; the result is ignored.
611 let _t = thread::spawn(move || {
612 let _ = TcpStream::connect(&addr);
614 let _t = thread::spawn(move || {
615 let _ = TcpStream::connect(&addr);
625 #[cfg(not(target_env = "sgx"))]
626 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
629 #[cfg(target_env = "sgx")]
630 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
634 #[cfg(target_env = "sgx")]
635 use crate::os::fortanix_sgx::io::AsRawFd;
637 use crate::os::unix::io::AsRawFd;
639 fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug {
643 fn render_inner(addr: &dyn crate::os::windows::io::AsRawSocket) -> impl fmt::Debug {
647 let inner_name = if cfg!(windows) { "socket" } else { "fd" };
648 let socket_addr = next_test_ip4();
650 let listener = t!(TcpListener::bind(&socket_addr));
651 let compare = format!(
652 "TcpListener {{ addr: {:?}, {}: {:?} }}",
653 render_socket_addr(&socket_addr),
655 render_inner(&listener)
657 assert_eq!(format!("{listener:?}"), compare);
659 let stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
660 let compare = format!(
661 "TcpStream {{ addr: {:?}, peer: {:?}, {}: {:?} }}",
662 render_socket_addr(&stream.local_addr().unwrap()),
663 render_socket_addr(&stream.peer_addr().unwrap()),
665 render_inner(&stream)
667 assert_eq!(format!("{stream:?}"), compare);
670 // FIXME: re-enable openbsd tests once their socket timeout code
671 // no longer has rounding errors.
672 // VxWorks ignores SO_SNDTIMEO.
673 #[cfg_attr(any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks"), ignore)]
674 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
677 let addr = next_test_ip4();
678 let listener = t!(TcpListener::bind(&addr));
680 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
681 let dur = Duration::new(15410, 0);
683 assert_eq!(None, t!(stream.read_timeout()));
685 t!(stream.set_read_timeout(Some(dur)));
686 assert_eq!(Some(dur), t!(stream.read_timeout()));
688 assert_eq!(None, t!(stream.write_timeout()));
690 t!(stream.set_write_timeout(Some(dur)));
691 assert_eq!(Some(dur), t!(stream.write_timeout()));
693 t!(stream.set_read_timeout(None));
694 assert_eq!(None, t!(stream.read_timeout()));
696 t!(stream.set_write_timeout(None));
697 assert_eq!(None, t!(stream.write_timeout()));
// Test: a read on a stream with a read timeout fails with a timeout-style
// error instead of blocking forever. Ignored on the SGX target.
702 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
703 fn test_read_timeout() {
704 let addr = next_test_ip4();
705 let listener = t!(TcpListener::bind(&addr));
707 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
708 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
// The read is expected to fail; only the kind of the error is inspected.
710 let mut buf = [0; 10];
711 let start = Instant::now();
712 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
// Either error kind is accepted for a timed-out read.
714 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
715 "unexpected_error: {:?}",
// Lower bound only, and well below the 1000 ms timeout configured above.
718 assert!(start.elapsed() > Duration::from_millis(400));
// Test: with a read timeout set, data that is available is still read
// normally, and a later read with nothing left to receive fails with a
// timeout-style error. Ignored on the SGX target.
723 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
724 fn test_read_with_timeout() {
725 let addr = next_test_ip4();
726 let listener = t!(TcpListener::bind(&addr));
728 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
729 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
// The accepted end sends 11 bytes, exactly the size of `buf` below.
731 let mut other_end = t!(listener.accept()).0;
732 t!(other_end.write_all(b"hello world"));
734 let mut buf = [0; 11];
735 t!(stream.read(&mut buf));
736 assert_eq!(b"hello world", &buf[..]);
// Second read: expected to fail; only the kind of the error is inspected.
738 let start = Instant::now();
739 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
// Either error kind is accepted for a timed-out read.
741 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
742 "unexpected_error: {:?}",
// Lower bound only, and well below the 1000 ms timeout configured above.
745 assert!(start.elapsed() > Duration::from_millis(400));
749 // Ensure the `set_read_timeout` and `set_write_timeout` calls return errors
750 // when passed zero Durations
752 fn test_timeout_zero_duration() {
753 let addr = next_test_ip4();
// The listener only needs to exist so that the connect below succeeds.
755 let listener = t!(TcpListener::bind(&addr));
756 let stream = t!(TcpStream::connect(&addr));
// A zero write timeout must be rejected with `InvalidInput`.
758 let result = stream.set_write_timeout(Some(Duration::new(0, 0)));
759 let err = result.unwrap_err();
760 assert_eq!(err.kind(), ErrorKind::InvalidInput);
// The same holds for a zero read timeout.
762 let result = stream.set_read_timeout(Some(Duration::new(0, 0)));
763 let err = result.unwrap_err();
764 assert_eq!(err.kind(), ErrorKind::InvalidInput);
770 #[cfg_attr(target_env = "sgx", ignore)]
772 let addr = next_test_ip4();
773 let _listener = t!(TcpListener::bind(&addr));
775 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
777 assert_eq!(None, t!(stream.linger()));
778 t!(stream.set_linger(Some(Duration::from_secs(1))));
779 assert_eq!(Some(Duration::from_secs(1)), t!(stream.linger()));
780 t!(stream.set_linger(None));
781 assert_eq!(None, t!(stream.linger()));
785 #[cfg_attr(target_env = "sgx", ignore)]
787 let addr = next_test_ip4();
788 let _listener = t!(TcpListener::bind(&addr));
790 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
792 assert_eq!(false, t!(stream.nodelay()));
793 t!(stream.set_nodelay(true));
794 assert_eq!(true, t!(stream.nodelay()));
795 t!(stream.set_nodelay(false));
796 assert_eq!(false, t!(stream.nodelay()));
800 #[cfg_attr(target_env = "sgx", ignore)]
804 let addr = next_test_ip4();
805 let listener = t!(TcpListener::bind(&addr));
807 t!(listener.set_ttl(ttl));
808 assert_eq!(ttl, t!(listener.ttl()));
810 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
812 t!(stream.set_ttl(ttl));
813 assert_eq!(ttl, t!(stream.ttl()));
// Test: non-blocking mode can be toggled on a listener and on a stream, and a
// read on a non-blocking stream fails with `WouldBlock`. Ignored on the SGX
// target.
817 #[cfg_attr(target_env = "sgx", ignore)]
818 fn set_nonblocking() {
819 let addr = next_test_ip4();
820 let listener = t!(TcpListener::bind(&addr));
// Both settings must be accepted by the listener.
822 t!(listener.set_nonblocking(true));
823 t!(listener.set_nonblocking(false));
825 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
// The stream is left in non-blocking mode for the read below.
827 t!(stream.set_nonblocking(false));
828 t!(stream.set_nonblocking(true));
// Success and any error other than `WouldBlock` both fail the test.
831 match stream.read(&mut buf) {
832 Ok(_) => panic!("expected error"),
833 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
834 Err(e) => panic!("unexpected error {e}"),
839 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
841 each_ip(&mut |addr| {
842 let (txdone, rxdone) = channel();
844 let srv = t!(TcpListener::bind(&addr));
845 let _t = thread::spawn(move || {
846 let mut cl = t!(srv.accept()).0;
847 cl.write(&[1, 3, 3, 7]).unwrap();
851 let mut c = t!(TcpStream::connect(&addr));
854 let len = c.peek(&mut b).unwrap();
857 let len = c.read(&mut b).unwrap();
860 t!(c.set_nonblocking(true));
861 match c.peek(&mut b) {
862 Ok(_) => panic!("expected error"),
863 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
864 Err(e) => panic!("unexpected error {e}"),
// Test: `TcpStream::connect_timeout` succeeds against a live local listener.
// Ignored on the SGX target.
871 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
872 fn connect_timeout_valid() {
// Binding to port 0 asks the OS to assign a port; `local_addr` reports the
// address actually bound.
873 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
874 let addr = listener.local_addr().unwrap();
// Any connect error, including a timeout, panics via `unwrap`.
875 TcpStream::connect_timeout(&addr, Duration::from_secs(2)).unwrap();