]> git.lizzy.rs Git - rust.git/blob - src/libstd/old_io/net/tcp.rs
std: Rename Writer::write to Writer::write_all
[rust.git] / src / libstd / old_io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use old_io::IoResult;
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
26 use option::Option;
27 use option::Option::{None, Some};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 use sys_common;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// The socket will be closed when the value is dropped.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// use std::old_io::TcpStream;
45 ///
46 /// {
47 ///     let mut stream = TcpStream::connect("127.0.0.1:34254");
48 ///
49 ///     // ignore the Result
50 ///     let _ = stream.write(&[1]);
51 ///
52 ///     let mut buf = [0];
53 ///     let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
55 /// ```
56 pub struct TcpStream {
57     inner: TcpStreamImp,
58 }
59
60 impl TcpStream {
61     fn new(s: TcpStreamImp) -> TcpStream {
62         TcpStream { inner: s }
63     }
64
65     /// Open a TCP connection to a remote host.
66     ///
67     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
68     /// trait can be supplied for the address; see this trait documentation for
69     /// concrete examples.
70     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71         super::with_addresses(addr, |addr| {
72             TcpStreamImp::connect(addr, None).map(TcpStream::new)
73         })
74     }
75
76     /// Creates a TCP connection to a remote socket address, timing out after
77     /// the specified duration.
78     ///
79     /// This is the same as the `connect` method, except that if the timeout
80     /// specified elapses before a connection is made an error will be
81     /// returned. The error's kind will be `TimedOut`.
82     ///
83     /// Same as the `connect` method, `addr` argument type can be anything which
84     /// implements `ToSocketAddr` trait.
85     ///
86     /// If a `timeout` with zero or negative duration is specified then
87     /// the function returns `Err`, with the error kind set to `TimedOut`.
88     #[unstable = "the timeout argument may eventually change types"]
89     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
90                                             timeout: Duration) -> IoResult<TcpStream> {
91         if timeout <= Duration::milliseconds(0) {
92             return Err(standard_error(TimedOut));
93         }
94
95         super::with_addresses(addr, |addr| {
96             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
97                 .map(TcpStream::new)
98         })
99     }
100
101     /// Returns the socket address of the remote peer of this TCP connection.
102     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
103         self.inner.peer_name()
104     }
105
106     /// Returns the socket address of the local half of this TCP connection.
107     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
108         self.inner.socket_name()
109     }
110
111     /// Sets the nodelay flag on this connection to the boolean specified
112     #[unstable]
113     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
114         self.inner.set_nodelay(nodelay)
115     }
116
117     /// Sets the keepalive timeout to the timeout specified.
118     ///
119     /// If the value specified is `None`, then the keepalive flag is cleared on
120     /// this connection. Otherwise, the keepalive timeout will be set to the
121     /// specified time, in seconds.
122     #[unstable]
123     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
124         self.inner.set_keepalive(delay_in_seconds)
125     }
126
127     /// Closes the reading half of this connection.
128     ///
129     /// This method will close the reading portion of this connection, causing
130     /// all pending and future reads to immediately return with an error.
131     ///
132     /// # Example
133     ///
134     /// ```no_run
135     /// # #![allow(unused_must_use)]
136     /// use std::old_io::timer;
137     /// use std::old_io::TcpStream;
138     /// use std::time::Duration;
139     /// use std::thread::Thread;
140     ///
141     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
142     /// let stream2 = stream.clone();
143     ///
144     /// let _t = Thread::spawn(move|| {
145     ///     // close this stream after one second
146     ///     timer::sleep(Duration::seconds(1));
147     ///     let mut stream = stream2;
148     ///     stream.close_read();
149     /// });
150     ///
151     /// // wait for some data, will get canceled after one second
152     /// let mut buf = [0];
153     /// stream.read(&mut buf);
154     /// ```
155     ///
156     /// Note that this method affects all cloned handles associated with this
157     /// stream, not just this one handle.
158     pub fn close_read(&mut self) -> IoResult<()> {
159         self.inner.close_read()
160     }
161
162     /// Closes the writing half of this connection.
163     ///
164     /// This method will close the writing portion of this connection, causing
165     /// all future writes to immediately return with an error.
166     ///
167     /// Note that this method affects all cloned handles associated with this
168     /// stream, not just this one handle.
169     pub fn close_write(&mut self) -> IoResult<()> {
170         self.inner.close_write()
171     }
172
173     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
174     ///
175     /// This function will set a timeout for all blocking operations (including
176     /// reads and writes) on this stream. The timeout specified is a relative
177     /// time, in milliseconds, into the future after which point operations will
178     /// time out. This means that the timeout must be reset periodically to keep
179     /// it from expiring. Specifying a value of `None` will clear the timeout
180     /// for this stream.
181     ///
182     /// The timeout on this stream is local to this stream only. Setting a
183     /// timeout does not affect any other cloned instances of this stream, nor
184     /// does the timeout propagated to cloned handles of this stream. Setting
185     /// this timeout will override any specific read or write timeouts
186     /// previously set for this stream.
187     ///
188     /// For clarification on the semantics of interrupting a read and a write,
189     /// take a look at `set_read_timeout` and `set_write_timeout`.
190     #[unstable = "the timeout argument may change in type and value"]
191     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
192         self.inner.set_timeout(timeout_ms)
193     }
194
195     /// Sets the timeout for read operations on this stream.
196     ///
197     /// See documentation in `set_timeout` for the semantics of this read time.
198     /// This will overwrite any previous read timeout set through either this
199     /// function or `set_timeout`.
200     ///
201     /// # Errors
202     ///
203     /// When this timeout expires, if there is no pending read operation, no
204     /// action is taken. Otherwise, the read operation will be scheduled to
205     /// promptly return. If a timeout error is returned, then no data was read
206     /// during the timeout period.
207     #[unstable = "the timeout argument may change in type and value"]
208     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
209         self.inner.set_read_timeout(timeout_ms)
210     }
211
212     /// Sets the timeout for write operations on this stream.
213     ///
214     /// See documentation in `set_timeout` for the semantics of this write time.
215     /// This will overwrite any previous write timeout set through either this
216     /// function or `set_timeout`.
217     ///
218     /// # Errors
219     ///
220     /// When this timeout expires, if there is no pending write operation, no
221     /// action is taken. Otherwise, the pending write operation will be
222     /// scheduled to promptly return. The actual state of the underlying stream
223     /// is not specified.
224     ///
225     /// The write operation may return an error of type `ShortWrite` which
226     /// indicates that the object is known to have written an exact number of
227     /// bytes successfully during the timeout period, and the remaining bytes
228     /// were never written.
229     ///
230     /// If the write operation returns `TimedOut`, then it the timeout primitive
231     /// does not know how many bytes were written as part of the timeout
232     /// operation. It may be the case that bytes continue to be written in an
233     /// asynchronous fashion after the call to write returns.
234     #[unstable = "the timeout argument may change in type and value"]
235     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
236         self.inner.set_write_timeout(timeout_ms)
237     }
238 }
239
240 impl Clone for TcpStream {
241     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
242     /// and writes of this connection.
243     ///
244     /// The underlying TCP stream will not be closed until all handles to the
245     /// stream have been deallocated. All handles will also follow the same
246     /// stream, but two concurrent reads will not receive the same data.
247     /// Instead, the first read will receive the first packet received, and the
248     /// second read will receive the second packet.
249     fn clone(&self) -> TcpStream {
250         TcpStream { inner: self.inner.clone() }
251     }
252 }
253
254 impl Reader for TcpStream {
255     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
256         self.inner.read(buf)
257     }
258 }
259
260 impl Writer for TcpStream {
261     fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
262         self.inner.write(buf)
263     }
264 }
265
266 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
267     fn as_inner(&self) -> &TcpStreamImp {
268         &self.inner
269     }
270 }
271
272 /// A structure representing a socket server. This listener is used to create a
273 /// `TcpAcceptor` which can be used to accept sockets on a local port.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// # fn foo() {
279 /// use std::old_io::{TcpListener, TcpStream};
280 /// use std::old_io::{Acceptor, Listener};
281 /// use std::thread::Thread;
282 ///
283 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
284 ///
285 /// // bind the listener to the specified address
286 /// let mut acceptor = listener.listen().unwrap();
287 ///
288 /// fn handle_client(mut stream: TcpStream) {
289 ///     // ...
290 /// # &mut stream; // silence unused mutability/variable warning
291 /// }
292 /// // accept connections and process them, spawning a new tasks for each one
293 /// for stream in acceptor.incoming() {
294 ///     match stream {
295 ///         Err(e) => { /* connection failed */ }
296 ///         Ok(stream) => {
297 ///             Thread::spawn(move|| {
298 ///                 // connection succeeded
299 ///                 handle_client(stream)
300 ///             });
301 ///         }
302 ///     }
303 /// }
304 ///
305 /// // close the socket server
306 /// drop(acceptor);
307 /// # }
308 /// ```
309 pub struct TcpListener {
310     inner: TcpListenerImp,
311 }
312
313 impl TcpListener {
314     /// Creates a new `TcpListener` which will be bound to the specified address.
315     /// This listener is not ready for accepting connections, `listen` must be called
316     /// on it before that's possible.
317     ///
318     /// Binding with a port number of 0 will request that the OS assigns a port
319     /// to this listener. The port allocated can be queried via the
320     /// `socket_name` function.
321     ///
322     /// The address type can be any implementer of `ToSocketAddr` trait. See its
323     /// documentation for concrete examples.
324     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
325         super::with_addresses(addr, |addr| {
326             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
327         })
328     }
329
330     /// Returns the local socket address of this listener.
331     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
332         self.inner.socket_name()
333     }
334 }
335
336 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
337     fn listen(self) -> IoResult<TcpAcceptor> {
338         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
339     }
340 }
341
342 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
343     fn as_inner(&self) -> &TcpListenerImp {
344         &self.inner
345     }
346 }
347
348 /// The accepting half of a TCP socket server. This structure is created through
349 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
350 /// `TcpStream` instances.
351 pub struct TcpAcceptor {
352     inner: TcpAcceptorImp,
353 }
354
355 impl TcpAcceptor {
356     /// Prevents blocking on all future accepts after `ms` milliseconds have
357     /// elapsed.
358     ///
359     /// This function is used to set a deadline after which this acceptor will
360     /// time out accepting any connections. The argument is the relative
361     /// distance, in milliseconds, to a point in the future after which all
362     /// accepts will fail.
363     ///
364     /// If the argument specified is `None`, then any previously registered
365     /// timeout is cleared.
366     ///
367     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
368     /// any pending connections. All pending connections will be accepted,
369     /// regardless of whether the timeout has expired or not (the accept will
370     /// not block in this case).
371     ///
372     /// # Example
373     ///
374     /// ```no_run
375     /// # #![allow(unstable)]
376     /// use std::old_io::TcpListener;
377     /// use std::old_io::{Listener, Acceptor, TimedOut};
378     ///
379     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
380     ///
381     /// // After 100ms have passed, all accepts will fail
382     /// a.set_timeout(Some(100));
383     ///
384     /// match a.accept() {
385     ///     Ok(..) => println!("accepted a socket"),
386     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
387     ///     Err(e) => println!("err: {}", e),
388     /// }
389     ///
390     /// // Reset the timeout and try again
391     /// a.set_timeout(Some(100));
392     /// let socket = a.accept();
393     ///
394     /// // Clear the timeout and block indefinitely waiting for a connection
395     /// a.set_timeout(None);
396     /// let socket = a.accept();
397     /// ```
398     #[unstable = "the type of the argument and name of this function are \
399                       subject to change"]
400     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
401
402     /// Closes the accepting capabilities of this acceptor.
403     ///
404     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
405     /// in that it will affect *all* cloned handles of this acceptor's original
406     /// handle.
407     ///
408     /// Once this function succeeds, all future calls to `accept` will return
409     /// immediately with an error, preventing all future calls to accept. The
410     /// underlying socket will not be relinquished back to the OS until all
411     /// acceptors have been deallocated.
412     ///
413     /// This is useful for waking up a thread in an accept loop to indicate that
414     /// it should exit.
415     ///
416     /// # Example
417     ///
418     /// ```
419     /// # #![allow(unstable)]
420     /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
421     /// use std::thread::Thread;
422     ///
423     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
424     /// let a2 = a.clone();
425     ///
426     /// let _t = Thread::spawn(move|| {
427     ///     let mut a2 = a2;
428     ///     for socket in a2.incoming() {
429     ///         match socket {
430     ///             Ok(s) => { /* handle s */ }
431     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
432     ///             Err(e) => panic!("unexpected error: {}", e),
433     ///         }
434     ///     }
435     /// });
436     ///
437     /// # fn wait_for_sigint() {}
438     /// // Now that our accept loop is running, wait for the program to be
439     /// // requested to exit.
440     /// wait_for_sigint();
441     ///
442     /// // Signal our accept loop to exit
443     /// assert!(a.close_accept().is_ok());
444     /// ```
445     #[unstable]
446     pub fn close_accept(&mut self) -> IoResult<()> {
447         self.inner.close_accept()
448     }
449 }
450
451 impl Acceptor<TcpStream> for TcpAcceptor {
452     fn accept(&mut self) -> IoResult<TcpStream> {
453         self.inner.accept().map(TcpStream::new)
454     }
455 }
456
457 impl Clone for TcpAcceptor {
458     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
459     /// accepts.
460     ///
461     /// The underlying TCP acceptor will not be closed until all handles to the
462     /// acceptor have been deallocated. Incoming connections will be received on
463     /// at most once acceptor, the same connection will not be accepted twice.
464     ///
465     /// The `close_accept` method will shut down *all* acceptors cloned from the
466     /// same original acceptor, whereas the `set_timeout` method only affects
467     /// the selector that it is called on.
468     ///
469     /// This function is useful for creating a handle to invoke `close_accept`
470     /// on to wake up any other task blocked in `accept`.
471     fn clone(&self) -> TcpAcceptor {
472         TcpAcceptor { inner: self.inner.clone() }
473     }
474 }
475
476 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
477     fn as_inner(&self) -> &TcpAcceptorImp {
478         &self.inner
479     }
480 }
481
482 #[cfg(test)]
483 #[allow(unstable)]
484 mod test {
485     use prelude::v1::*;
486
487     use sync::mpsc::channel;
488     use thread::Thread;
489     use old_io::net::tcp::*;
490     use old_io::net::ip::*;
491     use old_io::test::*;
492     use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
493     use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
494     use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
495     use old_io::{Acceptor, Listener};
496
497     // FIXME #11530 this fails on android because tests are run as root
498     #[cfg_attr(any(windows, target_os = "android"), ignore)]
499     #[test]
500     fn bind_error() {
501         match TcpListener::bind("0.0.0.0:1") {
502             Ok(..) => panic!(),
503             Err(e) => assert_eq!(e.kind, PermissionDenied),
504         }
505     }
506
507     #[test]
508     fn connect_error() {
509         match TcpStream::connect("0.0.0.0:1") {
510             Ok(..) => panic!(),
511             Err(e) => assert_eq!(e.kind, ConnectionRefused),
512         }
513     }
514
515     #[test]
516     fn listen_ip4_localhost() {
517         let socket_addr = next_test_ip4();
518         let listener = TcpListener::bind(socket_addr);
519         let mut acceptor = listener.listen();
520
521         let _t = Thread::spawn(move|| {
522             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
523             stream.write(&[144]).unwrap();
524         });
525
526         let mut stream = acceptor.accept();
527         let mut buf = [0];
528         stream.read(&mut buf).unwrap();
529         assert!(buf[0] == 144);
530     }
531
532     #[test]
533     fn connect_localhost() {
534         let addr = next_test_ip4();
535         let mut acceptor = TcpListener::bind(addr).listen();
536
537         let _t = Thread::spawn(move|| {
538             let mut stream = TcpStream::connect(("localhost", addr.port));
539             stream.write(&[64]).unwrap();
540         });
541
542         let mut stream = acceptor.accept();
543         let mut buf = [0];
544         stream.read(&mut buf).unwrap();
545         assert!(buf[0] == 64);
546     }
547
548     #[test]
549     fn connect_ip4_loopback() {
550         let addr = next_test_ip4();
551         let mut acceptor = TcpListener::bind(addr).listen();
552
553         let _t = Thread::spawn(move|| {
554             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
555             stream.write(&[44]).unwrap();
556         });
557
558         let mut stream = acceptor.accept();
559         let mut buf = [0];
560         stream.read(&mut buf).unwrap();
561         assert!(buf[0] == 44);
562     }
563
564     #[test]
565     fn connect_ip6_loopback() {
566         let addr = next_test_ip6();
567         let mut acceptor = TcpListener::bind(addr).listen();
568
569         let _t = Thread::spawn(move|| {
570             let mut stream = TcpStream::connect(("::1", addr.port));
571             stream.write(&[66]).unwrap();
572         });
573
574         let mut stream = acceptor.accept();
575         let mut buf = [0];
576         stream.read(&mut buf).unwrap();
577         assert!(buf[0] == 66);
578     }
579
580     #[test]
581     fn smoke_test_ip4() {
582         let addr = next_test_ip4();
583         let mut acceptor = TcpListener::bind(addr).listen();
584
585         let _t = Thread::spawn(move|| {
586             let mut stream = TcpStream::connect(addr);
587             stream.write(&[99]).unwrap();
588         });
589
590         let mut stream = acceptor.accept();
591         let mut buf = [0];
592         stream.read(&mut buf).unwrap();
593         assert!(buf[0] == 99);
594     }
595
596     #[test]
597     fn smoke_test_ip6() {
598         let addr = next_test_ip6();
599         let mut acceptor = TcpListener::bind(addr).listen();
600
601         let _t = Thread::spawn(move|| {
602             let mut stream = TcpStream::connect(addr);
603             stream.write(&[99]).unwrap();
604         });
605
606         let mut stream = acceptor.accept();
607         let mut buf = [0];
608         stream.read(&mut buf).unwrap();
609         assert!(buf[0] == 99);
610     }
611
612     #[test]
613     fn read_eof_ip4() {
614         let addr = next_test_ip4();
615         let mut acceptor = TcpListener::bind(addr).listen();
616
617         let _t = Thread::spawn(move|| {
618             let _stream = TcpStream::connect(addr);
619             // Close
620         });
621
622         let mut stream = acceptor.accept();
623         let mut buf = [0];
624         let nread = stream.read(&mut buf);
625         assert!(nread.is_err());
626     }
627
628     #[test]
629     fn read_eof_ip6() {
630         let addr = next_test_ip6();
631         let mut acceptor = TcpListener::bind(addr).listen();
632
633         let _t = Thread::spawn(move|| {
634             let _stream = TcpStream::connect(addr);
635             // Close
636         });
637
638         let mut stream = acceptor.accept();
639         let mut buf = [0];
640         let nread = stream.read(&mut buf);
641         assert!(nread.is_err());
642     }
643
644     #[test]
645     fn read_eof_twice_ip4() {
646         let addr = next_test_ip4();
647         let mut acceptor = TcpListener::bind(addr).listen();
648
649         let _t = Thread::spawn(move|| {
650             let _stream = TcpStream::connect(addr);
651             // Close
652         });
653
654         let mut stream = acceptor.accept();
655         let mut buf = [0];
656         let nread = stream.read(&mut buf);
657         assert!(nread.is_err());
658
659         match stream.read(&mut buf) {
660             Ok(..) => panic!(),
661             Err(ref e) => {
662                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
663                         "unknown kind: {:?}", e.kind);
664             }
665         }
666     }
667
668     #[test]
669     fn read_eof_twice_ip6() {
670         let addr = next_test_ip6();
671         let mut acceptor = TcpListener::bind(addr).listen();
672
673         let _t = Thread::spawn(move|| {
674             let _stream = TcpStream::connect(addr);
675             // Close
676         });
677
678         let mut stream = acceptor.accept();
679         let mut buf = [0];
680         let nread = stream.read(&mut buf);
681         assert!(nread.is_err());
682
683         match stream.read(&mut buf) {
684             Ok(..) => panic!(),
685             Err(ref e) => {
686                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
687                         "unknown kind: {:?}", e.kind);
688             }
689         }
690     }
691
692     #[test]
693     fn write_close_ip4() {
694         let addr = next_test_ip4();
695         let mut acceptor = TcpListener::bind(addr).listen();
696
697         let (tx, rx) = channel();
698         let _t = Thread::spawn(move|| {
699             drop(TcpStream::connect(addr));
700             tx.send(()).unwrap();
701         });
702
703         let mut stream = acceptor.accept();
704         rx.recv().unwrap();
705         let buf = [0];
706         match stream.write(&buf) {
707             Ok(..) => {}
708             Err(e) => {
709                 assert!(e.kind == ConnectionReset ||
710                         e.kind == BrokenPipe ||
711                         e.kind == ConnectionAborted,
712                         "unknown error: {}", e);
713             }
714         }
715     }
716
717     #[test]
718     fn write_close_ip6() {
719         let addr = next_test_ip6();
720         let mut acceptor = TcpListener::bind(addr).listen();
721
722         let (tx, rx) = channel();
723         let _t = Thread::spawn(move|| {
724             drop(TcpStream::connect(addr));
725             tx.send(()).unwrap();
726         });
727
728         let mut stream = acceptor.accept();
729         rx.recv().unwrap();
730         let buf = [0];
731         match stream.write(&buf) {
732             Ok(..) => {}
733             Err(e) => {
734                 assert!(e.kind == ConnectionReset ||
735                         e.kind == BrokenPipe ||
736                         e.kind == ConnectionAborted,
737                         "unknown error: {}", e);
738             }
739         }
740     }
741
742     #[test]
743     fn multiple_connect_serial_ip4() {
744         let addr = next_test_ip4();
745         let max = 10u;
746         let mut acceptor = TcpListener::bind(addr).listen();
747
748         let _t = Thread::spawn(move|| {
749             for _ in range(0, max) {
750                 let mut stream = TcpStream::connect(addr);
751                 stream.write(&[99]).unwrap();
752             }
753         });
754
755         for ref mut stream in acceptor.incoming().take(max) {
756             let mut buf = [0];
757             stream.read(&mut buf).unwrap();
758             assert_eq!(buf[0], 99);
759         }
760     }
761
762     #[test]
763     fn multiple_connect_serial_ip6() {
764         let addr = next_test_ip6();
765         let max = 10u;
766         let mut acceptor = TcpListener::bind(addr).listen();
767
768         let _t = Thread::spawn(move|| {
769             for _ in range(0, max) {
770                 let mut stream = TcpStream::connect(addr);
771                 stream.write(&[99]).unwrap();
772             }
773         });
774
775         for ref mut stream in acceptor.incoming().take(max) {
776             let mut buf = [0];
777             stream.read(&mut buf).unwrap();
778             assert_eq!(buf[0], 99);
779         }
780     }
781
782     #[test]
783     fn multiple_connect_interleaved_greedy_schedule_ip4() {
784         let addr = next_test_ip4();
785         static MAX: int = 10;
786         let acceptor = TcpListener::bind(addr).listen();
787
788         let _t = Thread::spawn(move|| {
789             let mut acceptor = acceptor;
790             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
791                 // Start another task to handle the connection
792                 let _t = Thread::spawn(move|| {
793                     let mut stream = stream;
794                     let mut buf = [0];
795                     stream.read(&mut buf).unwrap();
796                     assert!(buf[0] == i as u8);
797                     debug!("read");
798                 });
799             }
800         });
801
802         connect(0, addr);
803
804         fn connect(i: int, addr: SocketAddr) {
805             if i == MAX { return }
806
807             let _t = Thread::spawn(move|| {
808                 debug!("connecting");
809                 let mut stream = TcpStream::connect(addr);
810                 // Connect again before writing
811                 connect(i + 1, addr);
812                 debug!("writing");
813                 stream.write(&[i as u8]).unwrap();
814             });
815         }
816     }
817
818     #[test]
819     fn multiple_connect_interleaved_greedy_schedule_ip6() {
820         let addr = next_test_ip6();
821         static MAX: int = 10;
822         let acceptor = TcpListener::bind(addr).listen();
823
824         let _t = Thread::spawn(move|| {
825             let mut acceptor = acceptor;
826             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
827                 // Start another task to handle the connection
828                 let _t = Thread::spawn(move|| {
829                     let mut stream = stream;
830                     let mut buf = [0];
831                     stream.read(&mut buf).unwrap();
832                     assert!(buf[0] == i as u8);
833                     debug!("read");
834                 });
835             }
836         });
837
838         connect(0, addr);
839
840         fn connect(i: int, addr: SocketAddr) {
841             if i == MAX { return }
842
843             let _t = Thread::spawn(move|| {
844                 debug!("connecting");
845                 let mut stream = TcpStream::connect(addr);
846                 // Connect again before writing
847                 connect(i + 1, addr);
848                 debug!("writing");
849                 stream.write(&[i as u8]).unwrap();
850             });
851         }
852     }
853
854     #[test]
855     fn multiple_connect_interleaved_lazy_schedule_ip4() {
856         static MAX: int = 10;
857         let addr = next_test_ip4();
858         let acceptor = TcpListener::bind(addr).listen();
859
860         let _t = Thread::spawn(move|| {
861             let mut acceptor = acceptor;
862             for stream in acceptor.incoming().take(MAX as uint) {
863                 // Start another task to handle the connection
864                 let _t = Thread::spawn(move|| {
865                     let mut stream = stream;
866                     let mut buf = [0];
867                     stream.read(&mut buf).unwrap();
868                     assert!(buf[0] == 99);
869                     debug!("read");
870                 });
871             }
872         });
873
874         connect(0, addr);
875
876         fn connect(i: int, addr: SocketAddr) {
877             if i == MAX { return }
878
879             let _t = Thread::spawn(move|| {
880                 debug!("connecting");
881                 let mut stream = TcpStream::connect(addr);
882                 // Connect again before writing
883                 connect(i + 1, addr);
884                 debug!("writing");
885                 stream.write(&[99]).unwrap();
886             });
887         }
888     }
889
890     #[test]
891     fn multiple_connect_interleaved_lazy_schedule_ip6() {
892         static MAX: int = 10;
893         let addr = next_test_ip6();
894         let acceptor = TcpListener::bind(addr).listen();
895
896         let _t = Thread::spawn(move|| {
897             let mut acceptor = acceptor;
898             for stream in acceptor.incoming().take(MAX as uint) {
899                 // Start another task to handle the connection
900                 let _t = Thread::spawn(move|| {
901                     let mut stream = stream;
902                     let mut buf = [0];
903                     stream.read(&mut buf).unwrap();
904                     assert!(buf[0] == 99);
905                     debug!("read");
906                 });
907             }
908         });
909
910         connect(0, addr);
911
912         fn connect(i: int, addr: SocketAddr) {
913             if i == MAX { return }
914
915             let _t = Thread::spawn(move|| {
916                 debug!("connecting");
917                 let mut stream = TcpStream::connect(addr);
918                 // Connect again before writing
919                 connect(i + 1, addr);
920                 debug!("writing");
921                 stream.write(&[99]).unwrap();
922             });
923         }
924     }
925
926     pub fn socket_name(addr: SocketAddr) {
927         let mut listener = TcpListener::bind(addr).unwrap();
928
929         // Make sure socket_name gives
930         // us the socket we binded to.
931         let so_name = listener.socket_name();
932         assert!(so_name.is_ok());
933         assert_eq!(addr, so_name.unwrap());
934     }
935
936     pub fn peer_name(addr: SocketAddr) {
937         let acceptor = TcpListener::bind(addr).listen();
938         let _t = Thread::spawn(move|| {
939             let mut acceptor = acceptor;
940             acceptor.accept().unwrap();
941         });
942
943         let stream = TcpStream::connect(addr);
944
945         assert!(stream.is_ok());
946         let mut stream = stream.unwrap();
947
948         // Make sure peer_name gives us the
949         // address/port of the peer we've
950         // connected to.
951         let peer_name = stream.peer_name();
952         assert!(peer_name.is_ok());
953         assert_eq!(addr, peer_name.unwrap());
954     }
955
956     #[test]
957     fn socket_and_peer_name_ip4() {
958         peer_name(next_test_ip4());
959         socket_name(next_test_ip4());
960     }
961
962     #[test]
963     fn socket_and_peer_name_ip6() {
964         // FIXME: peer name is not consistent
965         //peer_name(next_test_ip6());
966         socket_name(next_test_ip6());
967     }
968
969     #[test]
970     fn partial_read() {
971         let addr = next_test_ip4();
972         let (tx, rx) = channel();
973         let _t = Thread::spawn(move|| {
974             let mut srv = TcpListener::bind(addr).listen().unwrap();
975             tx.send(()).unwrap();
976             let mut cl = srv.accept().unwrap();
977             cl.write(&[10]).unwrap();
978             let mut b = [0];
979             cl.read(&mut b).unwrap();
980             tx.send(()).unwrap();
981         });
982
983         rx.recv().unwrap();
984         let mut c = TcpStream::connect(addr).unwrap();
985         let mut b = [0; 10];
986         assert_eq!(c.read(&mut b), Ok(1));
987         c.write(&[1]).unwrap();
988         rx.recv().unwrap();
989     }
990
991     #[test]
992     fn double_bind() {
993         let addr = next_test_ip4();
994         let listener = TcpListener::bind(addr).unwrap().listen();
995         assert!(listener.is_ok());
996         match TcpListener::bind(addr).listen() {
997             Ok(..) => panic!(),
998             Err(e) => {
999                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1000                         "unknown error: {} {:?}", e, e.kind);
1001             }
1002         }
1003     }
1004
1005     #[test]
1006     fn fast_rebind() {
1007         let addr = next_test_ip4();
1008         let (tx, rx) = channel();
1009
1010         let _t = Thread::spawn(move|| {
1011             rx.recv().unwrap();
1012             let _stream = TcpStream::connect(addr).unwrap();
1013             // Close
1014             rx.recv().unwrap();
1015         });
1016
1017         {
1018             let mut acceptor = TcpListener::bind(addr).listen();
1019             tx.send(()).unwrap();
1020             {
1021                 let _stream = acceptor.accept().unwrap();
1022                 // Close client
1023                 tx.send(()).unwrap();
1024             }
1025             // Close listener
1026         }
1027         let _listener = TcpListener::bind(addr);
1028     }
1029
1030     #[test]
1031     fn tcp_clone_smoke() {
1032         let addr = next_test_ip4();
1033         let mut acceptor = TcpListener::bind(addr).listen();
1034
1035         let _t = Thread::spawn(move|| {
1036             let mut s = TcpStream::connect(addr);
1037             let mut buf = [0, 0];
1038             assert_eq!(s.read(&mut buf), Ok(1));
1039             assert_eq!(buf[0], 1);
1040             s.write(&[2]).unwrap();
1041         });
1042
1043         let mut s1 = acceptor.accept().unwrap();
1044         let s2 = s1.clone();
1045
1046         let (tx1, rx1) = channel();
1047         let (tx2, rx2) = channel();
1048         let _t = Thread::spawn(move|| {
1049             let mut s2 = s2;
1050             rx1.recv().unwrap();
1051             s2.write(&[1]).unwrap();
1052             tx2.send(()).unwrap();
1053         });
1054         tx1.send(()).unwrap();
1055         let mut buf = [0, 0];
1056         assert_eq!(s1.read(&mut buf), Ok(1));
1057         rx2.recv().unwrap();
1058     }
1059
1060     #[test]
1061     fn tcp_clone_two_read() {
1062         let addr = next_test_ip6();
1063         let mut acceptor = TcpListener::bind(addr).listen();
1064         let (tx1, rx) = channel();
1065         let tx2 = tx1.clone();
1066
1067         let _t = Thread::spawn(move|| {
1068             let mut s = TcpStream::connect(addr);
1069             s.write(&[1]).unwrap();
1070             rx.recv().unwrap();
1071             s.write(&[2]).unwrap();
1072             rx.recv().unwrap();
1073         });
1074
1075         let mut s1 = acceptor.accept().unwrap();
1076         let s2 = s1.clone();
1077
1078         let (done, rx) = channel();
1079         let _t = Thread::spawn(move|| {
1080             let mut s2 = s2;
1081             let mut buf = [0, 0];
1082             s2.read(&mut buf).unwrap();
1083             tx2.send(()).unwrap();
1084             done.send(()).unwrap();
1085         });
1086         let mut buf = [0, 0];
1087         s1.read(&mut buf).unwrap();
1088         tx1.send(()).unwrap();
1089
1090         rx.recv().unwrap();
1091     }
1092
1093     #[test]
1094     fn tcp_clone_two_write() {
1095         let addr = next_test_ip4();
1096         let mut acceptor = TcpListener::bind(addr).listen();
1097
1098         let _t = Thread::spawn(move|| {
1099             let mut s = TcpStream::connect(addr);
1100             let mut buf = [0, 1];
1101             s.read(&mut buf).unwrap();
1102             s.read(&mut buf).unwrap();
1103         });
1104
1105         let mut s1 = acceptor.accept().unwrap();
1106         let s2 = s1.clone();
1107
1108         let (done, rx) = channel();
1109         let _t = Thread::spawn(move|| {
1110             let mut s2 = s2;
1111             s2.write(&[1]).unwrap();
1112             done.send(()).unwrap();
1113         });
1114         s1.write(&[2]).unwrap();
1115
1116         rx.recv().unwrap();
1117     }
1118
1119     #[test]
1120     fn shutdown_smoke() {
1121         let addr = next_test_ip4();
1122         let a = TcpListener::bind(addr).unwrap().listen();
1123         let _t = Thread::spawn(move|| {
1124             let mut a = a;
1125             let mut c = a.accept().unwrap();
1126             assert_eq!(c.read_to_end(), Ok(vec!()));
1127             c.write(&[1]).unwrap();
1128         });
1129
1130         let mut s = TcpStream::connect(addr).unwrap();
1131         assert!(s.inner.close_write().is_ok());
1132         assert!(s.write(&[1]).is_err());
1133         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1134     }
1135
1136     #[test]
1137     fn accept_timeout() {
1138         let addr = next_test_ip4();
1139         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1140
1141         a.set_timeout(Some(10));
1142
1143         // Make sure we time out once and future invocations also time out
1144         let err = a.accept().err().unwrap();
1145         assert_eq!(err.kind, TimedOut);
1146         let err = a.accept().err().unwrap();
1147         assert_eq!(err.kind, TimedOut);
1148
1149         // Also make sure that even though the timeout is expired that we will
1150         // continue to receive any pending connections.
1151         //
1152         // FIXME: freebsd apparently never sees the pending connection, but
1153         //        testing manually always works. Need to investigate this
1154         //        flakiness.
1155         if !cfg!(target_os = "freebsd") {
1156             let (tx, rx) = channel();
1157             let _t = Thread::spawn(move|| {
1158                 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1159             });
1160             let _l = rx.recv().unwrap();
1161             for i in range(0i, 1001) {
1162                 match a.accept() {
1163                     Ok(..) => break,
1164                     Err(ref e) if e.kind == TimedOut => {}
1165                     Err(e) => panic!("error: {}", e),
1166                 }
1167                 ::thread::Thread::yield_now();
1168                 if i == 1000 { panic!("should have a pending connection") }
1169             }
1170         }
1171
1172         // Unset the timeout and make sure that this always blocks.
1173         a.set_timeout(None);
1174         let _t = Thread::spawn(move|| {
1175             drop(TcpStream::connect(addr).unwrap());
1176         });
1177         a.accept().unwrap();
1178     }
1179
1180     #[test]
1181     fn close_readwrite_smoke() {
1182         let addr = next_test_ip4();
1183         let a = TcpListener::bind(addr).listen().unwrap();
1184         let (_tx, rx) = channel::<()>();
1185         Thread::spawn(move|| {
1186             let mut a = a;
1187             let _s = a.accept().unwrap();
1188             let _ = rx.recv().unwrap();
1189         });
1190
1191         let mut b = [0];
1192         let mut s = TcpStream::connect(addr).unwrap();
1193         let mut s2 = s.clone();
1194
1195         // closing should prevent reads/writes
1196         s.close_write().unwrap();
1197         assert!(s.write(&[0]).is_err());
1198         s.close_read().unwrap();
1199         assert!(s.read(&mut b).is_err());
1200
1201         // closing should affect previous handles
1202         assert!(s2.write(&[0]).is_err());
1203         assert!(s2.read(&mut b).is_err());
1204
1205         // closing should affect new handles
1206         let mut s3 = s.clone();
1207         assert!(s3.write(&[0]).is_err());
1208         assert!(s3.read(&mut b).is_err());
1209
1210         // make sure these don't die
1211         let _ = s2.close_read();
1212         let _ = s2.close_write();
1213         let _ = s3.close_read();
1214         let _ = s3.close_write();
1215     }
1216
1217     #[test]
1218     fn close_read_wakes_up() {
1219         let addr = next_test_ip4();
1220         let a = TcpListener::bind(addr).listen().unwrap();
1221         let (_tx, rx) = channel::<()>();
1222         Thread::spawn(move|| {
1223             let mut a = a;
1224             let _s = a.accept().unwrap();
1225             let _ = rx.recv().unwrap();
1226         });
1227
1228         let mut s = TcpStream::connect(addr).unwrap();
1229         let s2 = s.clone();
1230         let (tx, rx) = channel();
1231         let _t = Thread::spawn(move|| {
1232             let mut s2 = s2;
1233             assert!(s2.read(&mut [0]).is_err());
1234             tx.send(()).unwrap();
1235         });
1236         // this should wake up the child task
1237         s.close_read().unwrap();
1238
1239         // this test will never finish if the child doesn't wake up
1240         rx.recv().unwrap();
1241     }
1242
1243     #[test]
1244     fn readwrite_timeouts() {
1245         let addr = next_test_ip6();
1246         let mut a = TcpListener::bind(addr).listen().unwrap();
1247         let (tx, rx) = channel::<()>();
1248         Thread::spawn(move|| {
1249             let mut s = TcpStream::connect(addr).unwrap();
1250             rx.recv().unwrap();
1251             assert!(s.write(&[0]).is_ok());
1252             let _ = rx.recv();
1253         });
1254
1255         let mut s = a.accept().unwrap();
1256         s.set_timeout(Some(20));
1257         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1258         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1259
1260         s.set_timeout(Some(20));
1261         for i in range(0i, 1001) {
1262             match s.write(&[0; 128 * 1024]) {
1263                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1264                 Err(IoError { kind: TimedOut, .. }) => break,
1265                 Err(e) => panic!("{}", e),
1266            }
1267            if i == 1000 { panic!("should have filled up?!"); }
1268         }
1269         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1270
1271         tx.send(()).unwrap();
1272         s.set_timeout(None);
1273         assert_eq!(s.read(&mut [0, 0]), Ok(1));
1274     }
1275
1276     #[test]
1277     fn read_timeouts() {
1278         let addr = next_test_ip6();
1279         let mut a = TcpListener::bind(addr).listen().unwrap();
1280         let (tx, rx) = channel::<()>();
1281         Thread::spawn(move|| {
1282             let mut s = TcpStream::connect(addr).unwrap();
1283             rx.recv().unwrap();
1284             let mut amt = 0;
1285             while amt < 100 * 128 * 1024 {
1286                 match s.read(&mut [0;128 * 1024]) {
1287                     Ok(n) => { amt += n; }
1288                     Err(e) => panic!("{}", e),
1289                 }
1290             }
1291             let _ = rx.recv();
1292         });
1293
1294         let mut s = a.accept().unwrap();
1295         s.set_read_timeout(Some(20));
1296         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1297         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1298
1299         tx.send(()).unwrap();
1300         for _ in range(0i, 100) {
1301             assert!(s.write(&[0;128 * 1024]).is_ok());
1302         }
1303     }
1304
1305     #[test]
1306     fn write_timeouts() {
1307         let addr = next_test_ip6();
1308         let mut a = TcpListener::bind(addr).listen().unwrap();
1309         let (tx, rx) = channel::<()>();
1310         Thread::spawn(move|| {
1311             let mut s = TcpStream::connect(addr).unwrap();
1312             rx.recv().unwrap();
1313             assert!(s.write(&[0]).is_ok());
1314             let _ = rx.recv();
1315         });
1316
1317         let mut s = a.accept().unwrap();
1318         s.set_write_timeout(Some(20));
1319         for i in range(0i, 1001) {
1320             match s.write(&[0; 128 * 1024]) {
1321                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1322                 Err(IoError { kind: TimedOut, .. }) => break,
1323                 Err(e) => panic!("{}", e),
1324            }
1325            if i == 1000 { panic!("should have filled up?!"); }
1326         }
1327         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1328
1329         tx.send(()).unwrap();
1330         assert!(s.read(&mut [0]).is_ok());
1331     }
1332
1333     #[test]
1334     fn timeout_concurrent_read() {
1335         let addr = next_test_ip6();
1336         let mut a = TcpListener::bind(addr).listen().unwrap();
1337         let (tx, rx) = channel::<()>();
1338         Thread::spawn(move|| {
1339             let mut s = TcpStream::connect(addr).unwrap();
1340             rx.recv().unwrap();
1341             assert_eq!(s.write(&[0]), Ok(()));
1342             let _ = rx.recv();
1343         });
1344
1345         let mut s = a.accept().unwrap();
1346         let s2 = s.clone();
1347         let (tx2, rx2) = channel();
1348         let _t = Thread::spawn(move|| {
1349             let mut s2 = s2;
1350             assert_eq!(s2.read(&mut [0]), Ok(1));
1351             tx2.send(()).unwrap();
1352         });
1353
1354         s.set_read_timeout(Some(20));
1355         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1356         tx.send(()).unwrap();
1357
1358         rx2.recv().unwrap();
1359     }
1360
1361     #[test]
1362     fn clone_while_reading() {
1363         let addr = next_test_ip6();
1364         let listen = TcpListener::bind(addr);
1365         let mut accept = listen.listen().unwrap();
1366
1367         // Enqueue a task to write to a socket
1368         let (tx, rx) = channel();
1369         let (txdone, rxdone) = channel();
1370         let txdone2 = txdone.clone();
1371         let _t = Thread::spawn(move|| {
1372             let mut tcp = TcpStream::connect(addr).unwrap();
1373             rx.recv().unwrap();
1374             tcp.write_u8(0).unwrap();
1375             txdone2.send(()).unwrap();
1376         });
1377
1378         // Spawn off a reading clone
1379         let tcp = accept.accept().unwrap();
1380         let tcp2 = tcp.clone();
1381         let txdone3 = txdone.clone();
1382         let _t = Thread::spawn(move|| {
1383             let mut tcp2 = tcp2;
1384             tcp2.read_u8().unwrap();
1385             txdone3.send(()).unwrap();
1386         });
1387
1388         // Try to ensure that the reading clone is indeed reading
1389         for _ in range(0i, 50) {
1390             ::thread::Thread::yield_now();
1391         }
1392
1393         // clone the handle again while it's reading, then let it finish the
1394         // read.
1395         let _ = tcp.clone();
1396         tx.send(()).unwrap();
1397         rxdone.recv().unwrap();
1398         rxdone.recv().unwrap();
1399     }
1400
1401     #[test]
1402     fn clone_accept_smoke() {
1403         let addr = next_test_ip4();
1404         let l = TcpListener::bind(addr);
1405         let mut a = l.listen().unwrap();
1406         let mut a2 = a.clone();
1407
1408         let _t = Thread::spawn(move|| {
1409             let _ = TcpStream::connect(addr);
1410         });
1411         let _t = Thread::spawn(move|| {
1412             let _ = TcpStream::connect(addr);
1413         });
1414
1415         assert!(a.accept().is_ok());
1416         assert!(a2.accept().is_ok());
1417     }
1418
1419     #[test]
1420     fn clone_accept_concurrent() {
1421         let addr = next_test_ip4();
1422         let l = TcpListener::bind(addr);
1423         let a = l.listen().unwrap();
1424         let a2 = a.clone();
1425
1426         let (tx, rx) = channel();
1427         let tx2 = tx.clone();
1428
1429         let _t = Thread::spawn(move|| {
1430             let mut a = a;
1431             tx.send(a.accept()).unwrap();
1432         });
1433         let _t = Thread::spawn(move|| {
1434             let mut a = a2;
1435             tx2.send(a.accept()).unwrap();
1436         });
1437
1438         let _t = Thread::spawn(move|| {
1439             let _ = TcpStream::connect(addr);
1440         });
1441         let _t = Thread::spawn(move|| {
1442             let _ = TcpStream::connect(addr);
1443         });
1444
1445         assert!(rx.recv().unwrap().is_ok());
1446         assert!(rx.recv().unwrap().is_ok());
1447     }
1448
1449     #[test]
1450     fn close_accept_smoke() {
1451         let addr = next_test_ip4();
1452         let l = TcpListener::bind(addr);
1453         let mut a = l.listen().unwrap();
1454
1455         a.close_accept().unwrap();
1456         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1457     }
1458
1459     #[test]
1460     fn close_accept_concurrent() {
1461         let addr = next_test_ip4();
1462         let l = TcpListener::bind(addr);
1463         let a = l.listen().unwrap();
1464         let mut a2 = a.clone();
1465
1466         let (tx, rx) = channel();
1467         let _t = Thread::spawn(move|| {
1468             let mut a = a;
1469             tx.send(a.accept()).unwrap();
1470         });
1471         a2.close_accept().unwrap();
1472
1473         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
1474     }
1475 }