]> git.lizzy.rs Git - rust.git/blob - src/libstd/net/tcp.rs
Auto merge of #32239 - alexcrichton:fix-cross-to-freebsd, r=brson
[rust.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::net as net_imp;
18 use sys_common::{AsInner, FromInner, IntoInner};
19 use time::Duration;
20
21 /// A structure which represents a TCP stream between a local socket and a
22 /// remote socket.
23 ///
24 /// The socket will be closed when the value is dropped.
25 ///
26 /// # Examples
27 ///
28 /// ```no_run
29 /// use std::io::prelude::*;
30 /// use std::net::TcpStream;
31 ///
32 /// {
33 ///     let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
34 ///
35 ///     // ignore the Result
36 ///     let _ = stream.write(&[1]);
37 ///     let _ = stream.read(&mut [0; 128]); // ignore here too
38 /// } // the stream is closed here
39 /// ```
40 #[stable(feature = "rust1", since = "1.0.0")]
41 pub struct TcpStream(net_imp::TcpStream);
42
43 /// A structure representing a socket server.
44 ///
45 /// # Examples
46 ///
47 /// ```no_run
48 /// use std::net::{TcpListener, TcpStream};
49 /// use std::thread;
50 ///
51 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
52 ///
53 /// fn handle_client(stream: TcpStream) {
54 ///     // ...
55 /// }
56 ///
57 /// // accept connections and process them, spawning a new thread for each one
58 /// for stream in listener.incoming() {
59 ///     match stream {
60 ///         Ok(stream) => {
61 ///             thread::spawn(move|| {
62 ///                 // connection succeeded
63 ///                 handle_client(stream)
64 ///             });
65 ///         }
66 ///         Err(e) => { /* connection failed */ }
67 ///     }
68 /// }
69 ///
70 /// // close the socket server
71 /// drop(listener);
72 /// ```
73 #[stable(feature = "rust1", since = "1.0.0")]
74 pub struct TcpListener(net_imp::TcpListener);
75
76 /// An infinite iterator over the connections from a `TcpListener`.
77 ///
78 /// This iterator will infinitely yield `Some` of the accepted connections. It
79 /// is equivalent to calling `accept` in a loop.
80 #[stable(feature = "rust1", since = "1.0.0")]
81 pub struct Incoming<'a> { listener: &'a TcpListener }
82
83 impl TcpStream {
84     /// Opens a TCP connection to a remote host.
85     ///
86     /// `addr` is an address of the remote host. Anything which implements
87     /// `ToSocketAddrs` trait can be supplied for the address; see this trait
88     /// documentation for concrete examples.
89     #[stable(feature = "rust1", since = "1.0.0")]
90     pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
91         super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
92     }
93
94     /// Returns the socket address of the remote peer of this TCP connection.
95     #[stable(feature = "rust1", since = "1.0.0")]
96     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
97         self.0.peer_addr()
98     }
99
100     /// Returns the socket address of the local half of this TCP connection.
101     #[stable(feature = "rust1", since = "1.0.0")]
102     pub fn local_addr(&self) -> io::Result<SocketAddr> {
103         self.0.socket_addr()
104     }
105
106     /// Shuts down the read, write, or both halves of this connection.
107     ///
108     /// This function will cause all pending and future I/O on the specified
109     /// portions to return immediately with an appropriate value (see the
110     /// documentation of `Shutdown`).
111     #[stable(feature = "rust1", since = "1.0.0")]
112     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
113         self.0.shutdown(how)
114     }
115
116     /// Creates a new independently owned handle to the underlying socket.
117     ///
118     /// The returned `TcpStream` is a reference to the same stream that this
119     /// object references. Both handles will read and write the same stream of
120     /// data, and options set on one stream will be propagated to the other
121     /// stream.
122     #[stable(feature = "rust1", since = "1.0.0")]
123     pub fn try_clone(&self) -> io::Result<TcpStream> {
124         self.0.duplicate().map(TcpStream)
125     }
126
127     /// Sets the read timeout to the timeout specified.
128     ///
129     /// If the value specified is `None`, then `read` calls will block
130     /// indefinitely. It is an error to pass the zero `Duration` to this
131     /// method.
132     ///
133     /// # Note
134     ///
135     /// Platforms may return a different error code whenever a read times out as
136     /// a result of setting this option. For example Unix typically returns an
137     /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
138     #[stable(feature = "socket_timeout", since = "1.4.0")]
139     pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
140         self.0.set_read_timeout(dur)
141     }
142
143     /// Sets the write timeout to the timeout specified.
144     ///
145     /// If the value specified is `None`, then `write` calls will block
146     /// indefinitely. It is an error to pass the zero `Duration` to this
147     /// method.
148     ///
149     /// # Note
150     ///
151     /// Platforms may return a different error code whenever a write times out
152     /// as a result of setting this option. For example Unix typically returns
153     /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
154     #[stable(feature = "socket_timeout", since = "1.4.0")]
155     pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
156         self.0.set_write_timeout(dur)
157     }
158
159     /// Returns the read timeout of this socket.
160     ///
161     /// If the timeout is `None`, then `read` calls will block indefinitely.
162     ///
163     /// # Note
164     ///
165     /// Some platforms do not provide access to the current timeout.
166     #[stable(feature = "socket_timeout", since = "1.4.0")]
167     pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
168         self.0.read_timeout()
169     }
170
171     /// Returns the write timeout of this socket.
172     ///
173     /// If the timeout is `None`, then `write` calls will block indefinitely.
174     ///
175     /// # Note
176     ///
177     /// Some platforms do not provide access to the current timeout.
178     #[stable(feature = "socket_timeout", since = "1.4.0")]
179     pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
180         self.0.write_timeout()
181     }
182
183     /// Sets the value of the `TCP_NODELAY` option on this socket.
184     ///
185     /// If set, this option disables the Nagle algorithm. This means that
186     /// segments are always sent as soon as possible, even if there is only a
187     /// small amount of data. When not set, data is buffered until there is a
188     /// sufficient amount to send out, thereby avoiding the frequent sending of
189     /// small packets.
190     #[stable(feature = "net2_mutators", since = "1.9.0")]
191     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
192         self.0.set_nodelay(nodelay)
193     }
194
195     /// Gets the value of the `TCP_NODELAY` option on this socket.
196     ///
197     /// For more information about this option, see [`set_nodelay`][link].
198     ///
199     /// [link]: #tymethod.set_nodelay
200     #[stable(feature = "net2_mutators", since = "1.9.0")]
201     pub fn nodelay(&self) -> io::Result<bool> {
202         self.0.nodelay()
203     }
204
205     /// Sets the value for the `IP_TTL` option on this socket.
206     ///
207     /// This value sets the time-to-live field that is used in every packet sent
208     /// from this socket.
209     #[stable(feature = "net2_mutators", since = "1.9.0")]
210     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
211         self.0.set_ttl(ttl)
212     }
213
214     /// Gets the value of the `IP_TTL` option for this socket.
215     ///
216     /// For more information about this option, see [`set_ttl`][link].
217     ///
218     /// [link]: #tymethod.set_ttl
219     #[stable(feature = "net2_mutators", since = "1.9.0")]
220     pub fn ttl(&self) -> io::Result<u32> {
221         self.0.ttl()
222     }
223
224     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
225     ///
226     /// If this is set to `true` then the socket is restricted to sending and
227     /// receiving IPv6 packets only. If this is the case, an IPv4 and an IPv6
228     /// application can each bind the same port at the same time.
229     ///
230     /// If this is set to `false` then the socket can be used to send and
231     /// receive packets from an IPv4-mapped IPv6 address.
232     #[stable(feature = "net2_mutators", since = "1.9.0")]
233     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
234         self.0.set_only_v6(only_v6)
235     }
236
237     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
238     ///
239     /// For more information about this option, see [`set_only_v6`][link].
240     ///
241     /// [link]: #tymethod.set_only_v6
242     #[stable(feature = "net2_mutators", since = "1.9.0")]
243     pub fn only_v6(&self) -> io::Result<bool> {
244         self.0.only_v6()
245     }
246
247     /// Get the value of the `SO_ERROR` option on this socket.
248     ///
249     /// This will retrieve the stored error in the underlying socket, clearing
250     /// the field in the process. This can be useful for checking errors between
251     /// calls.
252     #[stable(feature = "net2_mutators", since = "1.9.0")]
253     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
254         self.0.take_error()
255     }
256
257     /// Moves this TCP stream into or out of nonblocking mode.
258     ///
259     /// On Unix this corresponds to calling fcntl, and on Windows this
260     /// corresponds to calling ioctlsocket.
261     #[stable(feature = "net2_mutators", since = "1.9.0")]
262     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
263         self.0.set_nonblocking(nonblocking)
264     }
265 }
266
267 #[stable(feature = "rust1", since = "1.0.0")]
268 impl Read for TcpStream {
269     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
270     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
271         self.0.read_to_end(buf)
272     }
273 }
274 #[stable(feature = "rust1", since = "1.0.0")]
275 impl Write for TcpStream {
276     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
277     fn flush(&mut self) -> io::Result<()> { Ok(()) }
278 }
279 #[stable(feature = "rust1", since = "1.0.0")]
280 impl<'a> Read for &'a TcpStream {
281     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
282     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
283         self.0.read_to_end(buf)
284     }
285 }
286 #[stable(feature = "rust1", since = "1.0.0")]
287 impl<'a> Write for &'a TcpStream {
288     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
289     fn flush(&mut self) -> io::Result<()> { Ok(()) }
290 }
291
292 impl AsInner<net_imp::TcpStream> for TcpStream {
293     fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
294 }
295
296 impl FromInner<net_imp::TcpStream> for TcpStream {
297     fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
298 }
299
300 impl IntoInner<net_imp::TcpStream> for TcpStream {
301     fn into_inner(self) -> net_imp::TcpStream { self.0 }
302 }
303
304 #[stable(feature = "rust1", since = "1.0.0")]
305 impl fmt::Debug for TcpStream {
306     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
307         self.0.fmt(f)
308     }
309 }
310
311 impl TcpListener {
312     /// Creates a new `TcpListener` which will be bound to the specified
313     /// address.
314     ///
315     /// The returned listener is ready for accepting connections.
316     ///
317     /// Binding with a port number of 0 will request that the OS assigns a port
318     /// to this listener. The port allocated can be queried via the
319     /// `local_addr` method.
320     ///
321     /// The address type can be any implementor of `ToSocketAddrs` trait. See
322     /// its documentation for concrete examples.
323     #[stable(feature = "rust1", since = "1.0.0")]
324     pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
325         super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
326     }
327
328     /// Returns the local socket address of this listener.
329     #[stable(feature = "rust1", since = "1.0.0")]
330     pub fn local_addr(&self) -> io::Result<SocketAddr> {
331         self.0.socket_addr()
332     }
333
334     /// Creates a new independently owned handle to the underlying socket.
335     ///
336     /// The returned `TcpListener` is a reference to the same socket that this
337     /// object references. Both handles can be used to accept incoming
338     /// connections and options set on one listener will affect the other.
339     #[stable(feature = "rust1", since = "1.0.0")]
340     pub fn try_clone(&self) -> io::Result<TcpListener> {
341         self.0.duplicate().map(TcpListener)
342     }
343
344     /// Accept a new incoming connection from this listener.
345     ///
346     /// This function will block the calling thread until a new TCP connection
347     /// is established. When established, the corresponding `TcpStream` and the
348     /// remote peer's address will be returned.
349     #[stable(feature = "rust1", since = "1.0.0")]
350     pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
351         self.0.accept().map(|(a, b)| (TcpStream(a), b))
352     }
353
354     /// Returns an iterator over the connections being received on this
355     /// listener.
356     ///
357     /// The returned iterator will never return `None` and will also not yield
358     /// the peer's `SocketAddr` structure.
359     #[stable(feature = "rust1", since = "1.0.0")]
360     pub fn incoming(&self) -> Incoming {
361         Incoming { listener: self }
362     }
363
364     /// Sets the value for the `IP_TTL` option on this socket.
365     ///
366     /// This value sets the time-to-live field that is used in every packet sent
367     /// from this socket.
368     #[stable(feature = "net2_mutators", since = "1.9.0")]
369     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
370         self.0.set_ttl(ttl)
371     }
372
373     /// Gets the value of the `IP_TTL` option for this socket.
374     ///
375     /// For more information about this option, see [`set_ttl`][link].
376     ///
377     /// [link]: #tymethod.set_ttl
378     #[stable(feature = "net2_mutators", since = "1.9.0")]
379     pub fn ttl(&self) -> io::Result<u32> {
380         self.0.ttl()
381     }
382
383     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
384     ///
385     /// If this is set to `true` then the socket is restricted to sending and
386     /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
387     /// can bind the same port at the same time.
388     ///
389     /// If this is set to `false` then the socket can be used to send and
390     /// receive packets from an IPv4-mapped IPv6 address.
391     #[stable(feature = "net2_mutators", since = "1.9.0")]
392     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
393         self.0.set_only_v6(only_v6)
394     }
395
396     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
397     ///
398     /// For more information about this option, see [`set_only_v6`][link].
399     ///
400     /// [link]: #tymethod.set_only_v6
401     #[stable(feature = "net2_mutators", since = "1.9.0")]
402     pub fn only_v6(&self) -> io::Result<bool> {
403         self.0.only_v6()
404     }
405
406     /// Get the value of the `SO_ERROR` option on this socket.
407     ///
408     /// This will retrieve the stored error in the underlying socket, clearing
409     /// the field in the process. This can be useful for checking errors between
410     /// calls.
411     #[stable(feature = "net2_mutators", since = "1.9.0")]
412     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
413         self.0.take_error()
414     }
415
416     /// Moves this TCP stream into or out of nonblocking mode.
417     ///
418     /// On Unix this corresponds to calling fcntl, and on Windows this
419     /// corresponds to calling ioctlsocket.
420     #[stable(feature = "net2_mutators", since = "1.9.0")]
421     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
422         self.0.set_nonblocking(nonblocking)
423     }
424 }
425
426 #[stable(feature = "rust1", since = "1.0.0")]
427 impl<'a> Iterator for Incoming<'a> {
428     type Item = io::Result<TcpStream>;
429     fn next(&mut self) -> Option<io::Result<TcpStream>> {
430         Some(self.listener.accept().map(|p| p.0))
431     }
432 }
433
434 impl AsInner<net_imp::TcpListener> for TcpListener {
435     fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
436 }
437
438 impl FromInner<net_imp::TcpListener> for TcpListener {
439     fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
440         TcpListener(inner)
441     }
442 }
443
444 impl IntoInner<net_imp::TcpListener> for TcpListener {
445     fn into_inner(self) -> net_imp::TcpListener { self.0 }
446 }
447
448 #[stable(feature = "rust1", since = "1.0.0")]
449 impl fmt::Debug for TcpListener {
450     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
451         self.0.fmt(f)
452     }
453 }
454
455 #[cfg(test)]
456 mod tests {
457     use prelude::v1::*;
458
459     use io::ErrorKind;
460     use io::prelude::*;
461     use net::*;
462     use net::test::{next_test_ip4, next_test_ip6};
463     use sync::mpsc::channel;
464     use sys_common::AsInner;
465     use time::{Instant, Duration};
466     use thread;
467
468     fn each_ip(f: &mut FnMut(SocketAddr)) {
469         f(next_test_ip4());
470         f(next_test_ip6());
471     }
472
473     macro_rules! t {
474         ($e:expr) => {
475             match $e {
476                 Ok(t) => t,
477                 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
478             }
479         }
480     }
481
482     #[test]
483     fn bind_error() {
484         match TcpListener::bind("1.1.1.1:9999") {
485             Ok(..) => panic!(),
486             Err(e) =>
487                 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
488         }
489     }
490
491     #[test]
492     fn connect_error() {
493         match TcpStream::connect("0.0.0.0:1") {
494             Ok(..) => panic!(),
495             Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
496                               e.kind() == ErrorKind::InvalidInput ||
497                               e.kind() == ErrorKind::AddrInUse ||
498                               e.kind() == ErrorKind::AddrNotAvailable,
499                               "bad error: {} {:?}", e, e.kind()),
500         }
501     }
502
503     #[test]
504     fn listen_localhost() {
505         let socket_addr = next_test_ip4();
506         let listener = t!(TcpListener::bind(&socket_addr));
507
508         let _t = thread::spawn(move || {
509             let mut stream = t!(TcpStream::connect(&("localhost",
510                                                      socket_addr.port())));
511             t!(stream.write(&[144]));
512         });
513
514         let mut stream = t!(listener.accept()).0;
515         let mut buf = [0];
516         t!(stream.read(&mut buf));
517         assert!(buf[0] == 144);
518     }
519
520     #[test]
521     fn connect_loopback() {
522         each_ip(&mut |addr| {
523             let acceptor = t!(TcpListener::bind(&addr));
524
525             let _t = thread::spawn(move|| {
526                 let host = match addr {
527                     SocketAddr::V4(..) => "127.0.0.1",
528                     SocketAddr::V6(..) => "::1",
529                 };
530                 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
531                 t!(stream.write(&[66]));
532             });
533
534             let mut stream = t!(acceptor.accept()).0;
535             let mut buf = [0];
536             t!(stream.read(&mut buf));
537             assert!(buf[0] == 66);
538         })
539     }
540
541     #[test]
542     fn smoke_test() {
543         each_ip(&mut |addr| {
544             let acceptor = t!(TcpListener::bind(&addr));
545
546             let (tx, rx) = channel();
547             let _t = thread::spawn(move|| {
548                 let mut stream = t!(TcpStream::connect(&addr));
549                 t!(stream.write(&[99]));
550                 tx.send(t!(stream.local_addr())).unwrap();
551             });
552
553             let (mut stream, addr) = t!(acceptor.accept());
554             let mut buf = [0];
555             t!(stream.read(&mut buf));
556             assert!(buf[0] == 99);
557             assert_eq!(addr, t!(rx.recv()));
558         })
559     }
560
561     #[test]
562     fn read_eof() {
563         each_ip(&mut |addr| {
564             let acceptor = t!(TcpListener::bind(&addr));
565
566             let _t = thread::spawn(move|| {
567                 let _stream = t!(TcpStream::connect(&addr));
568                 // Close
569             });
570
571             let mut stream = t!(acceptor.accept()).0;
572             let mut buf = [0];
573             let nread = t!(stream.read(&mut buf));
574             assert_eq!(nread, 0);
575             let nread = t!(stream.read(&mut buf));
576             assert_eq!(nread, 0);
577         })
578     }
579
580     #[test]
581     fn write_close() {
582         each_ip(&mut |addr| {
583             let acceptor = t!(TcpListener::bind(&addr));
584
585             let (tx, rx) = channel();
586             let _t = thread::spawn(move|| {
587                 drop(t!(TcpStream::connect(&addr)));
588                 tx.send(()).unwrap();
589             });
590
591             let mut stream = t!(acceptor.accept()).0;
592             rx.recv().unwrap();
593             let buf = [0];
594             match stream.write(&buf) {
595                 Ok(..) => {}
596                 Err(e) => {
597                     assert!(e.kind() == ErrorKind::ConnectionReset ||
598                             e.kind() == ErrorKind::BrokenPipe ||
599                             e.kind() == ErrorKind::ConnectionAborted,
600                             "unknown error: {}", e);
601                 }
602             }
603         })
604     }
605
606     #[test]
607     fn multiple_connect_serial() {
608         each_ip(&mut |addr| {
609             let max = 10;
610             let acceptor = t!(TcpListener::bind(&addr));
611
612             let _t = thread::spawn(move|| {
613                 for _ in 0..max {
614                     let mut stream = t!(TcpStream::connect(&addr));
615                     t!(stream.write(&[99]));
616                 }
617             });
618
619             for stream in acceptor.incoming().take(max) {
620                 let mut stream = t!(stream);
621                 let mut buf = [0];
622                 t!(stream.read(&mut buf));
623                 assert_eq!(buf[0], 99);
624             }
625         })
626     }
627
628     #[test]
629     fn multiple_connect_interleaved_greedy_schedule() {
630         const MAX: usize = 10;
631         each_ip(&mut |addr| {
632             let acceptor = t!(TcpListener::bind(&addr));
633
634             let _t = thread::spawn(move|| {
635                 let acceptor = acceptor;
636                 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
637                     // Start another thread to handle the connection
638                     let _t = thread::spawn(move|| {
639                         let mut stream = t!(stream);
640                         let mut buf = [0];
641                         t!(stream.read(&mut buf));
642                         assert!(buf[0] == i as u8);
643                     });
644                 }
645             });
646
647             connect(0, addr);
648         });
649
650         fn connect(i: usize, addr: SocketAddr) {
651             if i == MAX { return }
652
653             let t = thread::spawn(move|| {
654                 let mut stream = t!(TcpStream::connect(&addr));
655                 // Connect again before writing
656                 connect(i + 1, addr);
657                 t!(stream.write(&[i as u8]));
658             });
659             t.join().ok().unwrap();
660         }
661     }
662
663     #[test]
664     fn multiple_connect_interleaved_lazy_schedule() {
665         const MAX: usize = 10;
666         each_ip(&mut |addr| {
667             let acceptor = t!(TcpListener::bind(&addr));
668
669             let _t = thread::spawn(move|| {
670                 for stream in acceptor.incoming().take(MAX) {
671                     // Start another thread to handle the connection
672                     let _t = thread::spawn(move|| {
673                         let mut stream = t!(stream);
674                         let mut buf = [0];
675                         t!(stream.read(&mut buf));
676                         assert!(buf[0] == 99);
677                     });
678                 }
679             });
680
681             connect(0, addr);
682         });
683
684         fn connect(i: usize, addr: SocketAddr) {
685             if i == MAX { return }
686
687             let t = thread::spawn(move|| {
688                 let mut stream = t!(TcpStream::connect(&addr));
689                 connect(i + 1, addr);
690                 t!(stream.write(&[99]));
691             });
692             t.join().ok().unwrap();
693         }
694     }
695
696     #[test]
697     fn socket_and_peer_name() {
698         each_ip(&mut |addr| {
699             let listener = t!(TcpListener::bind(&addr));
700             let so_name = t!(listener.local_addr());
701             assert_eq!(addr, so_name);
702             let _t = thread::spawn(move|| {
703                 t!(listener.accept());
704             });
705
706             let stream = t!(TcpStream::connect(&addr));
707             assert_eq!(addr, t!(stream.peer_addr()));
708         })
709     }
710
711     #[test]
712     fn partial_read() {
713         each_ip(&mut |addr| {
714             let (tx, rx) = channel();
715             let srv = t!(TcpListener::bind(&addr));
716             let _t = thread::spawn(move|| {
717                 let mut cl = t!(srv.accept()).0;
718                 cl.write(&[10]).unwrap();
719                 let mut b = [0];
720                 t!(cl.read(&mut b));
721                 tx.send(()).unwrap();
722             });
723
724             let mut c = t!(TcpStream::connect(&addr));
725             let mut b = [0; 10];
726             assert_eq!(c.read(&mut b).unwrap(), 1);
727             t!(c.write(&[1]));
728             rx.recv().unwrap();
729         })
730     }
731
732     #[test]
733     fn double_bind() {
734         each_ip(&mut |addr| {
735             let _listener = t!(TcpListener::bind(&addr));
736             match TcpListener::bind(&addr) {
737                 Ok(..) => panic!(),
738                 Err(e) => {
739                     assert!(e.kind() == ErrorKind::ConnectionRefused ||
740                             e.kind() == ErrorKind::Other ||
741                             e.kind() == ErrorKind::AddrInUse,
742                             "unknown error: {} {:?}", e, e.kind());
743                 }
744             }
745         })
746     }
747
748     #[test]
749     fn fast_rebind() {
750         each_ip(&mut |addr| {
751             let acceptor = t!(TcpListener::bind(&addr));
752
753             let _t = thread::spawn(move|| {
754                 t!(TcpStream::connect(&addr));
755             });
756
757             t!(acceptor.accept());
758             drop(acceptor);
759             t!(TcpListener::bind(&addr));
760         });
761     }
762
763     #[test]
764     fn tcp_clone_smoke() {
765         each_ip(&mut |addr| {
766             let acceptor = t!(TcpListener::bind(&addr));
767
768             let _t = thread::spawn(move|| {
769                 let mut s = t!(TcpStream::connect(&addr));
770                 let mut buf = [0, 0];
771                 assert_eq!(s.read(&mut buf).unwrap(), 1);
772                 assert_eq!(buf[0], 1);
773                 t!(s.write(&[2]));
774             });
775
776             let mut s1 = t!(acceptor.accept()).0;
777             let s2 = t!(s1.try_clone());
778
779             let (tx1, rx1) = channel();
780             let (tx2, rx2) = channel();
781             let _t = thread::spawn(move|| {
782                 let mut s2 = s2;
783                 rx1.recv().unwrap();
784                 t!(s2.write(&[1]));
785                 tx2.send(()).unwrap();
786             });
787             tx1.send(()).unwrap();
788             let mut buf = [0, 0];
789             assert_eq!(s1.read(&mut buf).unwrap(), 1);
790             rx2.recv().unwrap();
791         })
792     }
793
794     #[test]
795     fn tcp_clone_two_read() {
796         each_ip(&mut |addr| {
797             let acceptor = t!(TcpListener::bind(&addr));
798             let (tx1, rx) = channel();
799             let tx2 = tx1.clone();
800
801             let _t = thread::spawn(move|| {
802                 let mut s = t!(TcpStream::connect(&addr));
803                 t!(s.write(&[1]));
804                 rx.recv().unwrap();
805                 t!(s.write(&[2]));
806                 rx.recv().unwrap();
807             });
808
809             let mut s1 = t!(acceptor.accept()).0;
810             let s2 = t!(s1.try_clone());
811
812             let (done, rx) = channel();
813             let _t = thread::spawn(move|| {
814                 let mut s2 = s2;
815                 let mut buf = [0, 0];
816                 t!(s2.read(&mut buf));
817                 tx2.send(()).unwrap();
818                 done.send(()).unwrap();
819             });
820             let mut buf = [0, 0];
821             t!(s1.read(&mut buf));
822             tx1.send(()).unwrap();
823
824             rx.recv().unwrap();
825         })
826     }
827
828     #[test]
829     fn tcp_clone_two_write() {
830         each_ip(&mut |addr| {
831             let acceptor = t!(TcpListener::bind(&addr));
832
833             let _t = thread::spawn(move|| {
834                 let mut s = t!(TcpStream::connect(&addr));
835                 let mut buf = [0, 1];
836                 t!(s.read(&mut buf));
837                 t!(s.read(&mut buf));
838             });
839
840             let mut s1 = t!(acceptor.accept()).0;
841             let s2 = t!(s1.try_clone());
842
843             let (done, rx) = channel();
844             let _t = thread::spawn(move|| {
845                 let mut s2 = s2;
846                 t!(s2.write(&[1]));
847                 done.send(()).unwrap();
848             });
849             t!(s1.write(&[2]));
850
851             rx.recv().unwrap();
852         })
853     }
854
855     #[test]
856     fn shutdown_smoke() {
857         each_ip(&mut |addr| {
858             let a = t!(TcpListener::bind(&addr));
859             let _t = thread::spawn(move|| {
860                 let mut c = t!(a.accept()).0;
861                 let mut b = [0];
862                 assert_eq!(c.read(&mut b).unwrap(), 0);
863                 t!(c.write(&[1]));
864             });
865
866             let mut s = t!(TcpStream::connect(&addr));
867             t!(s.shutdown(Shutdown::Write));
868             assert!(s.write(&[1]).is_err());
869             let mut b = [0, 0];
870             assert_eq!(t!(s.read(&mut b)), 1);
871             assert_eq!(b[0], 1);
872         })
873     }
874
875     #[test]
876     fn close_readwrite_smoke() {
877         each_ip(&mut |addr| {
878             let a = t!(TcpListener::bind(&addr));
879             let (tx, rx) = channel::<()>();
880             let _t = thread::spawn(move|| {
881                 let _s = t!(a.accept());
882                 let _ = rx.recv();
883             });
884
885             let mut b = [0];
886             let mut s = t!(TcpStream::connect(&addr));
887             let mut s2 = t!(s.try_clone());
888
889             // closing should prevent reads/writes
890             t!(s.shutdown(Shutdown::Write));
891             assert!(s.write(&[0]).is_err());
892             t!(s.shutdown(Shutdown::Read));
893             assert_eq!(s.read(&mut b).unwrap(), 0);
894
895             // closing should affect previous handles
896             assert!(s2.write(&[0]).is_err());
897             assert_eq!(s2.read(&mut b).unwrap(), 0);
898
899             // closing should affect new handles
900             let mut s3 = t!(s.try_clone());
901             assert!(s3.write(&[0]).is_err());
902             assert_eq!(s3.read(&mut b).unwrap(), 0);
903
904             // make sure these don't die
905             let _ = s2.shutdown(Shutdown::Read);
906             let _ = s2.shutdown(Shutdown::Write);
907             let _ = s3.shutdown(Shutdown::Read);
908             let _ = s3.shutdown(Shutdown::Write);
909             drop(tx);
910         })
911     }
912
913     #[test]
914     fn close_read_wakes_up() {
915         each_ip(&mut |addr| {
916             let a = t!(TcpListener::bind(&addr));
917             let (tx1, rx) = channel::<()>();
918             let _t = thread::spawn(move|| {
919                 let _s = t!(a.accept());
920                 let _ = rx.recv();
921             });
922
923             let s = t!(TcpStream::connect(&addr));
924             let s2 = t!(s.try_clone());
925             let (tx, rx) = channel();
926             let _t = thread::spawn(move|| {
927                 let mut s2 = s2;
928                 assert_eq!(t!(s2.read(&mut [0])), 0);
929                 tx.send(()).unwrap();
930             });
931             // this should wake up the child thread
932             t!(s.shutdown(Shutdown::Read));
933
934             // this test will never finish if the child doesn't wake up
935             rx.recv().unwrap();
936             drop(tx1);
937         })
938     }
939
940     #[test]
941     fn clone_while_reading() {
942         each_ip(&mut |addr| {
943             let accept = t!(TcpListener::bind(&addr));
944
945             // Enqueue a thread to write to a socket
946             let (tx, rx) = channel();
947             let (txdone, rxdone) = channel();
948             let txdone2 = txdone.clone();
949             let _t = thread::spawn(move|| {
950                 let mut tcp = t!(TcpStream::connect(&addr));
951                 rx.recv().unwrap();
952                 t!(tcp.write(&[0]));
953                 txdone2.send(()).unwrap();
954             });
955
956             // Spawn off a reading clone
957             let tcp = t!(accept.accept()).0;
958             let tcp2 = t!(tcp.try_clone());
959             let txdone3 = txdone.clone();
960             let _t = thread::spawn(move|| {
961                 let mut tcp2 = tcp2;
962                 t!(tcp2.read(&mut [0]));
963                 txdone3.send(()).unwrap();
964             });
965
966             // Try to ensure that the reading clone is indeed reading
967             for _ in 0..50 {
968                 thread::yield_now();
969             }
970
971             // clone the handle again while it's reading, then let it finish the
972             // read.
973             let _ = t!(tcp.try_clone());
974             tx.send(()).unwrap();
975             rxdone.recv().unwrap();
976             rxdone.recv().unwrap();
977         })
978     }
979
980     #[test]
981     fn clone_accept_smoke() {
982         each_ip(&mut |addr| {
983             let a = t!(TcpListener::bind(&addr));
984             let a2 = t!(a.try_clone());
985
986             let _t = thread::spawn(move|| {
987                 let _ = TcpStream::connect(&addr);
988             });
989             let _t = thread::spawn(move|| {
990                 let _ = TcpStream::connect(&addr);
991             });
992
993             t!(a.accept());
994             t!(a2.accept());
995         })
996     }
997
998     #[test]
999     fn clone_accept_concurrent() {
1000         each_ip(&mut |addr| {
1001             let a = t!(TcpListener::bind(&addr));
1002             let a2 = t!(a.try_clone());
1003
1004             let (tx, rx) = channel();
1005             let tx2 = tx.clone();
1006
1007             let _t = thread::spawn(move|| {
1008                 tx.send(t!(a.accept())).unwrap();
1009             });
1010             let _t = thread::spawn(move|| {
1011                 tx2.send(t!(a2.accept())).unwrap();
1012             });
1013
1014             let _t = thread::spawn(move|| {
1015                 let _ = TcpStream::connect(&addr);
1016             });
1017             let _t = thread::spawn(move|| {
1018                 let _ = TcpStream::connect(&addr);
1019             });
1020
1021             rx.recv().unwrap();
1022             rx.recv().unwrap();
1023         })
1024     }
1025
1026     #[test]
1027     fn debug() {
1028         let name = if cfg!(windows) {"socket"} else {"fd"};
1029         let socket_addr = next_test_ip4();
1030
1031         let listener = t!(TcpListener::bind(&socket_addr));
1032         let listener_inner = listener.0.socket().as_inner();
1033         let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1034                               socket_addr, name, listener_inner);
1035         assert_eq!(format!("{:?}", listener), compare);
1036
1037         let stream = t!(TcpStream::connect(&("localhost",
1038                                                  socket_addr.port())));
1039         let stream_inner = stream.0.socket().as_inner();
1040         let compare = format!("TcpStream {{ addr: {:?}, \
1041                               peer: {:?}, {}: {:?} }}",
1042                               stream.local_addr().unwrap(),
1043                               stream.peer_addr().unwrap(),
1044                               name,
1045                               stream_inner);
1046         assert_eq!(format!("{:?}", stream), compare);
1047     }
1048
1049     // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code
1050     //        no longer has rounding errors.
1051     #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1052     #[test]
1053     fn timeouts() {
1054         let addr = next_test_ip4();
1055         let listener = t!(TcpListener::bind(&addr));
1056
1057         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1058         let dur = Duration::new(15410, 0);
1059
1060         assert_eq!(None, t!(stream.read_timeout()));
1061
1062         t!(stream.set_read_timeout(Some(dur)));
1063         assert_eq!(Some(dur), t!(stream.read_timeout()));
1064
1065         assert_eq!(None, t!(stream.write_timeout()));
1066
1067         t!(stream.set_write_timeout(Some(dur)));
1068         assert_eq!(Some(dur), t!(stream.write_timeout()));
1069
1070         t!(stream.set_read_timeout(None));
1071         assert_eq!(None, t!(stream.read_timeout()));
1072
1073         t!(stream.set_write_timeout(None));
1074         assert_eq!(None, t!(stream.write_timeout()));
1075         drop(listener);
1076     }
1077
1078     #[test]
1079     fn test_read_timeout() {
1080         let addr = next_test_ip4();
1081         let listener = t!(TcpListener::bind(&addr));
1082
1083         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1084         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1085
1086         let mut buf = [0; 10];
1087         let start = Instant::now();
1088         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1089         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1090         assert!(start.elapsed() > Duration::from_millis(400));
1091         drop(listener);
1092     }
1093
1094     #[test]
1095     fn test_read_with_timeout() {
1096         let addr = next_test_ip4();
1097         let listener = t!(TcpListener::bind(&addr));
1098
1099         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1100         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1101
1102         let mut other_end = t!(listener.accept()).0;
1103         t!(other_end.write_all(b"hello world"));
1104
1105         let mut buf = [0; 11];
1106         t!(stream.read(&mut buf));
1107         assert_eq!(b"hello world", &buf[..]);
1108
1109         let start = Instant::now();
1110         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1111         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1112         assert!(start.elapsed() > Duration::from_millis(400));
1113         drop(listener);
1114     }
1115
1116     #[test]
1117     fn nodelay() {
1118         let addr = next_test_ip4();
1119         let _listener = t!(TcpListener::bind(&addr));
1120
1121         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1122
1123         assert_eq!(false, t!(stream.nodelay()));
1124         t!(stream.set_nodelay(true));
1125         assert_eq!(true, t!(stream.nodelay()));
1126         t!(stream.set_nodelay(false));
1127         assert_eq!(false, t!(stream.nodelay()));
1128     }
1129
1130     #[test]
1131     fn ttl() {
1132         let ttl = 100;
1133
1134         let addr = next_test_ip4();
1135         let listener = t!(TcpListener::bind(&addr));
1136
1137         t!(listener.set_ttl(ttl));
1138         assert_eq!(ttl, t!(listener.ttl()));
1139
1140         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1141
1142         t!(stream.set_ttl(ttl));
1143         assert_eq!(ttl, t!(stream.ttl()));
1144     }
1145
1146     #[test]
1147     fn set_nonblocking() {
1148         let addr = next_test_ip4();
1149         let listener = t!(TcpListener::bind(&addr));
1150
1151         t!(listener.set_nonblocking(true));
1152         t!(listener.set_nonblocking(false));
1153
1154         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1155
1156         t!(stream.set_nonblocking(false));
1157         t!(stream.set_nonblocking(true));
1158
1159         let mut buf = [0];
1160         match stream.read(&mut buf) {
1161             Ok(_) => panic!("expected error"),
1162             Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
1163             Err(e) => panic!("unexpected error {}", e),
1164         }
1165     }
1166 }