]> git.lizzy.rs Git - rust.git/blob - src/libstd/net/tcp.rs
Rollup merge of #31969 - brson:relnotes, r=alexcrichton
[rust.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::io::read_to_end_uninitialized;
18 use sys_common::net as net_imp;
19 use sys_common::{AsInner, FromInner, IntoInner};
20 use time::Duration;
21
22 /// A structure which represents a TCP stream between a local socket and a
23 /// remote socket.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
32 ///
33 /// {
34 ///     let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 ///
36 ///     // ignore the Result
37 ///     let _ = stream.write(&[1]);
38 ///     let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
40 /// ```
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp::TcpStream);
43
44 /// A structure representing a socket server.
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use std::net::{TcpListener, TcpStream};
50 /// use std::thread;
51 ///
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 ///
54 /// fn handle_client(stream: TcpStream) {
55 ///     // ...
56 /// }
57 ///
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
60 ///     match stream {
61 ///         Ok(stream) => {
62 ///             thread::spawn(move|| {
63 ///                 // connection succeeded
64 ///                 handle_client(stream)
65 ///             });
66 ///         }
67 ///         Err(e) => { /* connection failed */ }
68 ///     }
69 /// }
70 ///
71 /// // close the socket server
72 /// drop(listener);
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp::TcpListener);
76
77 /// An infinite iterator over the connections from a `TcpListener`.
78 ///
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming<'a> { listener: &'a TcpListener }
83
84 impl TcpStream {
85     /// Opens a TCP connection to a remote host.
86     ///
87     /// `addr` is an address of the remote host. Anything which implements
88     /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89     /// documentation for concrete examples.
90     #[stable(feature = "rust1", since = "1.0.0")]
91     pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
92         super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
93     }
94
95     /// Returns the socket address of the remote peer of this TCP connection.
96     #[stable(feature = "rust1", since = "1.0.0")]
97     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
98         self.0.peer_addr()
99     }
100
101     /// Returns the socket address of the local half of this TCP connection.
102     #[stable(feature = "rust1", since = "1.0.0")]
103     pub fn local_addr(&self) -> io::Result<SocketAddr> {
104         self.0.socket_addr()
105     }
106
107     /// Shuts down the read, write, or both halves of this connection.
108     ///
109     /// This function will cause all pending and future I/O on the specified
110     /// portions to return immediately with an appropriate value (see the
111     /// documentation of `Shutdown`).
112     #[stable(feature = "rust1", since = "1.0.0")]
113     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
114         self.0.shutdown(how)
115     }
116
117     /// Creates a new independently owned handle to the underlying socket.
118     ///
119     /// The returned `TcpStream` is a reference to the same stream that this
120     /// object references. Both handles will read and write the same stream of
121     /// data, and options set on one stream will be propagated to the other
122     /// stream.
123     #[stable(feature = "rust1", since = "1.0.0")]
124     pub fn try_clone(&self) -> io::Result<TcpStream> {
125         self.0.duplicate().map(TcpStream)
126     }
127
128     /// Sets the read timeout to the timeout specified.
129     ///
130     /// If the value specified is `None`, then `read` calls will block
131     /// indefinitely. It is an error to pass the zero `Duration` to this
132     /// method.
133     ///
134     /// # Note
135     ///
136     /// Platforms may return a different error code whenever a read times out as
137     /// a result of setting this option. For example Unix typically returns an
138     /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139     #[stable(feature = "socket_timeout", since = "1.4.0")]
140     pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
141         self.0.set_read_timeout(dur)
142     }
143
144     /// Sets the write timeout to the timeout specified.
145     ///
146     /// If the value specified is `None`, then `write` calls will block
147     /// indefinitely. It is an error to pass the zero `Duration` to this
148     /// method.
149     ///
150     /// # Note
151     ///
152     /// Platforms may return a different error code whenever a write times out
153     /// as a result of setting this option. For example Unix typically returns
154     /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155     #[stable(feature = "socket_timeout", since = "1.4.0")]
156     pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
157         self.0.set_write_timeout(dur)
158     }
159
160     /// Returns the read timeout of this socket.
161     ///
162     /// If the timeout is `None`, then `read` calls will block indefinitely.
163     ///
164     /// # Note
165     ///
166     /// Some platforms do not provide access to the current timeout.
167     #[stable(feature = "socket_timeout", since = "1.4.0")]
168     pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
169         self.0.read_timeout()
170     }
171
172     /// Returns the write timeout of this socket.
173     ///
174     /// If the timeout is `None`, then `write` calls will block indefinitely.
175     ///
176     /// # Note
177     ///
178     /// Some platforms do not provide access to the current timeout.
179     #[stable(feature = "socket_timeout", since = "1.4.0")]
180     pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
181         self.0.write_timeout()
182     }
183 }
184
185 #[stable(feature = "rust1", since = "1.0.0")]
186 impl Read for TcpStream {
187     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
188     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
189         unsafe { read_to_end_uninitialized(self, buf) }
190     }
191 }
192 #[stable(feature = "rust1", since = "1.0.0")]
193 impl Write for TcpStream {
194     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
195     fn flush(&mut self) -> io::Result<()> { Ok(()) }
196 }
197 #[stable(feature = "rust1", since = "1.0.0")]
198 impl<'a> Read for &'a TcpStream {
199     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
200     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
201         unsafe { read_to_end_uninitialized(self, buf) }
202     }
203 }
204 #[stable(feature = "rust1", since = "1.0.0")]
205 impl<'a> Write for &'a TcpStream {
206     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
207     fn flush(&mut self) -> io::Result<()> { Ok(()) }
208 }
209
210 impl AsInner<net_imp::TcpStream> for TcpStream {
211     fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
212 }
213
214 impl FromInner<net_imp::TcpStream> for TcpStream {
215     fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
216 }
217
218 impl IntoInner<net_imp::TcpStream> for TcpStream {
219     fn into_inner(self) -> net_imp::TcpStream { self.0 }
220 }
221
222 #[stable(feature = "rust1", since = "1.0.0")]
223 impl fmt::Debug for TcpStream {
224     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
225         self.0.fmt(f)
226     }
227 }
228
229 impl TcpListener {
230     /// Creates a new `TcpListener` which will be bound to the specified
231     /// address.
232     ///
233     /// The returned listener is ready for accepting connections.
234     ///
235     /// Binding with a port number of 0 will request that the OS assigns a port
236     /// to this listener. The port allocated can be queried via the
237     /// `local_addr` method.
238     ///
239     /// The address type can be any implementor of `ToSocketAddrs` trait. See
240     /// its documentation for concrete examples.
241     #[stable(feature = "rust1", since = "1.0.0")]
242     pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
243         super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
244     }
245
246     /// Returns the local socket address of this listener.
247     #[stable(feature = "rust1", since = "1.0.0")]
248     pub fn local_addr(&self) -> io::Result<SocketAddr> {
249         self.0.socket_addr()
250     }
251
252     /// Creates a new independently owned handle to the underlying socket.
253     ///
254     /// The returned `TcpListener` is a reference to the same socket that this
255     /// object references. Both handles can be used to accept incoming
256     /// connections and options set on one listener will affect the other.
257     #[stable(feature = "rust1", since = "1.0.0")]
258     pub fn try_clone(&self) -> io::Result<TcpListener> {
259         self.0.duplicate().map(TcpListener)
260     }
261
262     /// Accept a new incoming connection from this listener.
263     ///
264     /// This function will block the calling thread until a new TCP connection
265     /// is established. When established, the corresponding `TcpStream` and the
266     /// remote peer's address will be returned.
267     #[stable(feature = "rust1", since = "1.0.0")]
268     pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
269         self.0.accept().map(|(a, b)| (TcpStream(a), b))
270     }
271
272     /// Returns an iterator over the connections being received on this
273     /// listener.
274     ///
275     /// The returned iterator will never return `None` and will also not yield
276     /// the peer's `SocketAddr` structure.
277     #[stable(feature = "rust1", since = "1.0.0")]
278     pub fn incoming(&self) -> Incoming {
279         Incoming { listener: self }
280     }
281 }
282
283 #[stable(feature = "rust1", since = "1.0.0")]
284 impl<'a> Iterator for Incoming<'a> {
285     type Item = io::Result<TcpStream>;
286     fn next(&mut self) -> Option<io::Result<TcpStream>> {
287         Some(self.listener.accept().map(|p| p.0))
288     }
289 }
290
291 impl AsInner<net_imp::TcpListener> for TcpListener {
292     fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
293 }
294
295 impl FromInner<net_imp::TcpListener> for TcpListener {
296     fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
297         TcpListener(inner)
298     }
299 }
300
301 impl IntoInner<net_imp::TcpListener> for TcpListener {
302     fn into_inner(self) -> net_imp::TcpListener { self.0 }
303 }
304
305 #[stable(feature = "rust1", since = "1.0.0")]
306 impl fmt::Debug for TcpListener {
307     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
308         self.0.fmt(f)
309     }
310 }
311
312 #[cfg(test)]
313 mod tests {
314     use prelude::v1::*;
315
316     use io::ErrorKind;
317     use io::prelude::*;
318     use net::*;
319     use net::test::{next_test_ip4, next_test_ip6};
320     use sync::mpsc::channel;
321     use sys_common::AsInner;
322     use time::{Instant, Duration};
323     use thread;
324
325     fn each_ip(f: &mut FnMut(SocketAddr)) {
326         f(next_test_ip4());
327         f(next_test_ip6());
328     }
329
330     macro_rules! t {
331         ($e:expr) => {
332             match $e {
333                 Ok(t) => t,
334                 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
335             }
336         }
337     }
338
339     #[test]
340     fn bind_error() {
341         match TcpListener::bind("1.1.1.1:9999") {
342             Ok(..) => panic!(),
343             Err(e) =>
344                 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
345         }
346     }
347
348     #[test]
349     fn connect_error() {
350         match TcpStream::connect("0.0.0.0:1") {
351             Ok(..) => panic!(),
352             Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
353                               e.kind() == ErrorKind::InvalidInput ||
354                               e.kind() == ErrorKind::AddrInUse ||
355                               e.kind() == ErrorKind::AddrNotAvailable,
356                               "bad error: {} {:?}", e, e.kind()),
357         }
358     }
359
360     #[test]
361     fn listen_localhost() {
362         let socket_addr = next_test_ip4();
363         let listener = t!(TcpListener::bind(&socket_addr));
364
365         let _t = thread::spawn(move || {
366             let mut stream = t!(TcpStream::connect(&("localhost",
367                                                      socket_addr.port())));
368             t!(stream.write(&[144]));
369         });
370
371         let mut stream = t!(listener.accept()).0;
372         let mut buf = [0];
373         t!(stream.read(&mut buf));
374         assert!(buf[0] == 144);
375     }
376
377     #[test]
378     fn connect_loopback() {
379         each_ip(&mut |addr| {
380             let acceptor = t!(TcpListener::bind(&addr));
381
382             let _t = thread::spawn(move|| {
383                 let host = match addr {
384                     SocketAddr::V4(..) => "127.0.0.1",
385                     SocketAddr::V6(..) => "::1",
386                 };
387                 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
388                 t!(stream.write(&[66]));
389             });
390
391             let mut stream = t!(acceptor.accept()).0;
392             let mut buf = [0];
393             t!(stream.read(&mut buf));
394             assert!(buf[0] == 66);
395         })
396     }
397
398     #[test]
399     fn smoke_test() {
400         each_ip(&mut |addr| {
401             let acceptor = t!(TcpListener::bind(&addr));
402
403             let (tx, rx) = channel();
404             let _t = thread::spawn(move|| {
405                 let mut stream = t!(TcpStream::connect(&addr));
406                 t!(stream.write(&[99]));
407                 tx.send(t!(stream.local_addr())).unwrap();
408             });
409
410             let (mut stream, addr) = t!(acceptor.accept());
411             let mut buf = [0];
412             t!(stream.read(&mut buf));
413             assert!(buf[0] == 99);
414             assert_eq!(addr, t!(rx.recv()));
415         })
416     }
417
418     #[test]
419     fn read_eof() {
420         each_ip(&mut |addr| {
421             let acceptor = t!(TcpListener::bind(&addr));
422
423             let _t = thread::spawn(move|| {
424                 let _stream = t!(TcpStream::connect(&addr));
425                 // Close
426             });
427
428             let mut stream = t!(acceptor.accept()).0;
429             let mut buf = [0];
430             let nread = t!(stream.read(&mut buf));
431             assert_eq!(nread, 0);
432             let nread = t!(stream.read(&mut buf));
433             assert_eq!(nread, 0);
434         })
435     }
436
437     #[test]
438     fn write_close() {
439         each_ip(&mut |addr| {
440             let acceptor = t!(TcpListener::bind(&addr));
441
442             let (tx, rx) = channel();
443             let _t = thread::spawn(move|| {
444                 drop(t!(TcpStream::connect(&addr)));
445                 tx.send(()).unwrap();
446             });
447
448             let mut stream = t!(acceptor.accept()).0;
449             rx.recv().unwrap();
450             let buf = [0];
451             match stream.write(&buf) {
452                 Ok(..) => {}
453                 Err(e) => {
454                     assert!(e.kind() == ErrorKind::ConnectionReset ||
455                             e.kind() == ErrorKind::BrokenPipe ||
456                             e.kind() == ErrorKind::ConnectionAborted,
457                             "unknown error: {}", e);
458                 }
459             }
460         })
461     }
462
463     #[test]
464     fn multiple_connect_serial() {
465         each_ip(&mut |addr| {
466             let max = 10;
467             let acceptor = t!(TcpListener::bind(&addr));
468
469             let _t = thread::spawn(move|| {
470                 for _ in 0..max {
471                     let mut stream = t!(TcpStream::connect(&addr));
472                     t!(stream.write(&[99]));
473                 }
474             });
475
476             for stream in acceptor.incoming().take(max) {
477                 let mut stream = t!(stream);
478                 let mut buf = [0];
479                 t!(stream.read(&mut buf));
480                 assert_eq!(buf[0], 99);
481             }
482         })
483     }
484
485     #[test]
486     fn multiple_connect_interleaved_greedy_schedule() {
487         const MAX: usize = 10;
488         each_ip(&mut |addr| {
489             let acceptor = t!(TcpListener::bind(&addr));
490
491             let _t = thread::spawn(move|| {
492                 let acceptor = acceptor;
493                 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
494                     // Start another thread to handle the connection
495                     let _t = thread::spawn(move|| {
496                         let mut stream = t!(stream);
497                         let mut buf = [0];
498                         t!(stream.read(&mut buf));
499                         assert!(buf[0] == i as u8);
500                     });
501                 }
502             });
503
504             connect(0, addr);
505         });
506
507         fn connect(i: usize, addr: SocketAddr) {
508             if i == MAX { return }
509
510             let t = thread::spawn(move|| {
511                 let mut stream = t!(TcpStream::connect(&addr));
512                 // Connect again before writing
513                 connect(i + 1, addr);
514                 t!(stream.write(&[i as u8]));
515             });
516             t.join().ok().unwrap();
517         }
518     }
519
520     #[test]
521     fn multiple_connect_interleaved_lazy_schedule() {
522         const MAX: usize = 10;
523         each_ip(&mut |addr| {
524             let acceptor = t!(TcpListener::bind(&addr));
525
526             let _t = thread::spawn(move|| {
527                 for stream in acceptor.incoming().take(MAX) {
528                     // Start another thread to handle the connection
529                     let _t = thread::spawn(move|| {
530                         let mut stream = t!(stream);
531                         let mut buf = [0];
532                         t!(stream.read(&mut buf));
533                         assert!(buf[0] == 99);
534                     });
535                 }
536             });
537
538             connect(0, addr);
539         });
540
541         fn connect(i: usize, addr: SocketAddr) {
542             if i == MAX { return }
543
544             let t = thread::spawn(move|| {
545                 let mut stream = t!(TcpStream::connect(&addr));
546                 connect(i + 1, addr);
547                 t!(stream.write(&[99]));
548             });
549             t.join().ok().unwrap();
550         }
551     }
552
553     #[test]
554     fn socket_and_peer_name() {
555         each_ip(&mut |addr| {
556             let listener = t!(TcpListener::bind(&addr));
557             let so_name = t!(listener.local_addr());
558             assert_eq!(addr, so_name);
559             let _t = thread::spawn(move|| {
560                 t!(listener.accept());
561             });
562
563             let stream = t!(TcpStream::connect(&addr));
564             assert_eq!(addr, t!(stream.peer_addr()));
565         })
566     }
567
568     #[test]
569     fn partial_read() {
570         each_ip(&mut |addr| {
571             let (tx, rx) = channel();
572             let srv = t!(TcpListener::bind(&addr));
573             let _t = thread::spawn(move|| {
574                 let mut cl = t!(srv.accept()).0;
575                 cl.write(&[10]).unwrap();
576                 let mut b = [0];
577                 t!(cl.read(&mut b));
578                 tx.send(()).unwrap();
579             });
580
581             let mut c = t!(TcpStream::connect(&addr));
582             let mut b = [0; 10];
583             assert_eq!(c.read(&mut b).unwrap(), 1);
584             t!(c.write(&[1]));
585             rx.recv().unwrap();
586         })
587     }
588
589     #[test]
590     fn double_bind() {
591         each_ip(&mut |addr| {
592             let _listener = t!(TcpListener::bind(&addr));
593             match TcpListener::bind(&addr) {
594                 Ok(..) => panic!(),
595                 Err(e) => {
596                     assert!(e.kind() == ErrorKind::ConnectionRefused ||
597                             e.kind() == ErrorKind::Other ||
598                             e.kind() == ErrorKind::AddrInUse,
599                             "unknown error: {} {:?}", e, e.kind());
600                 }
601             }
602         })
603     }
604
605     #[test]
606     fn fast_rebind() {
607         each_ip(&mut |addr| {
608             let acceptor = t!(TcpListener::bind(&addr));
609
610             let _t = thread::spawn(move|| {
611                 t!(TcpStream::connect(&addr));
612             });
613
614             t!(acceptor.accept());
615             drop(acceptor);
616             t!(TcpListener::bind(&addr));
617         });
618     }
619
620     #[test]
621     fn tcp_clone_smoke() {
622         each_ip(&mut |addr| {
623             let acceptor = t!(TcpListener::bind(&addr));
624
625             let _t = thread::spawn(move|| {
626                 let mut s = t!(TcpStream::connect(&addr));
627                 let mut buf = [0, 0];
628                 assert_eq!(s.read(&mut buf).unwrap(), 1);
629                 assert_eq!(buf[0], 1);
630                 t!(s.write(&[2]));
631             });
632
633             let mut s1 = t!(acceptor.accept()).0;
634             let s2 = t!(s1.try_clone());
635
636             let (tx1, rx1) = channel();
637             let (tx2, rx2) = channel();
638             let _t = thread::spawn(move|| {
639                 let mut s2 = s2;
640                 rx1.recv().unwrap();
641                 t!(s2.write(&[1]));
642                 tx2.send(()).unwrap();
643             });
644             tx1.send(()).unwrap();
645             let mut buf = [0, 0];
646             assert_eq!(s1.read(&mut buf).unwrap(), 1);
647             rx2.recv().unwrap();
648         })
649     }
650
651     #[test]
652     fn tcp_clone_two_read() {
653         each_ip(&mut |addr| {
654             let acceptor = t!(TcpListener::bind(&addr));
655             let (tx1, rx) = channel();
656             let tx2 = tx1.clone();
657
658             let _t = thread::spawn(move|| {
659                 let mut s = t!(TcpStream::connect(&addr));
660                 t!(s.write(&[1]));
661                 rx.recv().unwrap();
662                 t!(s.write(&[2]));
663                 rx.recv().unwrap();
664             });
665
666             let mut s1 = t!(acceptor.accept()).0;
667             let s2 = t!(s1.try_clone());
668
669             let (done, rx) = channel();
670             let _t = thread::spawn(move|| {
671                 let mut s2 = s2;
672                 let mut buf = [0, 0];
673                 t!(s2.read(&mut buf));
674                 tx2.send(()).unwrap();
675                 done.send(()).unwrap();
676             });
677             let mut buf = [0, 0];
678             t!(s1.read(&mut buf));
679             tx1.send(()).unwrap();
680
681             rx.recv().unwrap();
682         })
683     }
684
685     #[test]
686     fn tcp_clone_two_write() {
687         each_ip(&mut |addr| {
688             let acceptor = t!(TcpListener::bind(&addr));
689
690             let _t = thread::spawn(move|| {
691                 let mut s = t!(TcpStream::connect(&addr));
692                 let mut buf = [0, 1];
693                 t!(s.read(&mut buf));
694                 t!(s.read(&mut buf));
695             });
696
697             let mut s1 = t!(acceptor.accept()).0;
698             let s2 = t!(s1.try_clone());
699
700             let (done, rx) = channel();
701             let _t = thread::spawn(move|| {
702                 let mut s2 = s2;
703                 t!(s2.write(&[1]));
704                 done.send(()).unwrap();
705             });
706             t!(s1.write(&[2]));
707
708             rx.recv().unwrap();
709         })
710     }
711
712     #[test]
713     fn shutdown_smoke() {
714         each_ip(&mut |addr| {
715             let a = t!(TcpListener::bind(&addr));
716             let _t = thread::spawn(move|| {
717                 let mut c = t!(a.accept()).0;
718                 let mut b = [0];
719                 assert_eq!(c.read(&mut b).unwrap(), 0);
720                 t!(c.write(&[1]));
721             });
722
723             let mut s = t!(TcpStream::connect(&addr));
724             t!(s.shutdown(Shutdown::Write));
725             assert!(s.write(&[1]).is_err());
726             let mut b = [0, 0];
727             assert_eq!(t!(s.read(&mut b)), 1);
728             assert_eq!(b[0], 1);
729         })
730     }
731
732     #[test]
733     fn close_readwrite_smoke() {
734         each_ip(&mut |addr| {
735             let a = t!(TcpListener::bind(&addr));
736             let (tx, rx) = channel::<()>();
737             let _t = thread::spawn(move|| {
738                 let _s = t!(a.accept());
739                 let _ = rx.recv();
740             });
741
742             let mut b = [0];
743             let mut s = t!(TcpStream::connect(&addr));
744             let mut s2 = t!(s.try_clone());
745
746             // closing should prevent reads/writes
747             t!(s.shutdown(Shutdown::Write));
748             assert!(s.write(&[0]).is_err());
749             t!(s.shutdown(Shutdown::Read));
750             assert_eq!(s.read(&mut b).unwrap(), 0);
751
752             // closing should affect previous handles
753             assert!(s2.write(&[0]).is_err());
754             assert_eq!(s2.read(&mut b).unwrap(), 0);
755
756             // closing should affect new handles
757             let mut s3 = t!(s.try_clone());
758             assert!(s3.write(&[0]).is_err());
759             assert_eq!(s3.read(&mut b).unwrap(), 0);
760
761             // make sure these don't die
762             let _ = s2.shutdown(Shutdown::Read);
763             let _ = s2.shutdown(Shutdown::Write);
764             let _ = s3.shutdown(Shutdown::Read);
765             let _ = s3.shutdown(Shutdown::Write);
766             drop(tx);
767         })
768     }
769
770     #[test]
771     fn close_read_wakes_up() {
772         each_ip(&mut |addr| {
773             let a = t!(TcpListener::bind(&addr));
774             let (tx1, rx) = channel::<()>();
775             let _t = thread::spawn(move|| {
776                 let _s = t!(a.accept());
777                 let _ = rx.recv();
778             });
779
780             let s = t!(TcpStream::connect(&addr));
781             let s2 = t!(s.try_clone());
782             let (tx, rx) = channel();
783             let _t = thread::spawn(move|| {
784                 let mut s2 = s2;
785                 assert_eq!(t!(s2.read(&mut [0])), 0);
786                 tx.send(()).unwrap();
787             });
788             // this should wake up the child thread
789             t!(s.shutdown(Shutdown::Read));
790
791             // this test will never finish if the child doesn't wake up
792             rx.recv().unwrap();
793             drop(tx1);
794         })
795     }
796
797     #[test]
798     fn clone_while_reading() {
799         each_ip(&mut |addr| {
800             let accept = t!(TcpListener::bind(&addr));
801
802             // Enqueue a thread to write to a socket
803             let (tx, rx) = channel();
804             let (txdone, rxdone) = channel();
805             let txdone2 = txdone.clone();
806             let _t = thread::spawn(move|| {
807                 let mut tcp = t!(TcpStream::connect(&addr));
808                 rx.recv().unwrap();
809                 t!(tcp.write(&[0]));
810                 txdone2.send(()).unwrap();
811             });
812
813             // Spawn off a reading clone
814             let tcp = t!(accept.accept()).0;
815             let tcp2 = t!(tcp.try_clone());
816             let txdone3 = txdone.clone();
817             let _t = thread::spawn(move|| {
818                 let mut tcp2 = tcp2;
819                 t!(tcp2.read(&mut [0]));
820                 txdone3.send(()).unwrap();
821             });
822
823             // Try to ensure that the reading clone is indeed reading
824             for _ in 0..50 {
825                 thread::yield_now();
826             }
827
828             // clone the handle again while it's reading, then let it finish the
829             // read.
830             let _ = t!(tcp.try_clone());
831             tx.send(()).unwrap();
832             rxdone.recv().unwrap();
833             rxdone.recv().unwrap();
834         })
835     }
836
837     #[test]
838     fn clone_accept_smoke() {
839         each_ip(&mut |addr| {
840             let a = t!(TcpListener::bind(&addr));
841             let a2 = t!(a.try_clone());
842
843             let _t = thread::spawn(move|| {
844                 let _ = TcpStream::connect(&addr);
845             });
846             let _t = thread::spawn(move|| {
847                 let _ = TcpStream::connect(&addr);
848             });
849
850             t!(a.accept());
851             t!(a2.accept());
852         })
853     }
854
855     #[test]
856     fn clone_accept_concurrent() {
857         each_ip(&mut |addr| {
858             let a = t!(TcpListener::bind(&addr));
859             let a2 = t!(a.try_clone());
860
861             let (tx, rx) = channel();
862             let tx2 = tx.clone();
863
864             let _t = thread::spawn(move|| {
865                 tx.send(t!(a.accept())).unwrap();
866             });
867             let _t = thread::spawn(move|| {
868                 tx2.send(t!(a2.accept())).unwrap();
869             });
870
871             let _t = thread::spawn(move|| {
872                 let _ = TcpStream::connect(&addr);
873             });
874             let _t = thread::spawn(move|| {
875                 let _ = TcpStream::connect(&addr);
876             });
877
878             rx.recv().unwrap();
879             rx.recv().unwrap();
880         })
881     }
882
883     #[test]
884     fn debug() {
885         let name = if cfg!(windows) {"socket"} else {"fd"};
886         let socket_addr = next_test_ip4();
887
888         let listener = t!(TcpListener::bind(&socket_addr));
889         let listener_inner = listener.0.socket().as_inner();
890         let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
891                               socket_addr, name, listener_inner);
892         assert_eq!(format!("{:?}", listener), compare);
893
894         let stream = t!(TcpStream::connect(&("localhost",
895                                                  socket_addr.port())));
896         let stream_inner = stream.0.socket().as_inner();
897         let compare = format!("TcpStream {{ addr: {:?}, \
898                               peer: {:?}, {}: {:?} }}",
899                               stream.local_addr().unwrap(),
900                               stream.peer_addr().unwrap(),
901                               name,
902                               stream_inner);
903         assert_eq!(format!("{:?}", stream), compare);
904     }
905
906     // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code
907     //        no longer has rounding errors.
908     #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
909     #[test]
910     fn timeouts() {
911         let addr = next_test_ip4();
912         let listener = t!(TcpListener::bind(&addr));
913
914         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
915         let dur = Duration::new(15410, 0);
916
917         assert_eq!(None, t!(stream.read_timeout()));
918
919         t!(stream.set_read_timeout(Some(dur)));
920         assert_eq!(Some(dur), t!(stream.read_timeout()));
921
922         assert_eq!(None, t!(stream.write_timeout()));
923
924         t!(stream.set_write_timeout(Some(dur)));
925         assert_eq!(Some(dur), t!(stream.write_timeout()));
926
927         t!(stream.set_read_timeout(None));
928         assert_eq!(None, t!(stream.read_timeout()));
929
930         t!(stream.set_write_timeout(None));
931         assert_eq!(None, t!(stream.write_timeout()));
932         drop(listener);
933     }
934
935     #[test]
936     fn test_read_timeout() {
937         let addr = next_test_ip4();
938         let listener = t!(TcpListener::bind(&addr));
939
940         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
941         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
942
943         let mut buf = [0; 10];
944         let start = Instant::now();
945         let kind = stream.read(&mut buf).err().expect("expected error").kind();
946         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
947         assert!(start.elapsed() > Duration::from_millis(400));
948         drop(listener);
949     }
950
951     #[test]
952     fn test_read_with_timeout() {
953         let addr = next_test_ip4();
954         let listener = t!(TcpListener::bind(&addr));
955
956         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
957         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
958
959         let mut other_end = t!(listener.accept()).0;
960         t!(other_end.write_all(b"hello world"));
961
962         let mut buf = [0; 11];
963         t!(stream.read(&mut buf));
964         assert_eq!(b"hello world", &buf[..]);
965
966         let start = Instant::now();
967         let kind = stream.read(&mut buf).err().expect("expected error").kind();
968         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
969         assert!(start.elapsed() > Duration::from_millis(400));
970         drop(listener);
971     }
972 }