]> git.lizzy.rs Git - rust.git/blob - src/libstd/net/tcp.rs
Auto merge of #31981 - achanda:unspecified-ip, r=alexcrichton
[rust.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::io::read_to_end_uninitialized;
18 use sys_common::net as net_imp;
19 use sys_common::{AsInner, FromInner, IntoInner};
20 use time::Duration;
21
22 /// A structure which represents a TCP stream between a local socket and a
23 /// remote socket.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
32 ///
33 /// {
34 ///     let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 ///
36 ///     // ignore the Result
37 ///     let _ = stream.write(&[1]);
38 ///     let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
40 /// ```
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp::TcpStream);
43
44 /// A structure representing a socket server.
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use std::net::{TcpListener, TcpStream};
50 /// use std::thread;
51 ///
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 ///
54 /// fn handle_client(stream: TcpStream) {
55 ///     // ...
56 /// }
57 ///
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
60 ///     match stream {
61 ///         Ok(stream) => {
62 ///             thread::spawn(move|| {
63 ///                 // connection succeeded
64 ///                 handle_client(stream)
65 ///             });
66 ///         }
67 ///         Err(e) => { /* connection failed */ }
68 ///     }
69 /// }
70 ///
71 /// // close the socket server
72 /// drop(listener);
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp::TcpListener);
76
77 /// An infinite iterator over the connections from a `TcpListener`.
78 ///
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming<'a> { listener: &'a TcpListener }
83
84 impl TcpStream {
85     /// Opens a TCP connection to a remote host.
86     ///
87     /// `addr` is an address of the remote host. Anything which implements
88     /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89     /// documentation for concrete examples.
90     #[stable(feature = "rust1", since = "1.0.0")]
91     pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
92         super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
93     }
94
95     /// Returns the socket address of the remote peer of this TCP connection.
96     #[stable(feature = "rust1", since = "1.0.0")]
97     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
98         self.0.peer_addr()
99     }
100
101     /// Returns the socket address of the local half of this TCP connection.
102     #[stable(feature = "rust1", since = "1.0.0")]
103     pub fn local_addr(&self) -> io::Result<SocketAddr> {
104         self.0.socket_addr()
105     }
106
107     /// Shuts down the read, write, or both halves of this connection.
108     ///
109     /// This function will cause all pending and future I/O on the specified
110     /// portions to return immediately with an appropriate value (see the
111     /// documentation of `Shutdown`).
112     #[stable(feature = "rust1", since = "1.0.0")]
113     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
114         self.0.shutdown(how)
115     }
116
117     /// Creates a new independently owned handle to the underlying socket.
118     ///
119     /// The returned `TcpStream` is a reference to the same stream that this
120     /// object references. Both handles will read and write the same stream of
121     /// data, and options set on one stream will be propagated to the other
122     /// stream.
123     #[stable(feature = "rust1", since = "1.0.0")]
124     pub fn try_clone(&self) -> io::Result<TcpStream> {
125         self.0.duplicate().map(TcpStream)
126     }
127
128     /// Sets the read timeout to the timeout specified.
129     ///
130     /// If the value specified is `None`, then `read` calls will block
131     /// indefinitely. It is an error to pass the zero `Duration` to this
132     /// method.
133     ///
134     /// # Note
135     ///
136     /// Platforms may return a different error code whenever a read times out as
137     /// a result of setting this option. For example Unix typically returns an
138     /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139     #[stable(feature = "socket_timeout", since = "1.4.0")]
140     pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
141         self.0.set_read_timeout(dur)
142     }
143
144     /// Sets the write timeout to the timeout specified.
145     ///
146     /// If the value specified is `None`, then `write` calls will block
147     /// indefinitely. It is an error to pass the zero `Duration` to this
148     /// method.
149     ///
150     /// # Note
151     ///
152     /// Platforms may return a different error code whenever a write times out
153     /// as a result of setting this option. For example Unix typically returns
154     /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155     #[stable(feature = "socket_timeout", since = "1.4.0")]
156     pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
157         self.0.set_write_timeout(dur)
158     }
159
160     /// Returns the read timeout of this socket.
161     ///
162     /// If the timeout is `None`, then `read` calls will block indefinitely.
163     ///
164     /// # Note
165     ///
166     /// Some platforms do not provide access to the current timeout.
167     #[stable(feature = "socket_timeout", since = "1.4.0")]
168     pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
169         self.0.read_timeout()
170     }
171
172     /// Returns the write timeout of this socket.
173     ///
174     /// If the timeout is `None`, then `write` calls will block indefinitely.
175     ///
176     /// # Note
177     ///
178     /// Some platforms do not provide access to the current timeout.
179     #[stable(feature = "socket_timeout", since = "1.4.0")]
180     pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
181         self.0.write_timeout()
182     }
183
184     /// Sets the value of the `TCP_NODELAY` option on this socket.
185     ///
186     /// If set, this option disables the Nagle algorithm. This means that
187     /// segments are always sent as soon as possible, even if there is only a
188     /// small amount of data. When not set, data is buffered until there is a
189     /// sufficient amount to send out, thereby avoiding the frequent sending of
190     /// small packets.
191     #[stable(feature = "net2_mutators", since = "1.9.0")]
192     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
193         self.0.set_nodelay(nodelay)
194     }
195
196     /// Gets the value of the `TCP_NODELAY` option on this socket.
197     ///
198     /// For more information about this option, see [`set_nodelay`][link].
199     ///
200     /// [link]: #tymethod.set_nodelay
201     #[stable(feature = "net2_mutators", since = "1.9.0")]
202     pub fn nodelay(&self) -> io::Result<bool> {
203         self.0.nodelay()
204     }
205
206     /// Sets the value for the `IP_TTL` option on this socket.
207     ///
208     /// This value sets the time-to-live field that is used in every packet sent
209     /// from this socket.
210     #[stable(feature = "net2_mutators", since = "1.9.0")]
211     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
212         self.0.set_ttl(ttl)
213     }
214
215     /// Gets the value of the `IP_TTL` option for this socket.
216     ///
217     /// For more information about this option, see [`set_ttl`][link].
218     ///
219     /// [link]: #tymethod.set_ttl
220     #[stable(feature = "net2_mutators", since = "1.9.0")]
221     pub fn ttl(&self) -> io::Result<u32> {
222         self.0.ttl()
223     }
224
225     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
226     ///
227     /// If this is set to `true` then the socket is restricted to sending and
228     /// receiving IPv6 packets only. If this is the case, an IPv4 and an IPv6
229     /// application can each bind the same port at the same time.
230     ///
231     /// If this is set to `false` then the socket can be used to send and
232     /// receive packets from an IPv4-mapped IPv6 address.
233     #[stable(feature = "net2_mutators", since = "1.9.0")]
234     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
235         self.0.set_only_v6(only_v6)
236     }
237
238     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
239     ///
240     /// For more information about this option, see [`set_only_v6`][link].
241     ///
242     /// [link]: #tymethod.set_only_v6
243     #[stable(feature = "net2_mutators", since = "1.9.0")]
244     pub fn only_v6(&self) -> io::Result<bool> {
245         self.0.only_v6()
246     }
247
248     /// Get the value of the `SO_ERROR` option on this socket.
249     ///
250     /// This will retrieve the stored error in the underlying socket, clearing
251     /// the field in the process. This can be useful for checking errors between
252     /// calls.
253     #[stable(feature = "net2_mutators", since = "1.9.0")]
254     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
255         self.0.take_error()
256     }
257
258     /// Moves this TCP stream into or out of nonblocking mode.
259     ///
260     /// On Unix this corresponds to calling fcntl, and on Windows this
261     /// corresponds to calling ioctlsocket.
262     #[stable(feature = "net2_mutators", since = "1.9.0")]
263     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
264         self.0.set_nonblocking(nonblocking)
265     }
266 }
267
268 #[stable(feature = "rust1", since = "1.0.0")]
269 impl Read for TcpStream {
270     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
271     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
272         unsafe { read_to_end_uninitialized(self, buf) }
273     }
274 }
275 #[stable(feature = "rust1", since = "1.0.0")]
276 impl Write for TcpStream {
277     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
278     fn flush(&mut self) -> io::Result<()> { Ok(()) }
279 }
280 #[stable(feature = "rust1", since = "1.0.0")]
281 impl<'a> Read for &'a TcpStream {
282     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
283     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
284         unsafe { read_to_end_uninitialized(self, buf) }
285     }
286 }
287 #[stable(feature = "rust1", since = "1.0.0")]
288 impl<'a> Write for &'a TcpStream {
289     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
290     fn flush(&mut self) -> io::Result<()> { Ok(()) }
291 }
292
293 impl AsInner<net_imp::TcpStream> for TcpStream {
294     fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
295 }
296
297 impl FromInner<net_imp::TcpStream> for TcpStream {
298     fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
299 }
300
301 impl IntoInner<net_imp::TcpStream> for TcpStream {
302     fn into_inner(self) -> net_imp::TcpStream { self.0 }
303 }
304
305 #[stable(feature = "rust1", since = "1.0.0")]
306 impl fmt::Debug for TcpStream {
307     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
308         self.0.fmt(f)
309     }
310 }
311
312 impl TcpListener {
313     /// Creates a new `TcpListener` which will be bound to the specified
314     /// address.
315     ///
316     /// The returned listener is ready for accepting connections.
317     ///
318     /// Binding with a port number of 0 will request that the OS assigns a port
319     /// to this listener. The port allocated can be queried via the
320     /// `local_addr` method.
321     ///
322     /// The address type can be any implementor of `ToSocketAddrs` trait. See
323     /// its documentation for concrete examples.
324     #[stable(feature = "rust1", since = "1.0.0")]
325     pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
326         super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
327     }
328
329     /// Returns the local socket address of this listener.
330     #[stable(feature = "rust1", since = "1.0.0")]
331     pub fn local_addr(&self) -> io::Result<SocketAddr> {
332         self.0.socket_addr()
333     }
334
335     /// Creates a new independently owned handle to the underlying socket.
336     ///
337     /// The returned `TcpListener` is a reference to the same socket that this
338     /// object references. Both handles can be used to accept incoming
339     /// connections and options set on one listener will affect the other.
340     #[stable(feature = "rust1", since = "1.0.0")]
341     pub fn try_clone(&self) -> io::Result<TcpListener> {
342         self.0.duplicate().map(TcpListener)
343     }
344
345     /// Accept a new incoming connection from this listener.
346     ///
347     /// This function will block the calling thread until a new TCP connection
348     /// is established. When established, the corresponding `TcpStream` and the
349     /// remote peer's address will be returned.
350     #[stable(feature = "rust1", since = "1.0.0")]
351     pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
352         self.0.accept().map(|(a, b)| (TcpStream(a), b))
353     }
354
355     /// Returns an iterator over the connections being received on this
356     /// listener.
357     ///
358     /// The returned iterator will never return `None` and will also not yield
359     /// the peer's `SocketAddr` structure.
360     #[stable(feature = "rust1", since = "1.0.0")]
361     pub fn incoming(&self) -> Incoming {
362         Incoming { listener: self }
363     }
364
365     /// Sets the value for the `IP_TTL` option on this socket.
366     ///
367     /// This value sets the time-to-live field that is used in every packet sent
368     /// from this socket.
369     #[stable(feature = "net2_mutators", since = "1.9.0")]
370     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
371         self.0.set_ttl(ttl)
372     }
373
374     /// Gets the value of the `IP_TTL` option for this socket.
375     ///
376     /// For more information about this option, see [`set_ttl`][link].
377     ///
378     /// [link]: #tymethod.set_ttl
379     #[stable(feature = "net2_mutators", since = "1.9.0")]
380     pub fn ttl(&self) -> io::Result<u32> {
381         self.0.ttl()
382     }
383
384     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
385     ///
386     /// If this is set to `true` then the socket is restricted to sending and
387     /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
388     /// can bind the same port at the same time.
389     ///
390     /// If this is set to `false` then the socket can be used to send and
391     /// receive packets from an IPv4-mapped IPv6 address.
392     #[stable(feature = "net2_mutators", since = "1.9.0")]
393     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
394         self.0.set_only_v6(only_v6)
395     }
396
397     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
398     ///
399     /// For more information about this option, see [`set_only_v6`][link].
400     ///
401     /// [link]: #tymethod.set_only_v6
402     #[stable(feature = "net2_mutators", since = "1.9.0")]
403     pub fn only_v6(&self) -> io::Result<bool> {
404         self.0.only_v6()
405     }
406
407     /// Get the value of the `SO_ERROR` option on this socket.
408     ///
409     /// This will retrieve the stored error in the underlying socket, clearing
410     /// the field in the process. This can be useful for checking errors between
411     /// calls.
412     #[stable(feature = "net2_mutators", since = "1.9.0")]
413     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
414         self.0.take_error()
415     }
416
417     /// Moves this TCP stream into or out of nonblocking mode.
418     ///
419     /// On Unix this corresponds to calling fcntl, and on Windows this
420     /// corresponds to calling ioctlsocket.
421     #[stable(feature = "net2_mutators", since = "1.9.0")]
422     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
423         self.0.set_nonblocking(nonblocking)
424     }
425 }
426
427 #[stable(feature = "rust1", since = "1.0.0")]
428 impl<'a> Iterator for Incoming<'a> {
429     type Item = io::Result<TcpStream>;
430     fn next(&mut self) -> Option<io::Result<TcpStream>> {
431         Some(self.listener.accept().map(|p| p.0))
432     }
433 }
434
435 impl AsInner<net_imp::TcpListener> for TcpListener {
436     fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
437 }
438
439 impl FromInner<net_imp::TcpListener> for TcpListener {
440     fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
441         TcpListener(inner)
442     }
443 }
444
445 impl IntoInner<net_imp::TcpListener> for TcpListener {
446     fn into_inner(self) -> net_imp::TcpListener { self.0 }
447 }
448
449 #[stable(feature = "rust1", since = "1.0.0")]
450 impl fmt::Debug for TcpListener {
451     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
452         self.0.fmt(f)
453     }
454 }
455
456 #[cfg(test)]
457 mod tests {
458     use prelude::v1::*;
459
460     use io::ErrorKind;
461     use io::prelude::*;
462     use net::*;
463     use net::test::{next_test_ip4, next_test_ip6};
464     use sync::mpsc::channel;
465     use sys_common::AsInner;
466     use time::{Instant, Duration};
467     use thread;
468
469     fn each_ip(f: &mut FnMut(SocketAddr)) {
470         f(next_test_ip4());
471         f(next_test_ip6());
472     }
473
474     macro_rules! t {
475         ($e:expr) => {
476             match $e {
477                 Ok(t) => t,
478                 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
479             }
480         }
481     }
482
483     #[test]
484     fn bind_error() {
485         match TcpListener::bind("1.1.1.1:9999") {
486             Ok(..) => panic!(),
487             Err(e) =>
488                 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
489         }
490     }
491
492     #[test]
493     fn connect_error() {
494         match TcpStream::connect("0.0.0.0:1") {
495             Ok(..) => panic!(),
496             Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
497                               e.kind() == ErrorKind::InvalidInput ||
498                               e.kind() == ErrorKind::AddrInUse ||
499                               e.kind() == ErrorKind::AddrNotAvailable,
500                               "bad error: {} {:?}", e, e.kind()),
501         }
502     }
503
504     #[test]
505     fn listen_localhost() {
506         let socket_addr = next_test_ip4();
507         let listener = t!(TcpListener::bind(&socket_addr));
508
509         let _t = thread::spawn(move || {
510             let mut stream = t!(TcpStream::connect(&("localhost",
511                                                      socket_addr.port())));
512             t!(stream.write(&[144]));
513         });
514
515         let mut stream = t!(listener.accept()).0;
516         let mut buf = [0];
517         t!(stream.read(&mut buf));
518         assert!(buf[0] == 144);
519     }
520
521     #[test]
522     fn connect_loopback() {
523         each_ip(&mut |addr| {
524             let acceptor = t!(TcpListener::bind(&addr));
525
526             let _t = thread::spawn(move|| {
527                 let host = match addr {
528                     SocketAddr::V4(..) => "127.0.0.1",
529                     SocketAddr::V6(..) => "::1",
530                 };
531                 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
532                 t!(stream.write(&[66]));
533             });
534
535             let mut stream = t!(acceptor.accept()).0;
536             let mut buf = [0];
537             t!(stream.read(&mut buf));
538             assert!(buf[0] == 66);
539         })
540     }
541
542     #[test]
543     fn smoke_test() {
544         each_ip(&mut |addr| {
545             let acceptor = t!(TcpListener::bind(&addr));
546
547             let (tx, rx) = channel();
548             let _t = thread::spawn(move|| {
549                 let mut stream = t!(TcpStream::connect(&addr));
550                 t!(stream.write(&[99]));
551                 tx.send(t!(stream.local_addr())).unwrap();
552             });
553
554             let (mut stream, addr) = t!(acceptor.accept());
555             let mut buf = [0];
556             t!(stream.read(&mut buf));
557             assert!(buf[0] == 99);
558             assert_eq!(addr, t!(rx.recv()));
559         })
560     }
561
562     #[test]
563     fn read_eof() {
564         each_ip(&mut |addr| {
565             let acceptor = t!(TcpListener::bind(&addr));
566
567             let _t = thread::spawn(move|| {
568                 let _stream = t!(TcpStream::connect(&addr));
569                 // Close
570             });
571
572             let mut stream = t!(acceptor.accept()).0;
573             let mut buf = [0];
574             let nread = t!(stream.read(&mut buf));
575             assert_eq!(nread, 0);
576             let nread = t!(stream.read(&mut buf));
577             assert_eq!(nread, 0);
578         })
579     }
580
581     #[test]
582     fn write_close() {
583         each_ip(&mut |addr| {
584             let acceptor = t!(TcpListener::bind(&addr));
585
586             let (tx, rx) = channel();
587             let _t = thread::spawn(move|| {
588                 drop(t!(TcpStream::connect(&addr)));
589                 tx.send(()).unwrap();
590             });
591
592             let mut stream = t!(acceptor.accept()).0;
593             rx.recv().unwrap();
594             let buf = [0];
595             match stream.write(&buf) {
596                 Ok(..) => {}
597                 Err(e) => {
598                     assert!(e.kind() == ErrorKind::ConnectionReset ||
599                             e.kind() == ErrorKind::BrokenPipe ||
600                             e.kind() == ErrorKind::ConnectionAborted,
601                             "unknown error: {}", e);
602                 }
603             }
604         })
605     }
606
607     #[test]
608     fn multiple_connect_serial() {
609         each_ip(&mut |addr| {
610             let max = 10;
611             let acceptor = t!(TcpListener::bind(&addr));
612
613             let _t = thread::spawn(move|| {
614                 for _ in 0..max {
615                     let mut stream = t!(TcpStream::connect(&addr));
616                     t!(stream.write(&[99]));
617                 }
618             });
619
620             for stream in acceptor.incoming().take(max) {
621                 let mut stream = t!(stream);
622                 let mut buf = [0];
623                 t!(stream.read(&mut buf));
624                 assert_eq!(buf[0], 99);
625             }
626         })
627     }
628
629     #[test]
630     fn multiple_connect_interleaved_greedy_schedule() {
631         const MAX: usize = 10;
632         each_ip(&mut |addr| {
633             let acceptor = t!(TcpListener::bind(&addr));
634
635             let _t = thread::spawn(move|| {
636                 let acceptor = acceptor;
637                 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
638                     // Start another thread to handle the connection
639                     let _t = thread::spawn(move|| {
640                         let mut stream = t!(stream);
641                         let mut buf = [0];
642                         t!(stream.read(&mut buf));
643                         assert!(buf[0] == i as u8);
644                     });
645                 }
646             });
647
648             connect(0, addr);
649         });
650
651         fn connect(i: usize, addr: SocketAddr) {
652             if i == MAX { return }
653
654             let t = thread::spawn(move|| {
655                 let mut stream = t!(TcpStream::connect(&addr));
656                 // Connect again before writing
657                 connect(i + 1, addr);
658                 t!(stream.write(&[i as u8]));
659             });
660             t.join().ok().unwrap();
661         }
662     }
663
664     #[test]
665     fn multiple_connect_interleaved_lazy_schedule() {
666         const MAX: usize = 10;
667         each_ip(&mut |addr| {
668             let acceptor = t!(TcpListener::bind(&addr));
669
670             let _t = thread::spawn(move|| {
671                 for stream in acceptor.incoming().take(MAX) {
672                     // Start another thread to handle the connection
673                     let _t = thread::spawn(move|| {
674                         let mut stream = t!(stream);
675                         let mut buf = [0];
676                         t!(stream.read(&mut buf));
677                         assert!(buf[0] == 99);
678                     });
679                 }
680             });
681
682             connect(0, addr);
683         });
684
685         fn connect(i: usize, addr: SocketAddr) {
686             if i == MAX { return }
687
688             let t = thread::spawn(move|| {
689                 let mut stream = t!(TcpStream::connect(&addr));
690                 connect(i + 1, addr);
691                 t!(stream.write(&[99]));
692             });
693             t.join().ok().unwrap();
694         }
695     }
696
697     #[test]
698     fn socket_and_peer_name() {
699         each_ip(&mut |addr| {
700             let listener = t!(TcpListener::bind(&addr));
701             let so_name = t!(listener.local_addr());
702             assert_eq!(addr, so_name);
703             let _t = thread::spawn(move|| {
704                 t!(listener.accept());
705             });
706
707             let stream = t!(TcpStream::connect(&addr));
708             assert_eq!(addr, t!(stream.peer_addr()));
709         })
710     }
711
712     #[test]
713     fn partial_read() {
714         each_ip(&mut |addr| {
715             let (tx, rx) = channel();
716             let srv = t!(TcpListener::bind(&addr));
717             let _t = thread::spawn(move|| {
718                 let mut cl = t!(srv.accept()).0;
719                 cl.write(&[10]).unwrap();
720                 let mut b = [0];
721                 t!(cl.read(&mut b));
722                 tx.send(()).unwrap();
723             });
724
725             let mut c = t!(TcpStream::connect(&addr));
726             let mut b = [0; 10];
727             assert_eq!(c.read(&mut b).unwrap(), 1);
728             t!(c.write(&[1]));
729             rx.recv().unwrap();
730         })
731     }
732
733     #[test]
734     fn double_bind() {
735         each_ip(&mut |addr| {
736             let _listener = t!(TcpListener::bind(&addr));
737             match TcpListener::bind(&addr) {
738                 Ok(..) => panic!(),
739                 Err(e) => {
740                     assert!(e.kind() == ErrorKind::ConnectionRefused ||
741                             e.kind() == ErrorKind::Other ||
742                             e.kind() == ErrorKind::AddrInUse,
743                             "unknown error: {} {:?}", e, e.kind());
744                 }
745             }
746         })
747     }
748
749     #[test]
750     fn fast_rebind() {
751         each_ip(&mut |addr| {
752             let acceptor = t!(TcpListener::bind(&addr));
753
754             let _t = thread::spawn(move|| {
755                 t!(TcpStream::connect(&addr));
756             });
757
758             t!(acceptor.accept());
759             drop(acceptor);
760             t!(TcpListener::bind(&addr));
761         });
762     }
763
764     #[test]
765     fn tcp_clone_smoke() {
766         each_ip(&mut |addr| {
767             let acceptor = t!(TcpListener::bind(&addr));
768
769             let _t = thread::spawn(move|| {
770                 let mut s = t!(TcpStream::connect(&addr));
771                 let mut buf = [0, 0];
772                 assert_eq!(s.read(&mut buf).unwrap(), 1);
773                 assert_eq!(buf[0], 1);
774                 t!(s.write(&[2]));
775             });
776
777             let mut s1 = t!(acceptor.accept()).0;
778             let s2 = t!(s1.try_clone());
779
780             let (tx1, rx1) = channel();
781             let (tx2, rx2) = channel();
782             let _t = thread::spawn(move|| {
783                 let mut s2 = s2;
784                 rx1.recv().unwrap();
785                 t!(s2.write(&[1]));
786                 tx2.send(()).unwrap();
787             });
788             tx1.send(()).unwrap();
789             let mut buf = [0, 0];
790             assert_eq!(s1.read(&mut buf).unwrap(), 1);
791             rx2.recv().unwrap();
792         })
793     }
794
795     #[test]
796     fn tcp_clone_two_read() {
797         each_ip(&mut |addr| {
798             let acceptor = t!(TcpListener::bind(&addr));
799             let (tx1, rx) = channel();
800             let tx2 = tx1.clone();
801
802             let _t = thread::spawn(move|| {
803                 let mut s = t!(TcpStream::connect(&addr));
804                 t!(s.write(&[1]));
805                 rx.recv().unwrap();
806                 t!(s.write(&[2]));
807                 rx.recv().unwrap();
808             });
809
810             let mut s1 = t!(acceptor.accept()).0;
811             let s2 = t!(s1.try_clone());
812
813             let (done, rx) = channel();
814             let _t = thread::spawn(move|| {
815                 let mut s2 = s2;
816                 let mut buf = [0, 0];
817                 t!(s2.read(&mut buf));
818                 tx2.send(()).unwrap();
819                 done.send(()).unwrap();
820             });
821             let mut buf = [0, 0];
822             t!(s1.read(&mut buf));
823             tx1.send(()).unwrap();
824
825             rx.recv().unwrap();
826         })
827     }
828
829     #[test]
830     fn tcp_clone_two_write() {
831         each_ip(&mut |addr| {
832             let acceptor = t!(TcpListener::bind(&addr));
833
834             let _t = thread::spawn(move|| {
835                 let mut s = t!(TcpStream::connect(&addr));
836                 let mut buf = [0, 1];
837                 t!(s.read(&mut buf));
838                 t!(s.read(&mut buf));
839             });
840
841             let mut s1 = t!(acceptor.accept()).0;
842             let s2 = t!(s1.try_clone());
843
844             let (done, rx) = channel();
845             let _t = thread::spawn(move|| {
846                 let mut s2 = s2;
847                 t!(s2.write(&[1]));
848                 done.send(()).unwrap();
849             });
850             t!(s1.write(&[2]));
851
852             rx.recv().unwrap();
853         })
854     }
855
856     #[test]
857     fn shutdown_smoke() {
858         each_ip(&mut |addr| {
859             let a = t!(TcpListener::bind(&addr));
860             let _t = thread::spawn(move|| {
861                 let mut c = t!(a.accept()).0;
862                 let mut b = [0];
863                 assert_eq!(c.read(&mut b).unwrap(), 0);
864                 t!(c.write(&[1]));
865             });
866
867             let mut s = t!(TcpStream::connect(&addr));
868             t!(s.shutdown(Shutdown::Write));
869             assert!(s.write(&[1]).is_err());
870             let mut b = [0, 0];
871             assert_eq!(t!(s.read(&mut b)), 1);
872             assert_eq!(b[0], 1);
873         })
874     }
875
876     #[test]
877     fn close_readwrite_smoke() {
878         each_ip(&mut |addr| {
879             let a = t!(TcpListener::bind(&addr));
880             let (tx, rx) = channel::<()>();
881             let _t = thread::spawn(move|| {
882                 let _s = t!(a.accept());
883                 let _ = rx.recv();
884             });
885
886             let mut b = [0];
887             let mut s = t!(TcpStream::connect(&addr));
888             let mut s2 = t!(s.try_clone());
889
890             // closing should prevent reads/writes
891             t!(s.shutdown(Shutdown::Write));
892             assert!(s.write(&[0]).is_err());
893             t!(s.shutdown(Shutdown::Read));
894             assert_eq!(s.read(&mut b).unwrap(), 0);
895
896             // closing should affect previous handles
897             assert!(s2.write(&[0]).is_err());
898             assert_eq!(s2.read(&mut b).unwrap(), 0);
899
900             // closing should affect new handles
901             let mut s3 = t!(s.try_clone());
902             assert!(s3.write(&[0]).is_err());
903             assert_eq!(s3.read(&mut b).unwrap(), 0);
904
905             // make sure these don't die
906             let _ = s2.shutdown(Shutdown::Read);
907             let _ = s2.shutdown(Shutdown::Write);
908             let _ = s3.shutdown(Shutdown::Read);
909             let _ = s3.shutdown(Shutdown::Write);
910             drop(tx);
911         })
912     }
913
914     #[test]
915     fn close_read_wakes_up() {
916         each_ip(&mut |addr| {
917             let a = t!(TcpListener::bind(&addr));
918             let (tx1, rx) = channel::<()>();
919             let _t = thread::spawn(move|| {
920                 let _s = t!(a.accept());
921                 let _ = rx.recv();
922             });
923
924             let s = t!(TcpStream::connect(&addr));
925             let s2 = t!(s.try_clone());
926             let (tx, rx) = channel();
927             let _t = thread::spawn(move|| {
928                 let mut s2 = s2;
929                 assert_eq!(t!(s2.read(&mut [0])), 0);
930                 tx.send(()).unwrap();
931             });
932             // this should wake up the child thread
933             t!(s.shutdown(Shutdown::Read));
934
935             // this test will never finish if the child doesn't wake up
936             rx.recv().unwrap();
937             drop(tx1);
938         })
939     }
940
941     #[test]
942     fn clone_while_reading() {
943         each_ip(&mut |addr| {
944             let accept = t!(TcpListener::bind(&addr));
945
946             // Enqueue a thread to write to a socket
947             let (tx, rx) = channel();
948             let (txdone, rxdone) = channel();
949             let txdone2 = txdone.clone();
950             let _t = thread::spawn(move|| {
951                 let mut tcp = t!(TcpStream::connect(&addr));
952                 rx.recv().unwrap();
953                 t!(tcp.write(&[0]));
954                 txdone2.send(()).unwrap();
955             });
956
957             // Spawn off a reading clone
958             let tcp = t!(accept.accept()).0;
959             let tcp2 = t!(tcp.try_clone());
960             let txdone3 = txdone.clone();
961             let _t = thread::spawn(move|| {
962                 let mut tcp2 = tcp2;
963                 t!(tcp2.read(&mut [0]));
964                 txdone3.send(()).unwrap();
965             });
966
967             // Try to ensure that the reading clone is indeed reading
968             for _ in 0..50 {
969                 thread::yield_now();
970             }
971
972             // clone the handle again while it's reading, then let it finish the
973             // read.
974             let _ = t!(tcp.try_clone());
975             tx.send(()).unwrap();
976             rxdone.recv().unwrap();
977             rxdone.recv().unwrap();
978         })
979     }
980
981     #[test]
982     fn clone_accept_smoke() {
983         each_ip(&mut |addr| {
984             let a = t!(TcpListener::bind(&addr));
985             let a2 = t!(a.try_clone());
986
987             let _t = thread::spawn(move|| {
988                 let _ = TcpStream::connect(&addr);
989             });
990             let _t = thread::spawn(move|| {
991                 let _ = TcpStream::connect(&addr);
992             });
993
994             t!(a.accept());
995             t!(a2.accept());
996         })
997     }
998
999     #[test]
1000     fn clone_accept_concurrent() {
1001         each_ip(&mut |addr| {
1002             let a = t!(TcpListener::bind(&addr));
1003             let a2 = t!(a.try_clone());
1004
1005             let (tx, rx) = channel();
1006             let tx2 = tx.clone();
1007
1008             let _t = thread::spawn(move|| {
1009                 tx.send(t!(a.accept())).unwrap();
1010             });
1011             let _t = thread::spawn(move|| {
1012                 tx2.send(t!(a2.accept())).unwrap();
1013             });
1014
1015             let _t = thread::spawn(move|| {
1016                 let _ = TcpStream::connect(&addr);
1017             });
1018             let _t = thread::spawn(move|| {
1019                 let _ = TcpStream::connect(&addr);
1020             });
1021
1022             rx.recv().unwrap();
1023             rx.recv().unwrap();
1024         })
1025     }
1026
1027     #[test]
1028     fn debug() {
1029         let name = if cfg!(windows) {"socket"} else {"fd"};
1030         let socket_addr = next_test_ip4();
1031
1032         let listener = t!(TcpListener::bind(&socket_addr));
1033         let listener_inner = listener.0.socket().as_inner();
1034         let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1035                               socket_addr, name, listener_inner);
1036         assert_eq!(format!("{:?}", listener), compare);
1037
1038         let stream = t!(TcpStream::connect(&("localhost",
1039                                                  socket_addr.port())));
1040         let stream_inner = stream.0.socket().as_inner();
1041         let compare = format!("TcpStream {{ addr: {:?}, \
1042                               peer: {:?}, {}: {:?} }}",
1043                               stream.local_addr().unwrap(),
1044                               stream.peer_addr().unwrap(),
1045                               name,
1046                               stream_inner);
1047         assert_eq!(format!("{:?}", stream), compare);
1048     }
1049
1050     // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code
1051     //        no longer has rounding errors.
1052     #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1053     #[test]
1054     fn timeouts() {
1055         let addr = next_test_ip4();
1056         let listener = t!(TcpListener::bind(&addr));
1057
1058         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1059         let dur = Duration::new(15410, 0);
1060
1061         assert_eq!(None, t!(stream.read_timeout()));
1062
1063         t!(stream.set_read_timeout(Some(dur)));
1064         assert_eq!(Some(dur), t!(stream.read_timeout()));
1065
1066         assert_eq!(None, t!(stream.write_timeout()));
1067
1068         t!(stream.set_write_timeout(Some(dur)));
1069         assert_eq!(Some(dur), t!(stream.write_timeout()));
1070
1071         t!(stream.set_read_timeout(None));
1072         assert_eq!(None, t!(stream.read_timeout()));
1073
1074         t!(stream.set_write_timeout(None));
1075         assert_eq!(None, t!(stream.write_timeout()));
1076         drop(listener);
1077     }
1078
1079     #[test]
1080     fn test_read_timeout() {
1081         let addr = next_test_ip4();
1082         let listener = t!(TcpListener::bind(&addr));
1083
1084         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1085         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1086
1087         let mut buf = [0; 10];
1088         let start = Instant::now();
1089         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1090         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1091         assert!(start.elapsed() > Duration::from_millis(400));
1092         drop(listener);
1093     }
1094
1095     #[test]
1096     fn test_read_with_timeout() {
1097         let addr = next_test_ip4();
1098         let listener = t!(TcpListener::bind(&addr));
1099
1100         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1101         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1102
1103         let mut other_end = t!(listener.accept()).0;
1104         t!(other_end.write_all(b"hello world"));
1105
1106         let mut buf = [0; 11];
1107         t!(stream.read(&mut buf));
1108         assert_eq!(b"hello world", &buf[..]);
1109
1110         let start = Instant::now();
1111         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1112         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1113         assert!(start.elapsed() > Duration::from_millis(400));
1114         drop(listener);
1115     }
1116
1117     #[test]
1118     fn nodelay() {
1119         let addr = next_test_ip4();
1120         let _listener = t!(TcpListener::bind(&addr));
1121
1122         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1123
1124         assert_eq!(false, t!(stream.nodelay()));
1125         t!(stream.set_nodelay(true));
1126         assert_eq!(true, t!(stream.nodelay()));
1127         t!(stream.set_nodelay(false));
1128         assert_eq!(false, t!(stream.nodelay()));
1129     }
1130
1131     #[test]
1132     fn ttl() {
1133         let ttl = 100;
1134
1135         let addr = next_test_ip4();
1136         let listener = t!(TcpListener::bind(&addr));
1137
1138         t!(listener.set_ttl(ttl));
1139         assert_eq!(ttl, t!(listener.ttl()));
1140
1141         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1142
1143         t!(stream.set_ttl(ttl));
1144         assert_eq!(ttl, t!(stream.ttl()));
1145     }
1146
1147     #[test]
1148     fn set_nonblocking() {
1149         let addr = next_test_ip4();
1150         let listener = t!(TcpListener::bind(&addr));
1151
1152         t!(listener.set_nonblocking(true));
1153         t!(listener.set_nonblocking(false));
1154
1155         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1156
1157         t!(stream.set_nonblocking(false));
1158         t!(stream.set_nonblocking(true));
1159
1160         let mut buf = [0];
1161         match stream.read(&mut buf) {
1162             Ok(_) => panic!("expected error"),
1163             Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
1164             Err(e) => panic!("unexpected error {}", e),
1165         }
1166     }
1167 }