2 use crate::io::prelude::*;
3 use crate::io::{ErrorKind, IoSlice, IoSliceMut};
4 use crate::net::test::{next_test_ip4, next_test_ip6};
6 use crate::sync::mpsc::channel;
8 use crate::time::{Duration, Instant};
10 fn each_ip(f: &mut dyn FnMut(SocketAddr)) {
19 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
26 match TcpListener::bind("1.1.1.1:9999") {
28 Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
34 match TcpStream::connect("0.0.0.0:1") {
37 e.kind() == ErrorKind::ConnectionRefused
38 || e.kind() == ErrorKind::InvalidInput
39 || e.kind() == ErrorKind::AddrInUse
40 || e.kind() == ErrorKind::AddrNotAvailable,
// Checks that a listener bound to an IPv4 test address can be reached through
// the host name "localhost": a client thread writes the single byte 144 and
// the accepting side asserts that this is the byte it reads.
49 fn listen_localhost() {
50 let socket_addr = next_test_ip4();
51 let listener = t!(TcpListener::bind(&socket_addr));
// Client side runs on its own thread; the handle is kept in `_t` and not joined here.
53 let _t = thread::spawn(move || {
// Connect by name, reusing only the port of the bound address.
54 let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
55 t!(stream.write(&[144]));
// Server side: take the first accepted connection (`.0` is the stream of the pair).
58 let mut stream = t!(listener.accept()).0;
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
60 t!(stream.read(&mut buf));
61 assert!(buf[0] == 144);
// Checks that a client can reach the listener through the literal loopback
// host matching the address family of `addr`; the client writes the byte 66
// and the accepting side asserts it reads that byte.
// NOTE(review): where `addr` is introduced is not visible in this excerpt.
65 fn connect_loopback() {
67 let acceptor = t!(TcpListener::bind(&addr));
69 let _t = thread::spawn(move || {
// Pick the loopback literal for the family of the bound address.
70 let host = match addr {
71 SocketAddr::V4(..) => "127.0.0.1",
72 SocketAddr::V6(..) => "::1",
74 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
75 t!(stream.write(&[66]));
// Server side: accept one connection and verify the byte written by the client.
78 let mut stream = t!(acceptor.accept()).0;
80 t!(stream.read(&mut buf));
81 assert!(buf[0] == 66);
88 let acceptor = t!(TcpListener::bind(&addr));
90 let (tx, rx) = channel();
91 let _t = thread::spawn(move || {
92 let mut stream = t!(TcpStream::connect(&addr));
93 t!(stream.write(&[99]));
94 tx.send(t!(stream.local_addr())).unwrap();
97 let (mut stream, addr) = t!(acceptor.accept());
99 t!(stream.read(&mut buf));
100 assert!(buf[0] == 99);
101 assert_eq!(addr, t!(rx.recv()));
107 each_ip(&mut |addr| {
108 let acceptor = t!(TcpListener::bind(&addr));
110 let _t = thread::spawn(move || {
111 let _stream = t!(TcpStream::connect(&addr));
115 let mut stream = t!(acceptor.accept()).0;
117 let nread = t!(stream.read(&mut buf));
118 assert_eq!(nread, 0);
119 let nread = t!(stream.read(&mut buf));
120 assert_eq!(nread, 0);
126 each_ip(&mut |addr| {
127 let acceptor = t!(TcpListener::bind(&addr));
129 let (tx, rx) = channel();
130 let _t = thread::spawn(move || {
131 drop(t!(TcpStream::connect(&addr)));
132 tx.send(()).unwrap();
135 let mut stream = t!(acceptor.accept()).0;
138 match stream.write(&buf) {
142 e.kind() == ErrorKind::ConnectionReset
143 || e.kind() == ErrorKind::BrokenPipe
144 || e.kind() == ErrorKind::ConnectionAborted,
// For each address supplied by `each_ip`: a client thread connects and writes
// the byte 99, while this thread accepts up to `max` incoming connections one
// after another and asserts that each delivers 99.
// NOTE(review): the definition of `max` and the client-side loop header are
// not visible in this excerpt.
154 fn multiple_connect_serial() {
155 each_ip(&mut |addr| {
157 let acceptor = t!(TcpListener::bind(&addr));
159 let _t = thread::spawn(move || {
161 let mut stream = t!(TcpStream::connect(&addr));
162 t!(stream.write(&[99]));
// Handle connections strictly in accept order, stopping after `max` of them.
166 for stream in acceptor.incoming().take(max) {
167 let mut stream = t!(stream);
169 t!(stream.read(&mut buf));
170 assert_eq!(buf[0], 99);
// Accepts up to MAX connections on a background thread, handing each one to
// its own handler thread. Handler number `i` (the position of the connection
// in accept order) asserts that the byte it reads equals `i`.
176 fn multiple_connect_interleaved_greedy_schedule() {
177 const MAX: usize = 10;
178 each_ip(&mut |addr| {
179 let acceptor = t!(TcpListener::bind(&addr));
181 let _t = thread::spawn(move || {
// Rebinding moves the listener into this closure.
182 let acceptor = acceptor;
// `enumerate` supplies the accept-order index that the handler checks against.
183 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
184 // Start another thread to handle the connection
185 let _t = thread::spawn(move || {
186 let mut stream = t!(stream);
188 t!(stream.read(&mut buf));
189 assert!(buf[0] == i as u8);
// Client helper: opens connection `i`, recursively starts connection `i + 1`
// before writing, then writes `i` as a single byte and joins the thread.
// NOTE(review): the recursion's stopping condition is not visible in this excerpt.
197 fn connect(i: usize, addr: SocketAddr) {
202 let t = thread::spawn(move || {
203 let mut stream = t!(TcpStream::connect(&addr));
204 // Connect again before writing
205 connect(i + 1, addr);
206 t!(stream.write(&[i as u8]));
// Surface a panic from the spawned thread as a panic in the caller.
208 t.join().ok().expect("thread panicked");
// Accepts up to MAX connections on a background thread, handing each one to
// its own handler thread; every handler asserts that the byte it reads is 99.
213 fn multiple_connect_interleaved_lazy_schedule() {
214 const MAX: usize = 10;
215 each_ip(&mut |addr| {
216 let acceptor = t!(TcpListener::bind(&addr));
218 let _t = thread::spawn(move || {
219 for stream in acceptor.incoming().take(MAX) {
220 // Start another thread to handle the connection
221 let _t = thread::spawn(move || {
222 let mut stream = t!(stream);
224 t!(stream.read(&mut buf));
225 assert!(buf[0] == 99);
// Client helper: opens a connection, recursively starts the next one before
// writing, then writes the byte 99 and joins the thread.
// NOTE(review): the recursion's stopping condition is not visible in this excerpt.
233 fn connect(i: usize, addr: SocketAddr) {
238 let t = thread::spawn(move || {
239 let mut stream = t!(TcpStream::connect(&addr));
240 connect(i + 1, addr);
241 t!(stream.write(&[99]));
// Surface a panic from the spawned thread as a panic in the caller.
243 t.join().ok().expect("thread panicked");
// Verifies address reporting on both ends: the listener's `local_addr` must
// equal the address it was bound to, and a connected client's `peer_addr`
// must equal that same address.
248 fn socket_and_peer_name() {
249 each_ip(&mut |addr| {
250 let listener = t!(TcpListener::bind(&addr));
251 let so_name = t!(listener.local_addr());
252 assert_eq!(addr, so_name);
// Accept on a background thread so the connect below has a peer.
253 let _t = thread::spawn(move || {
254 t!(listener.accept());
257 let stream = t!(TcpStream::connect(&addr));
258 assert_eq!(addr, t!(stream.peer_addr()));
264 each_ip(&mut |addr| {
265 let (tx, rx) = channel();
266 let srv = t!(TcpListener::bind(&addr));
267 let _t = thread::spawn(move || {
268 let mut cl = t!(srv.accept()).0;
269 cl.write(&[10]).unwrap();
272 tx.send(()).unwrap();
275 let mut c = t!(TcpStream::connect(&addr));
277 assert_eq!(c.read(&mut b).unwrap(), 1);
285 each_ip(&mut |addr| {
286 let srv = t!(TcpListener::bind(&addr));
287 let mut s1 = t!(TcpStream::connect(&addr));
288 let mut s2 = t!(srv.accept()).0;
290 let len = s1.write(&[10, 11, 12]).unwrap();
296 let len = t!(s2.read_vectored(&mut [
297 IoSliceMut::new(&mut a),
298 IoSliceMut::new(&mut b),
299 IoSliceMut::new(&mut c)
303 // some implementations don't support readv, so we may only fill the first buffer
304 assert!(len == 1 || c == [11, 12, 0]);
// Writes three buffers with a single `write_vectored` call on the connecting
// side and checks what the accepted side reads into a 4-byte buffer.
// NOTE(review): the definitions of `a`, `b`, `c` and the condition selecting
// between the two assertions are not visible in this excerpt.
309 fn write_vectored() {
310 each_ip(&mut |addr| {
311 let srv = t!(TcpListener::bind(&addr));
// `s1` is the connecting end, `s2` the accepted end of the same connection.
312 let mut s1 = t!(TcpStream::connect(&addr));
313 let mut s2 = t!(srv.accept()).0;
318 t!(s1.write_vectored(&[IoSlice::new(&a), IoSlice::new(&b), IoSlice::new(&c)]));
320 let mut buf = [0; 4];
321 let len = t!(s2.read(&mut buf));
322 // some implementations don't support writev, so we may only write the first buffer
// Outcome when only the first slice was written.
324 assert_eq!(buf, [10, 0, 0, 0]);
// Outcome when all three slices were written.
327 assert_eq!(buf, [10, 11, 12, 0]);
334 each_ip(&mut |addr| {
335 let listener1 = t!(TcpListener::bind(&addr));
336 match TcpListener::bind(&addr) {
337 Ok(listener2) => panic!(
338 "This system (perhaps due to options set by TcpListener::bind) \
339 permits double binding: {:?} and {:?}",
344 e.kind() == ErrorKind::ConnectionRefused
345 || e.kind() == ErrorKind::Uncategorized
346 || e.kind() == ErrorKind::AddrInUse,
347 "unknown error: {} {:?}",
// Smoke test for `TcpStream::try_clone`: the accepted stream `s1` is cloned
// into `s2`, and the original and the clone are used from different threads,
// coordinated through two channels.
// NOTE(review): the receiving ends (`rx1`, `rx2`) and the writes performed on
// `s2` and by the client are not visible in this excerpt.
357 fn tcp_clone_smoke() {
358 each_ip(&mut |addr| {
359 let acceptor = t!(TcpListener::bind(&addr));
// Client: expects to read exactly one byte whose value is 1.
361 let _t = thread::spawn(move || {
362 let mut s = t!(TcpStream::connect(&addr));
363 let mut buf = [0, 0];
364 assert_eq!(s.read(&mut buf).unwrap(), 1);
365 assert_eq!(buf[0], 1);
369 let mut s1 = t!(acceptor.accept()).0;
370 let s2 = t!(s1.try_clone());
372 let (tx1, rx1) = channel();
373 let (tx2, rx2) = channel();
374 let _t = thread::spawn(move || {
378 tx2.send(()).unwrap();
// Signal via `tx1`, then expect a single byte on the original handle.
380 tx1.send(()).unwrap();
381 let mut buf = [0, 0];
382 assert_eq!(s1.read(&mut buf).unwrap(), 1);
// Reads from an accepted stream and from its clone on two different threads.
// After its read, each reader signals through its own sender (`tx1`/`tx2`) of
// one shared channel.
// NOTE(review): the client's use of the first `rx` and its writes are not
// visible in this excerpt.
388 fn tcp_clone_two_read() {
389 each_ip(&mut |addr| {
390 let acceptor = t!(TcpListener::bind(&addr));
391 let (tx1, rx) = channel();
392 let tx2 = tx1.clone();
394 let _t = thread::spawn(move || {
395 let mut s = t!(TcpStream::connect(&addr));
402 let mut s1 = t!(acceptor.accept()).0;
403 let s2 = t!(s1.try_clone());
// This `rx` shadows the receiver created above; it pairs with `done`.
405 let (done, rx) = channel();
// Reader on the cloned handle: read, then signal both `tx2` and `done`.
406 let _t = thread::spawn(move || {
408 let mut buf = [0, 0];
409 t!(s2.read(&mut buf));
410 tx2.send(()).unwrap();
411 done.send(()).unwrap();
// Reader on the original handle, on the current thread.
413 let mut buf = [0, 0];
414 t!(s1.read(&mut buf));
415 tx1.send(()).unwrap();
// Counterpart of the two-reader clone test: the client performs two reads
// while the server side holds an accepted stream `s1` and its clone `s2`.
// NOTE(review): the writes on `s1`/`s2` and the use of `rx` are not visible in
// this excerpt.
422 fn tcp_clone_two_write() {
423 each_ip(&mut |addr| {
424 let acceptor = t!(TcpListener::bind(&addr));
// Client: two successive reads into the same two-byte buffer.
426 let _t = thread::spawn(move || {
427 let mut s = t!(TcpStream::connect(&addr));
428 let mut buf = [0, 1];
429 t!(s.read(&mut buf));
430 t!(s.read(&mut buf));
433 let mut s1 = t!(acceptor.accept()).0;
434 let s2 = t!(s1.try_clone());
// `done` lets the spawned thread report completion.
436 let (done, rx) = channel();
437 let _t = thread::spawn(move || {
440 done.send(()).unwrap();
// Smoke test for a write-side shutdown: after `shutdown(Shutdown::Write)` the
// client's own write must fail, the accepted side's read must return 0, and
// the client must still be able to read one byte.
// NOTE(review): the accepted side's write that supplies that byte and the
// declarations of `b` are not visible in this excerpt.
449 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
450 #[cfg_attr(target_env = "sgx", ignore)]
451 fn shutdown_smoke() {
452 each_ip(&mut |addr| {
453 let a = t!(TcpListener::bind(&addr));
454 let _t = thread::spawn(move || {
455 let mut c = t!(a.accept()).0;
// A zero-length read result is what the accepted side must observe here.
457 assert_eq!(c.read(&mut b).unwrap(), 0);
461 let mut s = t!(TcpStream::connect(&addr));
462 t!(s.shutdown(Shutdown::Write));
// Writing after the write half was shut down must be rejected.
463 assert!(s.write(&[1]).is_err());
// The read half is unaffected: exactly one byte is expected.
465 assert_eq!(t!(s.read(&mut b)), 1);
// Checks that shutting down both halves of a stream is visible through every
// handle to it: the handle that was shut down (`s`), a clone taken before the
// shutdown (`s2`) and a clone taken afterwards (`s3`). On each, writes must
// fail and reads must return 0.
// NOTE(review): the declaration of `b` and the use of `tx`/`rx` are not
// visible in this excerpt.
471 // FIXME: https://github.com/fortanix/rust-sgx/issues/110
472 #[cfg_attr(target_env = "sgx", ignore)]
473 fn close_readwrite_smoke() {
474 each_ip(&mut |addr| {
475 let a = t!(TcpListener::bind(&addr));
476 let (tx, rx) = channel::<()>();
// Background thread only accepts the connection and holds it in `_s`.
477 let _t = thread::spawn(move || {
478 let _s = t!(a.accept());
483 let mut s = t!(TcpStream::connect(&addr));
// Clone taken BEFORE the shutdown calls below.
484 let mut s2 = t!(s.try_clone());
486 // closing should prevent reads/writes
487 t!(s.shutdown(Shutdown::Write));
488 assert!(s.write(&[0]).is_err());
489 t!(s.shutdown(Shutdown::Read));
490 assert_eq!(s.read(&mut b).unwrap(), 0);
492 // closing should affect previous handles
493 assert!(s2.write(&[0]).is_err());
494 assert_eq!(s2.read(&mut b).unwrap(), 0);
496 // closing should affect new handles
// Clone taken AFTER the shutdown calls above.
497 let mut s3 = t!(s.try_clone());
498 assert!(s3.write(&[0]).is_err());
499 assert_eq!(s3.read(&mut b).unwrap(), 0);
501 // make sure these don't die
// Results are deliberately discarded: only the absence of a panic is checked.
502 let _ = s2.shutdown(Shutdown::Read);
503 let _ = s2.shutdown(Shutdown::Write);
504 let _ = s3.shutdown(Shutdown::Read);
505 let _ = s3.shutdown(Shutdown::Write);
// Checks that `shutdown(Shutdown::Read)` on one handle affects a read issued
// through a clone of the same stream on another thread: that read must return
// 0, after which the child signals through `tx`.
// NOTE(review): the use of `tx1` and the final wait on `rx` are not visible in
// this excerpt.
511 #[cfg(unix)] // test doesn't work on Windows, see #31657
512 fn close_read_wakes_up() {
513 each_ip(&mut |addr| {
514 let a = t!(TcpListener::bind(&addr));
515 let (tx1, rx) = channel::<()>();
516 let _t = thread::spawn(move || {
517 let _s = t!(a.accept());
521 let s = t!(TcpStream::connect(&addr));
522 let s2 = t!(s.try_clone());
// This `rx` shadows the receiver paired with `tx1` above.
523 let (tx, rx) = channel();
// Child: read through the clone, expect 0, then report back.
524 let _t = thread::spawn(move || {
526 assert_eq!(t!(s2.read(&mut [0])), 0);
527 tx.send(()).unwrap();
529 // this should wake up the child thread
530 t!(s.shutdown(Shutdown::Read));
532 // this test will never finish if the child doesn't wake up
// Checks that `try_clone` succeeds on a stream while another clone of it is
// being read on a different thread. Completion of the connecting thread and
// of the reading thread is collected through the `txdone`/`rxdone` channel.
// NOTE(review): the connecting thread's wait on `rx` and its write, and the
// code that ensures the reader is blocked, are not visible in this excerpt.
539 fn clone_while_reading() {
540 each_ip(&mut |addr| {
541 let accept = t!(TcpListener::bind(&addr));
543 // Enqueue a thread to write to a socket
544 let (tx, rx) = channel();
545 let (txdone, rxdone) = channel();
546 let txdone2 = txdone.clone();
547 let _t = thread::spawn(move || {
548 let mut tcp = t!(TcpStream::connect(&addr));
// First of the two completion signals awaited at the end.
551 txdone2.send(()).unwrap();
554 // Spawn off a reading clone
555 let tcp = t!(accept.accept()).0;
556 let tcp2 = t!(tcp.try_clone());
557 let txdone3 = txdone.clone();
558 let _t = thread::spawn(move || {
560 t!(tcp2.read(&mut [0]));
// Second of the two completion signals awaited at the end.
561 txdone3.send(()).unwrap();
564 // Try to ensure that the reading clone is indeed reading
569 // clone the handle again while it's reading, then let it finish the
// The clone itself is discarded; only the success of `try_clone` matters.
571 let _ = t!(tcp.try_clone());
572 tx.send(()).unwrap();
// One `recv` per spawned thread (`txdone2` and `txdone3`).
573 rxdone.recv().unwrap();
574 rxdone.recv().unwrap();
// Smoke test for `TcpListener::try_clone`: the listener `a` is cloned into
// `a2` and two client threads each attempt one connection.
// NOTE(review): the accept calls on `a` and `a2` are not visible in this
// excerpt.
579 fn clone_accept_smoke() {
580 each_ip(&mut |addr| {
581 let a = t!(TcpListener::bind(&addr));
582 let a2 = t!(a.try_clone());
// Connection results are deliberately ignored on the client side.
584 let _t = thread::spawn(move || {
585 let _ = TcpStream::connect(&addr);
587 let _t = thread::spawn(move || {
588 let _ = TcpStream::connect(&addr);
// Accepts on a listener and on its clone from two different threads at once;
// each accepting thread forwards its accepted result through a shared channel
// while two further threads each attempt one connection.
// NOTE(review): the receives on `rx` are not visible in this excerpt.
597 fn clone_accept_concurrent() {
598 each_ip(&mut |addr| {
599 let a = t!(TcpListener::bind(&addr));
600 let a2 = t!(a.try_clone());
602 let (tx, rx) = channel();
603 let tx2 = tx.clone();
// Acceptor on the original listener.
605 let _t = thread::spawn(move || {
606 tx.send(t!(a.accept())).unwrap();
// Acceptor on the cloned listener.
608 let _t = thread::spawn(move || {
609 tx2.send(t!(a2.accept())).unwrap();
// Two clients; connection results are deliberately ignored.
612 let _t = thread::spawn(move || {
613 let _ = TcpStream::connect(&addr);
615 let _t = thread::spawn(move || {
616 let _ = TcpStream::connect(&addr);
626 #[cfg(not(target_env = "sgx"))]
627 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
630 #[cfg(target_env = "sgx")]
631 fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a {
635 #[cfg(target_env = "sgx")]
636 use crate::os::fortanix_sgx::io::AsRawFd;
638 use crate::os::unix::io::AsRawFd;
640 fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug {
644 fn render_inner(addr: &dyn crate::os::windows::io::AsRawSocket) -> impl fmt::Debug {
648 let inner_name = if cfg!(windows) { "socket" } else { "fd" };
649 let socket_addr = next_test_ip4();
651 let listener = t!(TcpListener::bind(&socket_addr));
652 let compare = format!(
653 "TcpListener {{ addr: {:?}, {}: {:?} }}",
654 render_socket_addr(&socket_addr),
656 render_inner(&listener)
658 assert_eq!(format!("{:?}", listener), compare);
660 let stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
661 let compare = format!(
662 "TcpStream {{ addr: {:?}, peer: {:?}, {}: {:?} }}",
663 render_socket_addr(&stream.local_addr().unwrap()),
664 render_socket_addr(&stream.peer_addr().unwrap()),
666 render_inner(&stream)
668 assert_eq!(format!("{:?}", stream), compare);
671 // FIXME: re-enable openbsd tests once their socket timeout code
672 // no longer has rounding errors.
673 // VxWorks ignores SO_SNDTIMEO.
674 #[cfg_attr(any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks"), ignore)]
675 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
678 let addr = next_test_ip4();
679 let listener = t!(TcpListener::bind(&addr));
681 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
682 let dur = Duration::new(15410, 0);
684 assert_eq!(None, t!(stream.read_timeout()));
686 t!(stream.set_read_timeout(Some(dur)));
687 assert_eq!(Some(dur), t!(stream.read_timeout()));
689 assert_eq!(None, t!(stream.write_timeout()));
691 t!(stream.set_write_timeout(Some(dur)));
692 assert_eq!(Some(dur), t!(stream.write_timeout()));
694 t!(stream.set_read_timeout(None));
695 assert_eq!(None, t!(stream.read_timeout()));
697 t!(stream.set_write_timeout(None));
698 assert_eq!(None, t!(stream.write_timeout()));
// Checks that a read with a 1000 ms read timeout fails when no data arrives:
// `read_exact` must return an error of kind `WouldBlock` or `TimedOut`, and
// more than 400 ms must have elapsed since the read was started.
// No visible line accepts the connection or writes to it.
703 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
704 fn test_read_timeout() {
705 let addr = next_test_ip4();
706 let listener = t!(TcpListener::bind(&addr));
708 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
709 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
711 let mut buf = [0; 10];
// Time the read so the lower bound on the wait can be asserted afterwards.
712 let start = Instant::now();
713 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
// Either error kind is accepted for an expired timeout.
715 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
716 "unexpected_error: {:?}",
// Lower bound is well below the configured 1000 ms.
719 assert!(start.elapsed() > Duration::from_millis(400));
// Checks that a read timeout does not interfere with data that is available,
// and still fires afterwards: the 11 bytes "hello world" written by the
// accepted side are read successfully, then a further `read_exact` must fail
// with `WouldBlock` or `TimedOut` after more than 400 ms.
724 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
725 fn test_read_with_timeout() {
726 let addr = next_test_ip4();
727 let listener = t!(TcpListener::bind(&addr));
729 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
730 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
// The accepted end supplies the data for the first read.
732 let mut other_end = t!(listener.accept()).0;
733 t!(other_end.write_all(b"hello world"));
// Buffer length matches the 11-byte message.
735 let mut buf = [0; 11];
736 t!(stream.read(&mut buf));
737 assert_eq!(b"hello world", &buf[..]);
// Nothing further is written in the visible code, so this read must fail.
739 let start = Instant::now();
740 let kind = stream.read_exact(&mut buf).err().expect("expected error").kind();
742 kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut,
743 "unexpected_error: {:?}",
746 assert!(start.elapsed() > Duration::from_millis(400));
750 // Ensure the `set_read_timeout` and `set_write_timeout` calls return errors
751 // when passed zero Durations
// Both setters are expected to reject the value with `ErrorKind::InvalidInput`.
753 fn test_timeout_zero_duration() {
754 let addr = next_test_ip4();
// The listener only needs to exist so that the connect below succeeds.
756 let listener = t!(TcpListener::bind(&addr));
757 let stream = t!(TcpStream::connect(&addr));
// Write timeout of zero.
759 let result = stream.set_write_timeout(Some(Duration::new(0, 0)));
760 let err = result.unwrap_err();
761 assert_eq!(err.kind(), ErrorKind::InvalidInput);
// Read timeout of zero.
763 let result = stream.set_read_timeout(Some(Duration::new(0, 0)));
764 let err = result.unwrap_err();
765 assert_eq!(err.kind(), ErrorKind::InvalidInput);
771 #[cfg_attr(target_env = "sgx", ignore)]
773 let addr = next_test_ip4();
774 let _listener = t!(TcpListener::bind(&addr));
776 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
778 assert_eq!(false, t!(stream.nodelay()));
779 t!(stream.set_nodelay(true));
780 assert_eq!(true, t!(stream.nodelay()));
781 t!(stream.set_nodelay(false));
782 assert_eq!(false, t!(stream.nodelay()));
786 #[cfg_attr(target_env = "sgx", ignore)]
790 let addr = next_test_ip4();
791 let listener = t!(TcpListener::bind(&addr));
793 t!(listener.set_ttl(ttl));
794 assert_eq!(ttl, t!(listener.ttl()));
796 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
798 t!(stream.set_ttl(ttl));
799 assert_eq!(ttl, t!(stream.ttl()));
// Exercises `set_nonblocking` on a listener and on a stream. With the stream
// left in non-blocking mode, a read must fail with `ErrorKind::WouldBlock`;
// a successful read or any other error kind panics.
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
803 #[cfg_attr(target_env = "sgx", ignore)]
804 fn set_nonblocking() {
805 let addr = next_test_ip4();
806 let listener = t!(TcpListener::bind(&addr));
// Toggle the listener on and back off; only success of the calls is checked.
808 t!(listener.set_nonblocking(true));
809 t!(listener.set_nonblocking(false));
811 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
// The stream ends up non-blocking for the read below.
813 t!(stream.set_nonblocking(false));
814 t!(stream.set_nonblocking(true));
817 match stream.read(&mut buf) {
818 Ok(_) => panic!("expected error"),
819 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
820 Err(e) => panic!("unexpected error {}", e),
825 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
827 each_ip(&mut |addr| {
828 let (txdone, rxdone) = channel();
830 let srv = t!(TcpListener::bind(&addr));
831 let _t = thread::spawn(move || {
832 let mut cl = t!(srv.accept()).0;
833 cl.write(&[1, 3, 3, 7]).unwrap();
837 let mut c = t!(TcpStream::connect(&addr));
840 let len = c.peek(&mut b).unwrap();
843 let len = c.read(&mut b).unwrap();
846 t!(c.set_nonblocking(true));
847 match c.peek(&mut b) {
848 Ok(_) => panic!("expected error"),
849 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
850 Err(e) => panic!("unexpected error {}", e),
// Checks that `TcpStream::connect_timeout` succeeds against a live listener
// when given a 2-second timeout.
857 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
858 fn connect_timeout_valid() {
// Port 0 is requested at bind time; the actual address is read back below.
859 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
860 let addr = listener.local_addr().unwrap();
861 TcpStream::connect_timeout(&addr, Duration::from_secs(2)).unwrap();