]> git.lizzy.rs Git - rust.git/blob - src/libstd/old_io/net/tcp.rs
ebf7f6cc0f2a9e6b4d256033ef1669e755827cfc
[rust.git] / src / libstd / old_io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use old_io::IoResult;
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
26 use option::Option;
27 use option::Option::{None, Some};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 use sys_common;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// The socket will be closed when the value is dropped.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// use std::old_io::TcpStream;
45 ///
46 /// {
47 ///     let mut stream = TcpStream::connect("127.0.0.1:34254");
48 ///
49 ///     // ignore the Result
50 ///     let _ = stream.write(&[1]);
51 ///
52 ///     let mut buf = [0];
53 ///     let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
55 /// ```
56 pub struct TcpStream {
57     inner: TcpStreamImp,
58 }
59
60 impl TcpStream {
61     fn new(s: TcpStreamImp) -> TcpStream {
62         TcpStream { inner: s }
63     }
64
65     /// Open a TCP connection to a remote host.
66     ///
67     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
68     /// trait can be supplied for the address; see this trait documentation for
69     /// concrete examples.
70     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71         super::with_addresses(addr, |addr| {
72             TcpStreamImp::connect(addr, None).map(TcpStream::new)
73         })
74     }
75
76     /// Creates a TCP connection to a remote socket address, timing out after
77     /// the specified duration.
78     ///
79     /// This is the same as the `connect` method, except that if the timeout
80     /// specified elapses before a connection is made an error will be
81     /// returned. The error's kind will be `TimedOut`.
82     ///
83     /// Same as the `connect` method, `addr` argument type can be anything which
84     /// implements `ToSocketAddr` trait.
85     ///
86     /// If a `timeout` with zero or negative duration is specified then
87     /// the function returns `Err`, with the error kind set to `TimedOut`.
88     #[unstable(feature = "io",
89                reason = "the timeout argument may eventually change types")]
90     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
91                                             timeout: Duration) -> IoResult<TcpStream> {
92         if timeout <= Duration::milliseconds(0) {
93             return Err(standard_error(TimedOut));
94         }
95
96         super::with_addresses(addr, |addr| {
97             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
98                 .map(TcpStream::new)
99         })
100     }
101
102     /// Returns the socket address of the remote peer of this TCP connection.
103     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
104         self.inner.peer_name()
105     }
106
107     /// Returns the socket address of the local half of this TCP connection.
108     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
109         self.inner.socket_name()
110     }
111
112     /// Sets the nodelay flag on this connection to the boolean specified
113     #[unstable(feature = "io")]
114     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
115         self.inner.set_nodelay(nodelay)
116     }
117
118     /// Sets the keepalive timeout to the timeout specified.
119     ///
120     /// If the value specified is `None`, then the keepalive flag is cleared on
121     /// this connection. Otherwise, the keepalive timeout will be set to the
122     /// specified time, in seconds.
123     #[unstable(feature = "io")]
124     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
125         self.inner.set_keepalive(delay_in_seconds)
126     }
127
128     /// Closes the reading half of this connection.
129     ///
130     /// This method will close the reading portion of this connection, causing
131     /// all pending and future reads to immediately return with an error.
132     ///
133     /// # Example
134     ///
135     /// ```no_run
136     /// # #![allow(unused_must_use)]
137     /// use std::old_io::timer;
138     /// use std::old_io::TcpStream;
139     /// use std::time::Duration;
140     /// use std::thread::Thread;
141     ///
142     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
143     /// let stream2 = stream.clone();
144     ///
145     /// let _t = Thread::spawn(move|| {
146     ///     // close this stream after one second
147     ///     timer::sleep(Duration::seconds(1));
148     ///     let mut stream = stream2;
149     ///     stream.close_read();
150     /// });
151     ///
152     /// // wait for some data, will get canceled after one second
153     /// let mut buf = [0];
154     /// stream.read(&mut buf);
155     /// ```
156     ///
157     /// Note that this method affects all cloned handles associated with this
158     /// stream, not just this one handle.
159     pub fn close_read(&mut self) -> IoResult<()> {
160         self.inner.close_read()
161     }
162
163     /// Closes the writing half of this connection.
164     ///
165     /// This method will close the writing portion of this connection, causing
166     /// all future writes to immediately return with an error.
167     ///
168     /// Note that this method affects all cloned handles associated with this
169     /// stream, not just this one handle.
170     pub fn close_write(&mut self) -> IoResult<()> {
171         self.inner.close_write()
172     }
173
174     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
175     ///
176     /// This function will set a timeout for all blocking operations (including
177     /// reads and writes) on this stream. The timeout specified is a relative
178     /// time, in milliseconds, into the future after which point operations will
179     /// time out. This means that the timeout must be reset periodically to keep
180     /// it from expiring. Specifying a value of `None` will clear the timeout
181     /// for this stream.
182     ///
183     /// The timeout on this stream is local to this stream only. Setting a
184     /// timeout does not affect any other cloned instances of this stream, nor
185     /// does the timeout propagated to cloned handles of this stream. Setting
186     /// this timeout will override any specific read or write timeouts
187     /// previously set for this stream.
188     ///
189     /// For clarification on the semantics of interrupting a read and a write,
190     /// take a look at `set_read_timeout` and `set_write_timeout`.
191     #[unstable(feature = "io",
192                reason = "the timeout argument may change in type and value")]
193     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
194         self.inner.set_timeout(timeout_ms)
195     }
196
197     /// Sets the timeout for read operations on this stream.
198     ///
199     /// See documentation in `set_timeout` for the semantics of this read time.
200     /// This will overwrite any previous read timeout set through either this
201     /// function or `set_timeout`.
202     ///
203     /// # Errors
204     ///
205     /// When this timeout expires, if there is no pending read operation, no
206     /// action is taken. Otherwise, the read operation will be scheduled to
207     /// promptly return. If a timeout error is returned, then no data was read
208     /// during the timeout period.
209     #[unstable(feature = "io",
210                reason = "the timeout argument may change in type and value")]
211     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
212         self.inner.set_read_timeout(timeout_ms)
213     }
214
215     /// Sets the timeout for write operations on this stream.
216     ///
217     /// See documentation in `set_timeout` for the semantics of this write time.
218     /// This will overwrite any previous write timeout set through either this
219     /// function or `set_timeout`.
220     ///
221     /// # Errors
222     ///
223     /// When this timeout expires, if there is no pending write operation, no
224     /// action is taken. Otherwise, the pending write operation will be
225     /// scheduled to promptly return. The actual state of the underlying stream
226     /// is not specified.
227     ///
228     /// The write operation may return an error of type `ShortWrite` which
229     /// indicates that the object is known to have written an exact number of
230     /// bytes successfully during the timeout period, and the remaining bytes
231     /// were never written.
232     ///
233     /// If the write operation returns `TimedOut`, then it the timeout primitive
234     /// does not know how many bytes were written as part of the timeout
235     /// operation. It may be the case that bytes continue to be written in an
236     /// asynchronous fashion after the call to write returns.
237     #[unstable(feature = "io",
238                reason = "the timeout argument may change in type and value")]
239     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
240         self.inner.set_write_timeout(timeout_ms)
241     }
242 }
243
244 impl Clone for TcpStream {
245     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
246     /// and writes of this connection.
247     ///
248     /// The underlying TCP stream will not be closed until all handles to the
249     /// stream have been deallocated. All handles will also follow the same
250     /// stream, but two concurrent reads will not receive the same data.
251     /// Instead, the first read will receive the first packet received, and the
252     /// second read will receive the second packet.
253     fn clone(&self) -> TcpStream {
254         TcpStream { inner: self.inner.clone() }
255     }
256 }
257
258 impl Reader for TcpStream {
259     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
260         self.inner.read(buf)
261     }
262 }
263
264 impl Writer for TcpStream {
265     fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
266         self.inner.write(buf)
267     }
268 }
269
270 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
271     fn as_inner(&self) -> &TcpStreamImp {
272         &self.inner
273     }
274 }
275
276 /// A structure representing a socket server. This listener is used to create a
277 /// `TcpAcceptor` which can be used to accept sockets on a local port.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// # fn foo() {
283 /// use std::old_io::{TcpListener, TcpStream};
284 /// use std::old_io::{Acceptor, Listener};
285 /// use std::thread::Thread;
286 ///
287 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
288 ///
289 /// // bind the listener to the specified address
290 /// let mut acceptor = listener.listen().unwrap();
291 ///
292 /// fn handle_client(mut stream: TcpStream) {
293 ///     // ...
294 /// # &mut stream; // silence unused mutability/variable warning
295 /// }
296 /// // accept connections and process them, spawning a new tasks for each one
297 /// for stream in acceptor.incoming() {
298 ///     match stream {
299 ///         Err(e) => { /* connection failed */ }
300 ///         Ok(stream) => {
301 ///             Thread::spawn(move|| {
302 ///                 // connection succeeded
303 ///                 handle_client(stream)
304 ///             });
305 ///         }
306 ///     }
307 /// }
308 ///
309 /// // close the socket server
310 /// drop(acceptor);
311 /// # }
312 /// ```
313 pub struct TcpListener {
314     inner: TcpListenerImp,
315 }
316
317 impl TcpListener {
318     /// Creates a new `TcpListener` which will be bound to the specified address.
319     /// This listener is not ready for accepting connections, `listen` must be called
320     /// on it before that's possible.
321     ///
322     /// Binding with a port number of 0 will request that the OS assigns a port
323     /// to this listener. The port allocated can be queried via the
324     /// `socket_name` function.
325     ///
326     /// The address type can be any implementer of `ToSocketAddr` trait. See its
327     /// documentation for concrete examples.
328     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
329         super::with_addresses(addr, |addr| {
330             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
331         })
332     }
333
334     /// Returns the local socket address of this listener.
335     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
336         self.inner.socket_name()
337     }
338 }
339
340 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
341     fn listen(self) -> IoResult<TcpAcceptor> {
342         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
343     }
344 }
345
346 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
347     fn as_inner(&self) -> &TcpListenerImp {
348         &self.inner
349     }
350 }
351
352 /// The accepting half of a TCP socket server. This structure is created through
353 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
354 /// `TcpStream` instances.
355 pub struct TcpAcceptor {
356     inner: TcpAcceptorImp,
357 }
358
359 impl TcpAcceptor {
360     /// Prevents blocking on all future accepts after `ms` milliseconds have
361     /// elapsed.
362     ///
363     /// This function is used to set a deadline after which this acceptor will
364     /// time out accepting any connections. The argument is the relative
365     /// distance, in milliseconds, to a point in the future after which all
366     /// accepts will fail.
367     ///
368     /// If the argument specified is `None`, then any previously registered
369     /// timeout is cleared.
370     ///
371     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
372     /// any pending connections. All pending connections will be accepted,
373     /// regardless of whether the timeout has expired or not (the accept will
374     /// not block in this case).
375     ///
376     /// # Example
377     ///
378     /// ```no_run
379     /// use std::old_io::TcpListener;
380     /// use std::old_io::{Listener, Acceptor, TimedOut};
381     ///
382     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
383     ///
384     /// // After 100ms have passed, all accepts will fail
385     /// a.set_timeout(Some(100));
386     ///
387     /// match a.accept() {
388     ///     Ok(..) => println!("accepted a socket"),
389     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
390     ///     Err(e) => println!("err: {}", e),
391     /// }
392     ///
393     /// // Reset the timeout and try again
394     /// a.set_timeout(Some(100));
395     /// let socket = a.accept();
396     ///
397     /// // Clear the timeout and block indefinitely waiting for a connection
398     /// a.set_timeout(None);
399     /// let socket = a.accept();
400     /// ```
401     #[unstable(feature = "io",
402                reason = "the type of the argument and name of this function are \
403                          subject to change")]
404     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
405
406     /// Closes the accepting capabilities of this acceptor.
407     ///
408     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
409     /// in that it will affect *all* cloned handles of this acceptor's original
410     /// handle.
411     ///
412     /// Once this function succeeds, all future calls to `accept` will return
413     /// immediately with an error, preventing all future calls to accept. The
414     /// underlying socket will not be relinquished back to the OS until all
415     /// acceptors have been deallocated.
416     ///
417     /// This is useful for waking up a thread in an accept loop to indicate that
418     /// it should exit.
419     ///
420     /// # Example
421     ///
422     /// ```
423     /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
424     /// use std::thread::Thread;
425     ///
426     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
427     /// let a2 = a.clone();
428     ///
429     /// let _t = Thread::spawn(move|| {
430     ///     let mut a2 = a2;
431     ///     for socket in a2.incoming() {
432     ///         match socket {
433     ///             Ok(s) => { /* handle s */ }
434     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
435     ///             Err(e) => panic!("unexpected error: {}", e),
436     ///         }
437     ///     }
438     /// });
439     ///
440     /// # fn wait_for_sigint() {}
441     /// // Now that our accept loop is running, wait for the program to be
442     /// // requested to exit.
443     /// wait_for_sigint();
444     ///
445     /// // Signal our accept loop to exit
446     /// assert!(a.close_accept().is_ok());
447     /// ```
448     #[unstable(feature = "io")]
449     pub fn close_accept(&mut self) -> IoResult<()> {
450         self.inner.close_accept()
451     }
452 }
453
454 impl Acceptor<TcpStream> for TcpAcceptor {
455     fn accept(&mut self) -> IoResult<TcpStream> {
456         self.inner.accept().map(TcpStream::new)
457     }
458 }
459
460 impl Clone for TcpAcceptor {
461     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
462     /// accepts.
463     ///
464     /// The underlying TCP acceptor will not be closed until all handles to the
465     /// acceptor have been deallocated. Incoming connections will be received on
466     /// at most once acceptor, the same connection will not be accepted twice.
467     ///
468     /// The `close_accept` method will shut down *all* acceptors cloned from the
469     /// same original acceptor, whereas the `set_timeout` method only affects
470     /// the selector that it is called on.
471     ///
472     /// This function is useful for creating a handle to invoke `close_accept`
473     /// on to wake up any other task blocked in `accept`.
474     fn clone(&self) -> TcpAcceptor {
475         TcpAcceptor { inner: self.inner.clone() }
476     }
477 }
478
479 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
480     fn as_inner(&self) -> &TcpAcceptorImp {
481         &self.inner
482     }
483 }
484
485 #[cfg(test)]
486 mod test {
487     use prelude::v1::*;
488
489     use sync::mpsc::channel;
490     use thread::Thread;
491     use old_io::net::tcp::*;
492     use old_io::net::ip::*;
493     use old_io::test::*;
494     use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
495     use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
496     use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
497     use old_io::{Acceptor, Listener};
498
499     // FIXME #11530 this fails on android because tests are run as root
500     #[cfg_attr(any(windows, target_os = "android"), ignore)]
501     #[test]
502     fn bind_error() {
503         match TcpListener::bind("0.0.0.0:1") {
504             Ok(..) => panic!(),
505             Err(e) => assert_eq!(e.kind, PermissionDenied),
506         }
507     }
508
509     #[test]
510     fn connect_error() {
511         match TcpStream::connect("0.0.0.0:1") {
512             Ok(..) => panic!(),
513             Err(e) => assert_eq!(e.kind, ConnectionRefused),
514         }
515     }
516
517     #[test]
518     fn listen_ip4_localhost() {
519         let socket_addr = next_test_ip4();
520         let listener = TcpListener::bind(socket_addr);
521         let mut acceptor = listener.listen();
522
523         let _t = Thread::spawn(move|| {
524             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
525             stream.write(&[144]).unwrap();
526         });
527
528         let mut stream = acceptor.accept();
529         let mut buf = [0];
530         stream.read(&mut buf).unwrap();
531         assert!(buf[0] == 144);
532     }
533
534     #[test]
535     fn connect_localhost() {
536         let addr = next_test_ip4();
537         let mut acceptor = TcpListener::bind(addr).listen();
538
539         let _t = Thread::spawn(move|| {
540             let mut stream = TcpStream::connect(("localhost", addr.port));
541             stream.write(&[64]).unwrap();
542         });
543
544         let mut stream = acceptor.accept();
545         let mut buf = [0];
546         stream.read(&mut buf).unwrap();
547         assert!(buf[0] == 64);
548     }
549
550     #[test]
551     fn connect_ip4_loopback() {
552         let addr = next_test_ip4();
553         let mut acceptor = TcpListener::bind(addr).listen();
554
555         let _t = Thread::spawn(move|| {
556             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
557             stream.write(&[44]).unwrap();
558         });
559
560         let mut stream = acceptor.accept();
561         let mut buf = [0];
562         stream.read(&mut buf).unwrap();
563         assert!(buf[0] == 44);
564     }
565
566     #[test]
567     fn connect_ip6_loopback() {
568         let addr = next_test_ip6();
569         let mut acceptor = TcpListener::bind(addr).listen();
570
571         let _t = Thread::spawn(move|| {
572             let mut stream = TcpStream::connect(("::1", addr.port));
573             stream.write(&[66]).unwrap();
574         });
575
576         let mut stream = acceptor.accept();
577         let mut buf = [0];
578         stream.read(&mut buf).unwrap();
579         assert!(buf[0] == 66);
580     }
581
582     #[test]
583     fn smoke_test_ip4() {
584         let addr = next_test_ip4();
585         let mut acceptor = TcpListener::bind(addr).listen();
586
587         let _t = Thread::spawn(move|| {
588             let mut stream = TcpStream::connect(addr);
589             stream.write(&[99]).unwrap();
590         });
591
592         let mut stream = acceptor.accept();
593         let mut buf = [0];
594         stream.read(&mut buf).unwrap();
595         assert!(buf[0] == 99);
596     }
597
598     #[test]
599     fn smoke_test_ip6() {
600         let addr = next_test_ip6();
601         let mut acceptor = TcpListener::bind(addr).listen();
602
603         let _t = Thread::spawn(move|| {
604             let mut stream = TcpStream::connect(addr);
605             stream.write(&[99]).unwrap();
606         });
607
608         let mut stream = acceptor.accept();
609         let mut buf = [0];
610         stream.read(&mut buf).unwrap();
611         assert!(buf[0] == 99);
612     }
613
614     #[test]
615     fn read_eof_ip4() {
616         let addr = next_test_ip4();
617         let mut acceptor = TcpListener::bind(addr).listen();
618
619         let _t = Thread::spawn(move|| {
620             let _stream = TcpStream::connect(addr);
621             // Close
622         });
623
624         let mut stream = acceptor.accept();
625         let mut buf = [0];
626         let nread = stream.read(&mut buf);
627         assert!(nread.is_err());
628     }
629
630     #[test]
631     fn read_eof_ip6() {
632         let addr = next_test_ip6();
633         let mut acceptor = TcpListener::bind(addr).listen();
634
635         let _t = Thread::spawn(move|| {
636             let _stream = TcpStream::connect(addr);
637             // Close
638         });
639
640         let mut stream = acceptor.accept();
641         let mut buf = [0];
642         let nread = stream.read(&mut buf);
643         assert!(nread.is_err());
644     }
645
646     #[test]
647     fn read_eof_twice_ip4() {
648         let addr = next_test_ip4();
649         let mut acceptor = TcpListener::bind(addr).listen();
650
651         let _t = Thread::spawn(move|| {
652             let _stream = TcpStream::connect(addr);
653             // Close
654         });
655
656         let mut stream = acceptor.accept();
657         let mut buf = [0];
658         let nread = stream.read(&mut buf);
659         assert!(nread.is_err());
660
661         match stream.read(&mut buf) {
662             Ok(..) => panic!(),
663             Err(ref e) => {
664                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
665                         "unknown kind: {:?}", e.kind);
666             }
667         }
668     }
669
670     #[test]
671     fn read_eof_twice_ip6() {
672         let addr = next_test_ip6();
673         let mut acceptor = TcpListener::bind(addr).listen();
674
675         let _t = Thread::spawn(move|| {
676             let _stream = TcpStream::connect(addr);
677             // Close
678         });
679
680         let mut stream = acceptor.accept();
681         let mut buf = [0];
682         let nread = stream.read(&mut buf);
683         assert!(nread.is_err());
684
685         match stream.read(&mut buf) {
686             Ok(..) => panic!(),
687             Err(ref e) => {
688                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
689                         "unknown kind: {:?}", e.kind);
690             }
691         }
692     }
693
694     #[test]
695     fn write_close_ip4() {
696         let addr = next_test_ip4();
697         let mut acceptor = TcpListener::bind(addr).listen();
698
699         let (tx, rx) = channel();
700         let _t = Thread::spawn(move|| {
701             drop(TcpStream::connect(addr));
702             tx.send(()).unwrap();
703         });
704
705         let mut stream = acceptor.accept();
706         rx.recv().unwrap();
707         let buf = [0];
708         match stream.write(&buf) {
709             Ok(..) => {}
710             Err(e) => {
711                 assert!(e.kind == ConnectionReset ||
712                         e.kind == BrokenPipe ||
713                         e.kind == ConnectionAborted,
714                         "unknown error: {}", e);
715             }
716         }
717     }
718
719     #[test]
720     fn write_close_ip6() {
721         let addr = next_test_ip6();
722         let mut acceptor = TcpListener::bind(addr).listen();
723
724         let (tx, rx) = channel();
725         let _t = Thread::spawn(move|| {
726             drop(TcpStream::connect(addr));
727             tx.send(()).unwrap();
728         });
729
730         let mut stream = acceptor.accept();
731         rx.recv().unwrap();
732         let buf = [0];
733         match stream.write(&buf) {
734             Ok(..) => {}
735             Err(e) => {
736                 assert!(e.kind == ConnectionReset ||
737                         e.kind == BrokenPipe ||
738                         e.kind == ConnectionAborted,
739                         "unknown error: {}", e);
740             }
741         }
742     }
743
744     #[test]
745     fn multiple_connect_serial_ip4() {
746         let addr = next_test_ip4();
747         let max = 10u;
748         let mut acceptor = TcpListener::bind(addr).listen();
749
750         let _t = Thread::spawn(move|| {
751             for _ in 0..max {
752                 let mut stream = TcpStream::connect(addr);
753                 stream.write(&[99]).unwrap();
754             }
755         });
756
757         for ref mut stream in acceptor.incoming().take(max) {
758             let mut buf = [0];
759             stream.read(&mut buf).unwrap();
760             assert_eq!(buf[0], 99);
761         }
762     }
763
764     #[test]
765     fn multiple_connect_serial_ip6() {
766         let addr = next_test_ip6();
767         let max = 10u;
768         let mut acceptor = TcpListener::bind(addr).listen();
769
770         let _t = Thread::spawn(move|| {
771             for _ in 0..max {
772                 let mut stream = TcpStream::connect(addr);
773                 stream.write(&[99]).unwrap();
774             }
775         });
776
777         for ref mut stream in acceptor.incoming().take(max) {
778             let mut buf = [0];
779             stream.read(&mut buf).unwrap();
780             assert_eq!(buf[0], 99);
781         }
782     }
783
784     #[test]
785     fn multiple_connect_interleaved_greedy_schedule_ip4() {
786         let addr = next_test_ip4();
787         static MAX: int = 10;
788         let acceptor = TcpListener::bind(addr).listen();
789
790         let _t = Thread::spawn(move|| {
791             let mut acceptor = acceptor;
792             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
793                 // Start another task to handle the connection
794                 let _t = Thread::spawn(move|| {
795                     let mut stream = stream;
796                     let mut buf = [0];
797                     stream.read(&mut buf).unwrap();
798                     assert!(buf[0] == i as u8);
799                     debug!("read");
800                 });
801             }
802         });
803
804         connect(0, addr);
805
806         fn connect(i: int, addr: SocketAddr) {
807             if i == MAX { return }
808
809             let _t = Thread::spawn(move|| {
810                 debug!("connecting");
811                 let mut stream = TcpStream::connect(addr);
812                 // Connect again before writing
813                 connect(i + 1, addr);
814                 debug!("writing");
815                 stream.write(&[i as u8]).unwrap();
816             });
817         }
818     }
819
820     #[test]
821     fn multiple_connect_interleaved_greedy_schedule_ip6() {
822         let addr = next_test_ip6();
823         static MAX: int = 10;
824         let acceptor = TcpListener::bind(addr).listen();
825
826         let _t = Thread::spawn(move|| {
827             let mut acceptor = acceptor;
828             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
829                 // Start another task to handle the connection
830                 let _t = Thread::spawn(move|| {
831                     let mut stream = stream;
832                     let mut buf = [0];
833                     stream.read(&mut buf).unwrap();
834                     assert!(buf[0] == i as u8);
835                     debug!("read");
836                 });
837             }
838         });
839
840         connect(0, addr);
841
842         fn connect(i: int, addr: SocketAddr) {
843             if i == MAX { return }
844
845             let _t = Thread::spawn(move|| {
846                 debug!("connecting");
847                 let mut stream = TcpStream::connect(addr);
848                 // Connect again before writing
849                 connect(i + 1, addr);
850                 debug!("writing");
851                 stream.write(&[i as u8]).unwrap();
852             });
853         }
854     }
855
856     #[test]
857     fn multiple_connect_interleaved_lazy_schedule_ip4() {
858         static MAX: int = 10;
859         let addr = next_test_ip4();
860         let acceptor = TcpListener::bind(addr).listen();
861
862         let _t = Thread::spawn(move|| {
863             let mut acceptor = acceptor;
864             for stream in acceptor.incoming().take(MAX as uint) {
865                 // Start another task to handle the connection
866                 let _t = Thread::spawn(move|| {
867                     let mut stream = stream;
868                     let mut buf = [0];
869                     stream.read(&mut buf).unwrap();
870                     assert!(buf[0] == 99);
871                     debug!("read");
872                 });
873             }
874         });
875
876         connect(0, addr);
877
878         fn connect(i: int, addr: SocketAddr) {
879             if i == MAX { return }
880
881             let _t = Thread::spawn(move|| {
882                 debug!("connecting");
883                 let mut stream = TcpStream::connect(addr);
884                 // Connect again before writing
885                 connect(i + 1, addr);
886                 debug!("writing");
887                 stream.write(&[99]).unwrap();
888             });
889         }
890     }
891
892     #[test]
893     fn multiple_connect_interleaved_lazy_schedule_ip6() {
894         static MAX: int = 10;
895         let addr = next_test_ip6();
896         let acceptor = TcpListener::bind(addr).listen();
897
898         let _t = Thread::spawn(move|| {
899             let mut acceptor = acceptor;
900             for stream in acceptor.incoming().take(MAX as uint) {
901                 // Start another task to handle the connection
902                 let _t = Thread::spawn(move|| {
903                     let mut stream = stream;
904                     let mut buf = [0];
905                     stream.read(&mut buf).unwrap();
906                     assert!(buf[0] == 99);
907                     debug!("read");
908                 });
909             }
910         });
911
912         connect(0, addr);
913
914         fn connect(i: int, addr: SocketAddr) {
915             if i == MAX { return }
916
917             let _t = Thread::spawn(move|| {
918                 debug!("connecting");
919                 let mut stream = TcpStream::connect(addr);
920                 // Connect again before writing
921                 connect(i + 1, addr);
922                 debug!("writing");
923                 stream.write(&[99]).unwrap();
924             });
925         }
926     }
927
928     pub fn socket_name(addr: SocketAddr) {
929         let mut listener = TcpListener::bind(addr).unwrap();
930
931         // Make sure socket_name gives
932         // us the socket we binded to.
933         let so_name = listener.socket_name();
934         assert!(so_name.is_ok());
935         assert_eq!(addr, so_name.unwrap());
936     }
937
938     pub fn peer_name(addr: SocketAddr) {
939         let acceptor = TcpListener::bind(addr).listen();
940         let _t = Thread::spawn(move|| {
941             let mut acceptor = acceptor;
942             acceptor.accept().unwrap();
943         });
944
945         let stream = TcpStream::connect(addr);
946
947         assert!(stream.is_ok());
948         let mut stream = stream.unwrap();
949
950         // Make sure peer_name gives us the
951         // address/port of the peer we've
952         // connected to.
953         let peer_name = stream.peer_name();
954         assert!(peer_name.is_ok());
955         assert_eq!(addr, peer_name.unwrap());
956     }
957
958     #[test]
959     fn socket_and_peer_name_ip4() {
960         peer_name(next_test_ip4());
961         socket_name(next_test_ip4());
962     }
963
964     #[test]
965     fn socket_and_peer_name_ip6() {
966         // FIXME: peer name is not consistent
967         //peer_name(next_test_ip6());
968         socket_name(next_test_ip6());
969     }
970
971     #[test]
972     fn partial_read() {
973         let addr = next_test_ip4();
974         let (tx, rx) = channel();
975         let _t = Thread::spawn(move|| {
976             let mut srv = TcpListener::bind(addr).listen().unwrap();
977             tx.send(()).unwrap();
978             let mut cl = srv.accept().unwrap();
979             cl.write(&[10]).unwrap();
980             let mut b = [0];
981             cl.read(&mut b).unwrap();
982             tx.send(()).unwrap();
983         });
984
985         rx.recv().unwrap();
986         let mut c = TcpStream::connect(addr).unwrap();
987         let mut b = [0; 10];
988         assert_eq!(c.read(&mut b), Ok(1));
989         c.write(&[1]).unwrap();
990         rx.recv().unwrap();
991     }
992
993     #[test]
994     fn double_bind() {
995         let addr = next_test_ip4();
996         let listener = TcpListener::bind(addr).unwrap().listen();
997         assert!(listener.is_ok());
998         match TcpListener::bind(addr).listen() {
999             Ok(..) => panic!(),
1000             Err(e) => {
1001                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1002                         "unknown error: {} {:?}", e, e.kind);
1003             }
1004         }
1005     }
1006
1007     #[test]
1008     fn fast_rebind() {
1009         let addr = next_test_ip4();
1010         let (tx, rx) = channel();
1011
1012         let _t = Thread::spawn(move|| {
1013             rx.recv().unwrap();
1014             let _stream = TcpStream::connect(addr).unwrap();
1015             // Close
1016             rx.recv().unwrap();
1017         });
1018
1019         {
1020             let mut acceptor = TcpListener::bind(addr).listen();
1021             tx.send(()).unwrap();
1022             {
1023                 let _stream = acceptor.accept().unwrap();
1024                 // Close client
1025                 tx.send(()).unwrap();
1026             }
1027             // Close listener
1028         }
1029         let _listener = TcpListener::bind(addr);
1030     }
1031
1032     #[test]
1033     fn tcp_clone_smoke() {
1034         let addr = next_test_ip4();
1035         let mut acceptor = TcpListener::bind(addr).listen();
1036
1037         let _t = Thread::spawn(move|| {
1038             let mut s = TcpStream::connect(addr);
1039             let mut buf = [0, 0];
1040             assert_eq!(s.read(&mut buf), Ok(1));
1041             assert_eq!(buf[0], 1);
1042             s.write(&[2]).unwrap();
1043         });
1044
1045         let mut s1 = acceptor.accept().unwrap();
1046         let s2 = s1.clone();
1047
1048         let (tx1, rx1) = channel();
1049         let (tx2, rx2) = channel();
1050         let _t = Thread::spawn(move|| {
1051             let mut s2 = s2;
1052             rx1.recv().unwrap();
1053             s2.write(&[1]).unwrap();
1054             tx2.send(()).unwrap();
1055         });
1056         tx1.send(()).unwrap();
1057         let mut buf = [0, 0];
1058         assert_eq!(s1.read(&mut buf), Ok(1));
1059         rx2.recv().unwrap();
1060     }
1061
1062     #[test]
1063     fn tcp_clone_two_read() {
1064         let addr = next_test_ip6();
1065         let mut acceptor = TcpListener::bind(addr).listen();
1066         let (tx1, rx) = channel();
1067         let tx2 = tx1.clone();
1068
1069         let _t = Thread::spawn(move|| {
1070             let mut s = TcpStream::connect(addr);
1071             s.write(&[1]).unwrap();
1072             rx.recv().unwrap();
1073             s.write(&[2]).unwrap();
1074             rx.recv().unwrap();
1075         });
1076
1077         let mut s1 = acceptor.accept().unwrap();
1078         let s2 = s1.clone();
1079
1080         let (done, rx) = channel();
1081         let _t = Thread::spawn(move|| {
1082             let mut s2 = s2;
1083             let mut buf = [0, 0];
1084             s2.read(&mut buf).unwrap();
1085             tx2.send(()).unwrap();
1086             done.send(()).unwrap();
1087         });
1088         let mut buf = [0, 0];
1089         s1.read(&mut buf).unwrap();
1090         tx1.send(()).unwrap();
1091
1092         rx.recv().unwrap();
1093     }
1094
1095     #[test]
1096     fn tcp_clone_two_write() {
1097         let addr = next_test_ip4();
1098         let mut acceptor = TcpListener::bind(addr).listen();
1099
1100         let _t = Thread::spawn(move|| {
1101             let mut s = TcpStream::connect(addr);
1102             let mut buf = [0, 1];
1103             s.read(&mut buf).unwrap();
1104             s.read(&mut buf).unwrap();
1105         });
1106
1107         let mut s1 = acceptor.accept().unwrap();
1108         let s2 = s1.clone();
1109
1110         let (done, rx) = channel();
1111         let _t = Thread::spawn(move|| {
1112             let mut s2 = s2;
1113             s2.write(&[1]).unwrap();
1114             done.send(()).unwrap();
1115         });
1116         s1.write(&[2]).unwrap();
1117
1118         rx.recv().unwrap();
1119     }
1120
1121     #[test]
1122     fn shutdown_smoke() {
1123         let addr = next_test_ip4();
1124         let a = TcpListener::bind(addr).unwrap().listen();
1125         let _t = Thread::spawn(move|| {
1126             let mut a = a;
1127             let mut c = a.accept().unwrap();
1128             assert_eq!(c.read_to_end(), Ok(vec!()));
1129             c.write(&[1]).unwrap();
1130         });
1131
1132         let mut s = TcpStream::connect(addr).unwrap();
1133         assert!(s.inner.close_write().is_ok());
1134         assert!(s.write(&[1]).is_err());
1135         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1136     }
1137
1138     #[test]
1139     fn accept_timeout() {
1140         let addr = next_test_ip4();
1141         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1142
1143         a.set_timeout(Some(10));
1144
1145         // Make sure we time out once and future invocations also time out
1146         let err = a.accept().err().unwrap();
1147         assert_eq!(err.kind, TimedOut);
1148         let err = a.accept().err().unwrap();
1149         assert_eq!(err.kind, TimedOut);
1150
1151         // Also make sure that even though the timeout is expired that we will
1152         // continue to receive any pending connections.
1153         //
1154         // FIXME: freebsd apparently never sees the pending connection, but
1155         //        testing manually always works. Need to investigate this
1156         //        flakiness.
1157         if !cfg!(target_os = "freebsd") {
1158             let (tx, rx) = channel();
1159             let _t = Thread::spawn(move|| {
1160                 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1161             });
1162             let _l = rx.recv().unwrap();
1163             for i in 0i32..1001 {
1164                 match a.accept() {
1165                     Ok(..) => break,
1166                     Err(ref e) if e.kind == TimedOut => {}
1167                     Err(e) => panic!("error: {}", e),
1168                 }
1169                 ::thread::Thread::yield_now();
1170                 if i == 1000 { panic!("should have a pending connection") }
1171             }
1172         }
1173
1174         // Unset the timeout and make sure that this always blocks.
1175         a.set_timeout(None);
1176         let _t = Thread::spawn(move|| {
1177             drop(TcpStream::connect(addr).unwrap());
1178         });
1179         a.accept().unwrap();
1180     }
1181
1182     #[test]
1183     fn close_readwrite_smoke() {
1184         let addr = next_test_ip4();
1185         let a = TcpListener::bind(addr).listen().unwrap();
1186         let (_tx, rx) = channel::<()>();
1187         Thread::spawn(move|| {
1188             let mut a = a;
1189             let _s = a.accept().unwrap();
1190             let _ = rx.recv().unwrap();
1191         });
1192
1193         let mut b = [0];
1194         let mut s = TcpStream::connect(addr).unwrap();
1195         let mut s2 = s.clone();
1196
1197         // closing should prevent reads/writes
1198         s.close_write().unwrap();
1199         assert!(s.write(&[0]).is_err());
1200         s.close_read().unwrap();
1201         assert!(s.read(&mut b).is_err());
1202
1203         // closing should affect previous handles
1204         assert!(s2.write(&[0]).is_err());
1205         assert!(s2.read(&mut b).is_err());
1206
1207         // closing should affect new handles
1208         let mut s3 = s.clone();
1209         assert!(s3.write(&[0]).is_err());
1210         assert!(s3.read(&mut b).is_err());
1211
1212         // make sure these don't die
1213         let _ = s2.close_read();
1214         let _ = s2.close_write();
1215         let _ = s3.close_read();
1216         let _ = s3.close_write();
1217     }
1218
1219     #[test]
1220     fn close_read_wakes_up() {
1221         let addr = next_test_ip4();
1222         let a = TcpListener::bind(addr).listen().unwrap();
1223         let (_tx, rx) = channel::<()>();
1224         Thread::spawn(move|| {
1225             let mut a = a;
1226             let _s = a.accept().unwrap();
1227             let _ = rx.recv().unwrap();
1228         });
1229
1230         let mut s = TcpStream::connect(addr).unwrap();
1231         let s2 = s.clone();
1232         let (tx, rx) = channel();
1233         let _t = Thread::spawn(move|| {
1234             let mut s2 = s2;
1235             assert!(s2.read(&mut [0]).is_err());
1236             tx.send(()).unwrap();
1237         });
1238         // this should wake up the child task
1239         s.close_read().unwrap();
1240
1241         // this test will never finish if the child doesn't wake up
1242         rx.recv().unwrap();
1243     }
1244
1245     #[test]
1246     fn readwrite_timeouts() {
1247         let addr = next_test_ip6();
1248         let mut a = TcpListener::bind(addr).listen().unwrap();
1249         let (tx, rx) = channel::<()>();
1250         Thread::spawn(move|| {
1251             let mut s = TcpStream::connect(addr).unwrap();
1252             rx.recv().unwrap();
1253             assert!(s.write(&[0]).is_ok());
1254             let _ = rx.recv();
1255         });
1256
1257         let mut s = a.accept().unwrap();
1258         s.set_timeout(Some(20));
1259         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1260         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1261
1262         s.set_timeout(Some(20));
1263         for i in 0i32..1001 {
1264             match s.write(&[0; 128 * 1024]) {
1265                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1266                 Err(IoError { kind: TimedOut, .. }) => break,
1267                 Err(e) => panic!("{}", e),
1268            }
1269            if i == 1000 { panic!("should have filled up?!"); }
1270         }
1271         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1272
1273         tx.send(()).unwrap();
1274         s.set_timeout(None);
1275         assert_eq!(s.read(&mut [0, 0]), Ok(1));
1276     }
1277
1278     #[test]
1279     fn read_timeouts() {
1280         let addr = next_test_ip6();
1281         let mut a = TcpListener::bind(addr).listen().unwrap();
1282         let (tx, rx) = channel::<()>();
1283         Thread::spawn(move|| {
1284             let mut s = TcpStream::connect(addr).unwrap();
1285             rx.recv().unwrap();
1286             let mut amt = 0;
1287             while amt < 100 * 128 * 1024 {
1288                 match s.read(&mut [0;128 * 1024]) {
1289                     Ok(n) => { amt += n; }
1290                     Err(e) => panic!("{}", e),
1291                 }
1292             }
1293             let _ = rx.recv();
1294         });
1295
1296         let mut s = a.accept().unwrap();
1297         s.set_read_timeout(Some(20));
1298         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1299         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1300
1301         tx.send(()).unwrap();
1302         for _ in 0..100 {
1303             assert!(s.write(&[0;128 * 1024]).is_ok());
1304         }
1305     }
1306
1307     #[test]
1308     fn write_timeouts() {
1309         let addr = next_test_ip6();
1310         let mut a = TcpListener::bind(addr).listen().unwrap();
1311         let (tx, rx) = channel::<()>();
1312         Thread::spawn(move|| {
1313             let mut s = TcpStream::connect(addr).unwrap();
1314             rx.recv().unwrap();
1315             assert!(s.write(&[0]).is_ok());
1316             let _ = rx.recv();
1317         });
1318
1319         let mut s = a.accept().unwrap();
1320         s.set_write_timeout(Some(20));
1321         for i in 0i32..1001 {
1322             match s.write(&[0; 128 * 1024]) {
1323                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1324                 Err(IoError { kind: TimedOut, .. }) => break,
1325                 Err(e) => panic!("{}", e),
1326            }
1327            if i == 1000 { panic!("should have filled up?!"); }
1328         }
1329         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1330
1331         tx.send(()).unwrap();
1332         assert!(s.read(&mut [0]).is_ok());
1333     }
1334
1335     #[test]
1336     fn timeout_concurrent_read() {
1337         let addr = next_test_ip6();
1338         let mut a = TcpListener::bind(addr).listen().unwrap();
1339         let (tx, rx) = channel::<()>();
1340         Thread::spawn(move|| {
1341             let mut s = TcpStream::connect(addr).unwrap();
1342             rx.recv().unwrap();
1343             assert_eq!(s.write(&[0]), Ok(()));
1344             let _ = rx.recv();
1345         });
1346
1347         let mut s = a.accept().unwrap();
1348         let s2 = s.clone();
1349         let (tx2, rx2) = channel();
1350         let _t = Thread::spawn(move|| {
1351             let mut s2 = s2;
1352             assert_eq!(s2.read(&mut [0]), Ok(1));
1353             tx2.send(()).unwrap();
1354         });
1355
1356         s.set_read_timeout(Some(20));
1357         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1358         tx.send(()).unwrap();
1359
1360         rx2.recv().unwrap();
1361     }
1362
1363     #[test]
1364     fn clone_while_reading() {
1365         let addr = next_test_ip6();
1366         let listen = TcpListener::bind(addr);
1367         let mut accept = listen.listen().unwrap();
1368
1369         // Enqueue a task to write to a socket
1370         let (tx, rx) = channel();
1371         let (txdone, rxdone) = channel();
1372         let txdone2 = txdone.clone();
1373         let _t = Thread::spawn(move|| {
1374             let mut tcp = TcpStream::connect(addr).unwrap();
1375             rx.recv().unwrap();
1376             tcp.write_u8(0).unwrap();
1377             txdone2.send(()).unwrap();
1378         });
1379
1380         // Spawn off a reading clone
1381         let tcp = accept.accept().unwrap();
1382         let tcp2 = tcp.clone();
1383         let txdone3 = txdone.clone();
1384         let _t = Thread::spawn(move|| {
1385             let mut tcp2 = tcp2;
1386             tcp2.read_u8().unwrap();
1387             txdone3.send(()).unwrap();
1388         });
1389
1390         // Try to ensure that the reading clone is indeed reading
1391         for _ in 0..50 {
1392             ::thread::Thread::yield_now();
1393         }
1394
1395         // clone the handle again while it's reading, then let it finish the
1396         // read.
1397         let _ = tcp.clone();
1398         tx.send(()).unwrap();
1399         rxdone.recv().unwrap();
1400         rxdone.recv().unwrap();
1401     }
1402
1403     #[test]
1404     fn clone_accept_smoke() {
1405         let addr = next_test_ip4();
1406         let l = TcpListener::bind(addr);
1407         let mut a = l.listen().unwrap();
1408         let mut a2 = a.clone();
1409
1410         let _t = Thread::spawn(move|| {
1411             let _ = TcpStream::connect(addr);
1412         });
1413         let _t = Thread::spawn(move|| {
1414             let _ = TcpStream::connect(addr);
1415         });
1416
1417         assert!(a.accept().is_ok());
1418         assert!(a2.accept().is_ok());
1419     }
1420
1421     #[test]
1422     fn clone_accept_concurrent() {
1423         let addr = next_test_ip4();
1424         let l = TcpListener::bind(addr);
1425         let a = l.listen().unwrap();
1426         let a2 = a.clone();
1427
1428         let (tx, rx) = channel();
1429         let tx2 = tx.clone();
1430
1431         let _t = Thread::spawn(move|| {
1432             let mut a = a;
1433             tx.send(a.accept()).unwrap();
1434         });
1435         let _t = Thread::spawn(move|| {
1436             let mut a = a2;
1437             tx2.send(a.accept()).unwrap();
1438         });
1439
1440         let _t = Thread::spawn(move|| {
1441             let _ = TcpStream::connect(addr);
1442         });
1443         let _t = Thread::spawn(move|| {
1444             let _ = TcpStream::connect(addr);
1445         });
1446
1447         assert!(rx.recv().unwrap().is_ok());
1448         assert!(rx.recv().unwrap().is_ok());
1449     }
1450
1451     #[test]
1452     fn close_accept_smoke() {
1453         let addr = next_test_ip4();
1454         let l = TcpListener::bind(addr);
1455         let mut a = l.listen().unwrap();
1456
1457         a.close_accept().unwrap();
1458         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1459     }
1460
1461     #[test]
1462     fn close_accept_concurrent() {
1463         let addr = next_test_ip4();
1464         let l = TcpListener::bind(addr);
1465         let a = l.listen().unwrap();
1466         let mut a2 = a.clone();
1467
1468         let (tx, rx) = channel();
1469         let _t = Thread::spawn(move|| {
1470             let mut a = a;
1471             tx.send(a.accept()).unwrap();
1472         });
1473         a2.close_accept().unwrap();
1474
1475         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
1476     }
1477 }