1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::net as net_imp;
18 use sys_common::{AsInner, FromInner, IntoInner};
/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
///
/// The socket will be closed when the value is dropped.
///
/// # Examples
///
/// ```no_run
/// use std::io::prelude::*;
/// use std::net::TcpStream;
///
/// {
///     let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
///
///     // ignore the Result
///     let _ = stream.write(&[1]);
///     let _ = stream.read(&mut [0; 128]); // ignore here too
/// } // the stream is closed here
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct TcpStream(net_imp::TcpStream);
/// A structure representing a socket server.
///
/// # Examples
///
/// ```no_run
/// use std::net::{TcpListener, TcpStream};
/// use std::thread;
///
/// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
///
/// fn handle_client(stream: TcpStream) {
///     // ...
/// }
///
/// // accept connections and process them, spawning a new thread for each one
/// for stream in listener.incoming() {
///     match stream {
///         Ok(stream) => {
///             thread::spawn(move|| {
///                 // connection succeeded
///                 handle_client(stream)
///             });
///         }
///         Err(e) => { /* connection failed */ }
///     }
/// }
///
/// // close the socket server
/// drop(listener);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct TcpListener(net_imp::TcpListener);
76 /// An infinite iterator over the connections from a `TcpListener`.
78 /// This iterator will infinitely yield `Some` of the accepted connections. It
79 /// is equivalent to calling `accept` in a loop.
80 #[stable(feature = "rust1", since = "1.0.0")]
81 pub struct Incoming<'a> { listener: &'a TcpListener }
84 /// Opens a TCP connection to a remote host.
/// `addr` is an address of the remote host. Anything which implements the
/// `ToSocketAddrs` trait can be supplied for the address; see that trait's
/// documentation for concrete examples.
89 #[stable(feature = "rust1", since = "1.0.0")]
90 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
91 super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
94 /// Returns the socket address of the remote peer of this TCP connection.
95 #[stable(feature = "rust1", since = "1.0.0")]
96 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
100 /// Returns the socket address of the local half of this TCP connection.
101 #[stable(feature = "rust1", since = "1.0.0")]
102 pub fn local_addr(&self) -> io::Result<SocketAddr> {
106 /// Shuts down the read, write, or both halves of this connection.
108 /// This function will cause all pending and future I/O on the specified
109 /// portions to return immediately with an appropriate value (see the
110 /// documentation of `Shutdown`).
111 #[stable(feature = "rust1", since = "1.0.0")]
112 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
116 /// Creates a new independently owned handle to the underlying socket.
118 /// The returned `TcpStream` is a reference to the same stream that this
119 /// object references. Both handles will read and write the same stream of
120 /// data, and options set on one stream will be propagated to the other
122 #[stable(feature = "rust1", since = "1.0.0")]
123 pub fn try_clone(&self) -> io::Result<TcpStream> {
124 self.0.duplicate().map(TcpStream)
127 /// Sets the read timeout to the timeout specified.
129 /// If the value specified is `None`, then `read` calls will block
130 /// indefinitely. It is an error to pass the zero `Duration` to this
135 /// Platforms may return a different error code whenever a read times out as
136 /// a result of setting this option. For example Unix typically returns an
137 /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
138 #[stable(feature = "socket_timeout", since = "1.4.0")]
139 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
140 self.0.set_read_timeout(dur)
143 /// Sets the write timeout to the timeout specified.
145 /// If the value specified is `None`, then `write` calls will block
146 /// indefinitely. It is an error to pass the zero `Duration` to this
151 /// Platforms may return a different error code whenever a write times out
152 /// as a result of setting this option. For example Unix typically returns
153 /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
154 #[stable(feature = "socket_timeout", since = "1.4.0")]
155 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
156 self.0.set_write_timeout(dur)
159 /// Returns the read timeout of this socket.
161 /// If the timeout is `None`, then `read` calls will block indefinitely.
165 /// Some platforms do not provide access to the current timeout.
166 #[stable(feature = "socket_timeout", since = "1.4.0")]
167 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
168 self.0.read_timeout()
171 /// Returns the write timeout of this socket.
173 /// If the timeout is `None`, then `write` calls will block indefinitely.
177 /// Some platforms do not provide access to the current timeout.
178 #[stable(feature = "socket_timeout", since = "1.4.0")]
179 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
180 self.0.write_timeout()
183 /// Sets the value of the `TCP_NODELAY` option on this socket.
185 /// If set, this option disables the Nagle algorithm. This means that
186 /// segments are always sent as soon as possible, even if there is only a
187 /// small amount of data. When not set, data is buffered until there is a
188 /// sufficient amount to send out, thereby avoiding the frequent sending of
190 #[stable(feature = "net2_mutators", since = "1.9.0")]
191 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
192 self.0.set_nodelay(nodelay)
195 /// Gets the value of the `TCP_NODELAY` option on this socket.
197 /// For more information about this option, see [`set_nodelay`][link].
/// [link]: #method.set_nodelay
200 #[stable(feature = "net2_mutators", since = "1.9.0")]
201 pub fn nodelay(&self) -> io::Result<bool> {
205 /// Sets the value for the `IP_TTL` option on this socket.
207 /// This value sets the time-to-live field that is used in every packet sent
208 /// from this socket.
209 #[stable(feature = "net2_mutators", since = "1.9.0")]
210 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
214 /// Gets the value of the `IP_TTL` option for this socket.
216 /// For more information about this option, see [`set_ttl`][link].
/// [link]: #method.set_ttl
219 #[stable(feature = "net2_mutators", since = "1.9.0")]
220 pub fn ttl(&self) -> io::Result<u32> {
224 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
226 /// If this is set to `true` then the socket is restricted to sending and
227 /// receiving IPv6 packets only. If this is the case, an IPv4 and an IPv6
228 /// application can each bind the same port at the same time.
230 /// If this is set to `false` then the socket can be used to send and
231 /// receive packets from an IPv4-mapped IPv6 address.
232 #[stable(feature = "net2_mutators", since = "1.9.0")]
233 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
234 self.0.set_only_v6(only_v6)
237 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
239 /// For more information about this option, see [`set_only_v6`][link].
/// [link]: #method.set_only_v6
242 #[stable(feature = "net2_mutators", since = "1.9.0")]
243 pub fn only_v6(&self) -> io::Result<bool> {
/// Gets the value of the `SO_ERROR` option on this socket.
249 /// This will retrieve the stored error in the underlying socket, clearing
250 /// the field in the process. This can be useful for checking errors between
252 #[stable(feature = "net2_mutators", since = "1.9.0")]
253 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
257 /// Moves this TCP stream into or out of nonblocking mode.
259 /// On Unix this corresponds to calling fcntl, and on Windows this
260 /// corresponds to calling ioctlsocket.
261 #[stable(feature = "net2_mutators", since = "1.9.0")]
262 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
263 self.0.set_nonblocking(nonblocking)
267 #[stable(feature = "rust1", since = "1.0.0")]
268 impl Read for TcpStream {
269 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
270 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
271 self.0.read_to_end(buf)
274 #[stable(feature = "rust1", since = "1.0.0")]
275 impl Write for TcpStream {
276 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
277 fn flush(&mut self) -> io::Result<()> { Ok(()) }
279 #[stable(feature = "rust1", since = "1.0.0")]
280 impl<'a> Read for &'a TcpStream {
281 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
282 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
283 self.0.read_to_end(buf)
286 #[stable(feature = "rust1", since = "1.0.0")]
287 impl<'a> Write for &'a TcpStream {
288 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
289 fn flush(&mut self) -> io::Result<()> { Ok(()) }
292 impl AsInner<net_imp::TcpStream> for TcpStream {
293 fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
296 impl FromInner<net_imp::TcpStream> for TcpStream {
297 fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
300 impl IntoInner<net_imp::TcpStream> for TcpStream {
301 fn into_inner(self) -> net_imp::TcpStream { self.0 }
304 #[stable(feature = "rust1", since = "1.0.0")]
305 impl fmt::Debug for TcpStream {
306 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
312 /// Creates a new `TcpListener` which will be bound to the specified
315 /// The returned listener is ready for accepting connections.
317 /// Binding with a port number of 0 will request that the OS assigns a port
318 /// to this listener. The port allocated can be queried via the
319 /// `local_addr` method.
/// The address type can be any implementor of the `ToSocketAddrs` trait. See
322 /// its documentation for concrete examples.
323 #[stable(feature = "rust1", since = "1.0.0")]
324 pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
325 super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
328 /// Returns the local socket address of this listener.
329 #[stable(feature = "rust1", since = "1.0.0")]
330 pub fn local_addr(&self) -> io::Result<SocketAddr> {
334 /// Creates a new independently owned handle to the underlying socket.
336 /// The returned `TcpListener` is a reference to the same socket that this
337 /// object references. Both handles can be used to accept incoming
338 /// connections and options set on one listener will affect the other.
339 #[stable(feature = "rust1", since = "1.0.0")]
340 pub fn try_clone(&self) -> io::Result<TcpListener> {
341 self.0.duplicate().map(TcpListener)
/// Accepts a new incoming connection from this listener.
346 /// This function will block the calling thread until a new TCP connection
347 /// is established. When established, the corresponding `TcpStream` and the
348 /// remote peer's address will be returned.
349 #[stable(feature = "rust1", since = "1.0.0")]
350 pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
351 self.0.accept().map(|(a, b)| (TcpStream(a), b))
354 /// Returns an iterator over the connections being received on this
357 /// The returned iterator will never return `None` and will also not yield
358 /// the peer's `SocketAddr` structure.
359 #[stable(feature = "rust1", since = "1.0.0")]
360 pub fn incoming(&self) -> Incoming {
361 Incoming { listener: self }
364 /// Sets the value for the `IP_TTL` option on this socket.
366 /// This value sets the time-to-live field that is used in every packet sent
367 /// from this socket.
368 #[stable(feature = "net2_mutators", since = "1.9.0")]
369 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
373 /// Gets the value of the `IP_TTL` option for this socket.
375 /// For more information about this option, see [`set_ttl`][link].
/// [link]: #method.set_ttl
378 #[stable(feature = "net2_mutators", since = "1.9.0")]
379 pub fn ttl(&self) -> io::Result<u32> {
383 /// Sets the value for the `IPV6_V6ONLY` option on this socket.
385 /// If this is set to `true` then the socket is restricted to sending and
/// receiving IPv6 packets only. If this is the case, an IPv4 and an IPv6
/// application can each bind the same port at the same time.
389 /// If this is set to `false` then the socket can be used to send and
390 /// receive packets from an IPv4-mapped IPv6 address.
391 #[stable(feature = "net2_mutators", since = "1.9.0")]
392 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
393 self.0.set_only_v6(only_v6)
396 /// Gets the value of the `IPV6_V6ONLY` option for this socket.
398 /// For more information about this option, see [`set_only_v6`][link].
/// [link]: #method.set_only_v6
401 #[stable(feature = "net2_mutators", since = "1.9.0")]
402 pub fn only_v6(&self) -> io::Result<bool> {
/// Gets the value of the `SO_ERROR` option on this socket.
408 /// This will retrieve the stored error in the underlying socket, clearing
409 /// the field in the process. This can be useful for checking errors between
411 #[stable(feature = "net2_mutators", since = "1.9.0")]
412 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
/// Moves this TCP listener into or out of nonblocking mode.
418 /// On Unix this corresponds to calling fcntl, and on Windows this
419 /// corresponds to calling ioctlsocket.
420 #[stable(feature = "net2_mutators", since = "1.9.0")]
421 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
422 self.0.set_nonblocking(nonblocking)
426 #[stable(feature = "rust1", since = "1.0.0")]
427 impl<'a> Iterator for Incoming<'a> {
428 type Item = io::Result<TcpStream>;
429 fn next(&mut self) -> Option<io::Result<TcpStream>> {
430 Some(self.listener.accept().map(|p| p.0))
434 impl AsInner<net_imp::TcpListener> for TcpListener {
435 fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
438 impl FromInner<net_imp::TcpListener> for TcpListener {
439 fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
444 impl IntoInner<net_imp::TcpListener> for TcpListener {
445 fn into_inner(self) -> net_imp::TcpListener { self.0 }
448 #[stable(feature = "rust1", since = "1.0.0")]
449 impl fmt::Debug for TcpListener {
450 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
462 use net::test::{next_test_ip4, next_test_ip6};
463 use sync::mpsc::channel;
464 use sys_common::AsInner;
465 use time::{Instant, Duration};
468 fn each_ip(f: &mut FnMut(SocketAddr)) {
477 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
484 match TcpListener::bind("1.1.1.1:9999") {
487 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
493 match TcpStream::connect("0.0.0.0:1") {
495 Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
496 e.kind() == ErrorKind::InvalidInput ||
497 e.kind() == ErrorKind::AddrInUse ||
498 e.kind() == ErrorKind::AddrNotAvailable,
499 "bad error: {} {:?}", e, e.kind()),
504 fn listen_localhost() {
505 let socket_addr = next_test_ip4();
506 let listener = t!(TcpListener::bind(&socket_addr));
508 let _t = thread::spawn(move || {
509 let mut stream = t!(TcpStream::connect(&("localhost",
510 socket_addr.port())));
511 t!(stream.write(&[144]));
514 let mut stream = t!(listener.accept()).0;
516 t!(stream.read(&mut buf));
517 assert!(buf[0] == 144);
521 fn connect_loopback() {
522 each_ip(&mut |addr| {
523 let acceptor = t!(TcpListener::bind(&addr));
525 let _t = thread::spawn(move|| {
526 let host = match addr {
527 SocketAddr::V4(..) => "127.0.0.1",
528 SocketAddr::V6(..) => "::1",
530 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
531 t!(stream.write(&[66]));
534 let mut stream = t!(acceptor.accept()).0;
536 t!(stream.read(&mut buf));
537 assert!(buf[0] == 66);
543 each_ip(&mut |addr| {
544 let acceptor = t!(TcpListener::bind(&addr));
546 let (tx, rx) = channel();
547 let _t = thread::spawn(move|| {
548 let mut stream = t!(TcpStream::connect(&addr));
549 t!(stream.write(&[99]));
550 tx.send(t!(stream.local_addr())).unwrap();
553 let (mut stream, addr) = t!(acceptor.accept());
555 t!(stream.read(&mut buf));
556 assert!(buf[0] == 99);
557 assert_eq!(addr, t!(rx.recv()));
563 each_ip(&mut |addr| {
564 let acceptor = t!(TcpListener::bind(&addr));
566 let _t = thread::spawn(move|| {
567 let _stream = t!(TcpStream::connect(&addr));
571 let mut stream = t!(acceptor.accept()).0;
573 let nread = t!(stream.read(&mut buf));
574 assert_eq!(nread, 0);
575 let nread = t!(stream.read(&mut buf));
576 assert_eq!(nread, 0);
582 each_ip(&mut |addr| {
583 let acceptor = t!(TcpListener::bind(&addr));
585 let (tx, rx) = channel();
586 let _t = thread::spawn(move|| {
587 drop(t!(TcpStream::connect(&addr)));
588 tx.send(()).unwrap();
591 let mut stream = t!(acceptor.accept()).0;
594 match stream.write(&buf) {
597 assert!(e.kind() == ErrorKind::ConnectionReset ||
598 e.kind() == ErrorKind::BrokenPipe ||
599 e.kind() == ErrorKind::ConnectionAborted,
600 "unknown error: {}", e);
607 fn multiple_connect_serial() {
608 each_ip(&mut |addr| {
610 let acceptor = t!(TcpListener::bind(&addr));
612 let _t = thread::spawn(move|| {
614 let mut stream = t!(TcpStream::connect(&addr));
615 t!(stream.write(&[99]));
619 for stream in acceptor.incoming().take(max) {
620 let mut stream = t!(stream);
622 t!(stream.read(&mut buf));
623 assert_eq!(buf[0], 99);
629 fn multiple_connect_interleaved_greedy_schedule() {
630 const MAX: usize = 10;
631 each_ip(&mut |addr| {
632 let acceptor = t!(TcpListener::bind(&addr));
634 let _t = thread::spawn(move|| {
635 let acceptor = acceptor;
636 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
637 // Start another thread to handle the connection
638 let _t = thread::spawn(move|| {
639 let mut stream = t!(stream);
641 t!(stream.read(&mut buf));
642 assert!(buf[0] == i as u8);
650 fn connect(i: usize, addr: SocketAddr) {
651 if i == MAX { return }
653 let t = thread::spawn(move|| {
654 let mut stream = t!(TcpStream::connect(&addr));
655 // Connect again before writing
656 connect(i + 1, addr);
657 t!(stream.write(&[i as u8]));
659 t.join().ok().unwrap();
664 fn multiple_connect_interleaved_lazy_schedule() {
665 const MAX: usize = 10;
666 each_ip(&mut |addr| {
667 let acceptor = t!(TcpListener::bind(&addr));
669 let _t = thread::spawn(move|| {
670 for stream in acceptor.incoming().take(MAX) {
671 // Start another thread to handle the connection
672 let _t = thread::spawn(move|| {
673 let mut stream = t!(stream);
675 t!(stream.read(&mut buf));
676 assert!(buf[0] == 99);
684 fn connect(i: usize, addr: SocketAddr) {
685 if i == MAX { return }
687 let t = thread::spawn(move|| {
688 let mut stream = t!(TcpStream::connect(&addr));
689 connect(i + 1, addr);
690 t!(stream.write(&[99]));
692 t.join().ok().unwrap();
697 fn socket_and_peer_name() {
698 each_ip(&mut |addr| {
699 let listener = t!(TcpListener::bind(&addr));
700 let so_name = t!(listener.local_addr());
701 assert_eq!(addr, so_name);
702 let _t = thread::spawn(move|| {
703 t!(listener.accept());
706 let stream = t!(TcpStream::connect(&addr));
707 assert_eq!(addr, t!(stream.peer_addr()));
713 each_ip(&mut |addr| {
714 let (tx, rx) = channel();
715 let srv = t!(TcpListener::bind(&addr));
716 let _t = thread::spawn(move|| {
717 let mut cl = t!(srv.accept()).0;
718 cl.write(&[10]).unwrap();
721 tx.send(()).unwrap();
724 let mut c = t!(TcpStream::connect(&addr));
726 assert_eq!(c.read(&mut b).unwrap(), 1);
734 each_ip(&mut |addr| {
735 let _listener = t!(TcpListener::bind(&addr));
736 match TcpListener::bind(&addr) {
739 assert!(e.kind() == ErrorKind::ConnectionRefused ||
740 e.kind() == ErrorKind::Other ||
741 e.kind() == ErrorKind::AddrInUse,
742 "unknown error: {} {:?}", e, e.kind());
750 each_ip(&mut |addr| {
751 let acceptor = t!(TcpListener::bind(&addr));
753 let _t = thread::spawn(move|| {
754 t!(TcpStream::connect(&addr));
757 t!(acceptor.accept());
759 t!(TcpListener::bind(&addr));
764 fn tcp_clone_smoke() {
765 each_ip(&mut |addr| {
766 let acceptor = t!(TcpListener::bind(&addr));
768 let _t = thread::spawn(move|| {
769 let mut s = t!(TcpStream::connect(&addr));
770 let mut buf = [0, 0];
771 assert_eq!(s.read(&mut buf).unwrap(), 1);
772 assert_eq!(buf[0], 1);
776 let mut s1 = t!(acceptor.accept()).0;
777 let s2 = t!(s1.try_clone());
779 let (tx1, rx1) = channel();
780 let (tx2, rx2) = channel();
781 let _t = thread::spawn(move|| {
785 tx2.send(()).unwrap();
787 tx1.send(()).unwrap();
788 let mut buf = [0, 0];
789 assert_eq!(s1.read(&mut buf).unwrap(), 1);
795 fn tcp_clone_two_read() {
796 each_ip(&mut |addr| {
797 let acceptor = t!(TcpListener::bind(&addr));
798 let (tx1, rx) = channel();
799 let tx2 = tx1.clone();
801 let _t = thread::spawn(move|| {
802 let mut s = t!(TcpStream::connect(&addr));
809 let mut s1 = t!(acceptor.accept()).0;
810 let s2 = t!(s1.try_clone());
812 let (done, rx) = channel();
813 let _t = thread::spawn(move|| {
815 let mut buf = [0, 0];
816 t!(s2.read(&mut buf));
817 tx2.send(()).unwrap();
818 done.send(()).unwrap();
820 let mut buf = [0, 0];
821 t!(s1.read(&mut buf));
822 tx1.send(()).unwrap();
829 fn tcp_clone_two_write() {
830 each_ip(&mut |addr| {
831 let acceptor = t!(TcpListener::bind(&addr));
833 let _t = thread::spawn(move|| {
834 let mut s = t!(TcpStream::connect(&addr));
835 let mut buf = [0, 1];
836 t!(s.read(&mut buf));
837 t!(s.read(&mut buf));
840 let mut s1 = t!(acceptor.accept()).0;
841 let s2 = t!(s1.try_clone());
843 let (done, rx) = channel();
844 let _t = thread::spawn(move|| {
847 done.send(()).unwrap();
856 fn shutdown_smoke() {
857 each_ip(&mut |addr| {
858 let a = t!(TcpListener::bind(&addr));
859 let _t = thread::spawn(move|| {
860 let mut c = t!(a.accept()).0;
862 assert_eq!(c.read(&mut b).unwrap(), 0);
866 let mut s = t!(TcpStream::connect(&addr));
867 t!(s.shutdown(Shutdown::Write));
868 assert!(s.write(&[1]).is_err());
870 assert_eq!(t!(s.read(&mut b)), 1);
876 fn close_readwrite_smoke() {
877 each_ip(&mut |addr| {
878 let a = t!(TcpListener::bind(&addr));
879 let (tx, rx) = channel::<()>();
880 let _t = thread::spawn(move|| {
881 let _s = t!(a.accept());
886 let mut s = t!(TcpStream::connect(&addr));
887 let mut s2 = t!(s.try_clone());
889 // closing should prevent reads/writes
890 t!(s.shutdown(Shutdown::Write));
891 assert!(s.write(&[0]).is_err());
892 t!(s.shutdown(Shutdown::Read));
893 assert_eq!(s.read(&mut b).unwrap(), 0);
895 // closing should affect previous handles
896 assert!(s2.write(&[0]).is_err());
897 assert_eq!(s2.read(&mut b).unwrap(), 0);
899 // closing should affect new handles
900 let mut s3 = t!(s.try_clone());
901 assert!(s3.write(&[0]).is_err());
902 assert_eq!(s3.read(&mut b).unwrap(), 0);
904 // make sure these don't die
905 let _ = s2.shutdown(Shutdown::Read);
906 let _ = s2.shutdown(Shutdown::Write);
907 let _ = s3.shutdown(Shutdown::Read);
908 let _ = s3.shutdown(Shutdown::Write);
914 fn close_read_wakes_up() {
915 each_ip(&mut |addr| {
916 let a = t!(TcpListener::bind(&addr));
917 let (tx1, rx) = channel::<()>();
918 let _t = thread::spawn(move|| {
919 let _s = t!(a.accept());
923 let s = t!(TcpStream::connect(&addr));
924 let s2 = t!(s.try_clone());
925 let (tx, rx) = channel();
926 let _t = thread::spawn(move|| {
928 assert_eq!(t!(s2.read(&mut [0])), 0);
929 tx.send(()).unwrap();
931 // this should wake up the child thread
932 t!(s.shutdown(Shutdown::Read));
934 // this test will never finish if the child doesn't wake up
941 fn clone_while_reading() {
942 each_ip(&mut |addr| {
943 let accept = t!(TcpListener::bind(&addr));
945 // Enqueue a thread to write to a socket
946 let (tx, rx) = channel();
947 let (txdone, rxdone) = channel();
948 let txdone2 = txdone.clone();
949 let _t = thread::spawn(move|| {
950 let mut tcp = t!(TcpStream::connect(&addr));
953 txdone2.send(()).unwrap();
956 // Spawn off a reading clone
957 let tcp = t!(accept.accept()).0;
958 let tcp2 = t!(tcp.try_clone());
959 let txdone3 = txdone.clone();
960 let _t = thread::spawn(move|| {
962 t!(tcp2.read(&mut [0]));
963 txdone3.send(()).unwrap();
966 // Try to ensure that the reading clone is indeed reading
971 // clone the handle again while it's reading, then let it finish the
973 let _ = t!(tcp.try_clone());
974 tx.send(()).unwrap();
975 rxdone.recv().unwrap();
976 rxdone.recv().unwrap();
981 fn clone_accept_smoke() {
982 each_ip(&mut |addr| {
983 let a = t!(TcpListener::bind(&addr));
984 let a2 = t!(a.try_clone());
986 let _t = thread::spawn(move|| {
987 let _ = TcpStream::connect(&addr);
989 let _t = thread::spawn(move|| {
990 let _ = TcpStream::connect(&addr);
999 fn clone_accept_concurrent() {
1000 each_ip(&mut |addr| {
1001 let a = t!(TcpListener::bind(&addr));
1002 let a2 = t!(a.try_clone());
1004 let (tx, rx) = channel();
1005 let tx2 = tx.clone();
1007 let _t = thread::spawn(move|| {
1008 tx.send(t!(a.accept())).unwrap();
1010 let _t = thread::spawn(move|| {
1011 tx2.send(t!(a2.accept())).unwrap();
1014 let _t = thread::spawn(move|| {
1015 let _ = TcpStream::connect(&addr);
1017 let _t = thread::spawn(move|| {
1018 let _ = TcpStream::connect(&addr);
1028 let name = if cfg!(windows) {"socket"} else {"fd"};
1029 let socket_addr = next_test_ip4();
1031 let listener = t!(TcpListener::bind(&socket_addr));
1032 let listener_inner = listener.0.socket().as_inner();
1033 let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1034 socket_addr, name, listener_inner);
1035 assert_eq!(format!("{:?}", listener), compare);
1037 let stream = t!(TcpStream::connect(&("localhost",
1038 socket_addr.port())));
1039 let stream_inner = stream.0.socket().as_inner();
1040 let compare = format!("TcpStream {{ addr: {:?}, \
1041 peer: {:?}, {}: {:?} }}",
1042 stream.local_addr().unwrap(),
1043 stream.peer_addr().unwrap(),
1046 assert_eq!(format!("{:?}", stream), compare);
// FIXME: re-enable the bitrig/netbsd/openbsd tests once their socket timeout
// code no longer has rounding errors.
1051 #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1054 let addr = next_test_ip4();
1055 let listener = t!(TcpListener::bind(&addr));
1057 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1058 let dur = Duration::new(15410, 0);
1060 assert_eq!(None, t!(stream.read_timeout()));
1062 t!(stream.set_read_timeout(Some(dur)));
1063 assert_eq!(Some(dur), t!(stream.read_timeout()));
1065 assert_eq!(None, t!(stream.write_timeout()));
1067 t!(stream.set_write_timeout(Some(dur)));
1068 assert_eq!(Some(dur), t!(stream.write_timeout()));
1070 t!(stream.set_read_timeout(None));
1071 assert_eq!(None, t!(stream.read_timeout()));
1073 t!(stream.set_write_timeout(None));
1074 assert_eq!(None, t!(stream.write_timeout()));
1079 fn test_read_timeout() {
1080 let addr = next_test_ip4();
1081 let listener = t!(TcpListener::bind(&addr));
1083 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1084 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1086 let mut buf = [0; 10];
1087 let start = Instant::now();
1088 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1089 assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1090 assert!(start.elapsed() > Duration::from_millis(400));
1095 fn test_read_with_timeout() {
1096 let addr = next_test_ip4();
1097 let listener = t!(TcpListener::bind(&addr));
1099 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1100 t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1102 let mut other_end = t!(listener.accept()).0;
1103 t!(other_end.write_all(b"hello world"));
1105 let mut buf = [0; 11];
1106 t!(stream.read(&mut buf));
1107 assert_eq!(b"hello world", &buf[..]);
1109 let start = Instant::now();
1110 let kind = stream.read(&mut buf).err().expect("expected error").kind();
1111 assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1112 assert!(start.elapsed() > Duration::from_millis(400));
1118 let addr = next_test_ip4();
1119 let _listener = t!(TcpListener::bind(&addr));
1121 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1123 assert_eq!(false, t!(stream.nodelay()));
1124 t!(stream.set_nodelay(true));
1125 assert_eq!(true, t!(stream.nodelay()));
1126 t!(stream.set_nodelay(false));
1127 assert_eq!(false, t!(stream.nodelay()));
1134 let addr = next_test_ip4();
1135 let listener = t!(TcpListener::bind(&addr));
1137 t!(listener.set_ttl(ttl));
1138 assert_eq!(ttl, t!(listener.ttl()));
1140 let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1142 t!(stream.set_ttl(ttl));
1143 assert_eq!(ttl, t!(stream.ttl()));
1147 fn set_nonblocking() {
1148 let addr = next_test_ip4();
1149 let listener = t!(TcpListener::bind(&addr));
1151 t!(listener.set_nonblocking(true));
1152 t!(listener.set_nonblocking(false));
1154 let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1156 t!(stream.set_nonblocking(false));
1157 t!(stream.set_nonblocking(true));
1160 match stream.read(&mut buf) {
1161 Ok(_) => panic!("expected error"),
1162 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
1163 Err(e) => panic!("unexpected error {}", e),