]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
Merge pull request #20674 from jbcrail/fix-misspelled-comments
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use result::Result::Err;
23 use io::net::ip::{SocketAddr, ToSocketAddr};
24 use io::{Reader, Writer, Listener, Acceptor};
25 use io::{standard_error, TimedOut};
26 use option::Option;
27 use option::Option::{None, Some};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 use sys_common;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// The socket will be closed when the value is dropped.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// use std::io::TcpStream;
45 ///
46 /// {
47 ///     let mut stream = TcpStream::connect("127.0.0.1:34254");
48 ///
49 ///     // ignore the Result
50 ///     let _ = stream.write(&[1]);
51 ///
52 ///     let mut buf = [0];
53 ///     let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
55 /// ```
56 pub struct TcpStream {
57     inner: TcpStreamImp,
58 }
59
60 impl TcpStream {
61     fn new(s: TcpStreamImp) -> TcpStream {
62         TcpStream { inner: s }
63     }
64
65     /// Open a TCP connection to a remote host.
66     ///
67     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
68     /// trait can be supplied for the address; see this trait documentation for
69     /// concrete examples.
70     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71         super::with_addresses(addr, |addr| {
72             TcpStreamImp::connect(addr, None).map(TcpStream::new)
73         })
74     }
75
76     /// Creates a TCP connection to a remote socket address, timing out after
77     /// the specified duration.
78     ///
79     /// This is the same as the `connect` method, except that if the timeout
80     /// specified elapses before a connection is made an error will be
81     /// returned. The error's kind will be `TimedOut`.
82     ///
83     /// Same as the `connect` method, `addr` argument type can be anything which
84     /// implements `ToSocketAddr` trait.
85     ///
86     /// If a `timeout` with zero or negative duration is specified then
87     /// the function returns `Err`, with the error kind set to `TimedOut`.
88     #[experimental = "the timeout argument may eventually change types"]
89     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
90                                             timeout: Duration) -> IoResult<TcpStream> {
91         if timeout <= Duration::milliseconds(0) {
92             return Err(standard_error(TimedOut));
93         }
94
95         super::with_addresses(addr, |addr| {
96             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
97                 .map(TcpStream::new)
98         })
99     }
100
101     /// Returns the socket address of the remote peer of this TCP connection.
102     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
103         self.inner.peer_name()
104     }
105
106     /// Returns the socket address of the local half of this TCP connection.
107     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
108         self.inner.socket_name()
109     }
110
111     /// Sets the nodelay flag on this connection to the boolean specified
112     #[experimental]
113     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
114         self.inner.set_nodelay(nodelay)
115     }
116
117     /// Sets the keepalive timeout to the timeout specified.
118     ///
119     /// If the value specified is `None`, then the keepalive flag is cleared on
120     /// this connection. Otherwise, the keepalive timeout will be set to the
121     /// specified time, in seconds.
122     #[experimental]
123     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
124         self.inner.set_keepalive(delay_in_seconds)
125     }
126
127     /// Closes the reading half of this connection.
128     ///
129     /// This method will close the reading portion of this connection, causing
130     /// all pending and future reads to immediately return with an error.
131     ///
132     /// # Example
133     ///
134     /// ```no_run
135     /// # #![allow(unused_must_use)]
136     /// use std::io::timer;
137     /// use std::io::TcpStream;
138     /// use std::time::Duration;
139     /// use std::thread::Thread;
140     ///
141     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
142     /// let stream2 = stream.clone();
143     ///
144     /// let _t = Thread::spawn(move|| {
145     ///     // close this stream after one second
146     ///     timer::sleep(Duration::seconds(1));
147     ///     let mut stream = stream2;
148     ///     stream.close_read();
149     /// });
150     ///
151     /// // wait for some data, will get canceled after one second
152     /// let mut buf = [0];
153     /// stream.read(&mut buf);
154     /// ```
155     ///
156     /// Note that this method affects all cloned handles associated with this
157     /// stream, not just this one handle.
158     pub fn close_read(&mut self) -> IoResult<()> {
159         self.inner.close_read()
160     }
161
162     /// Closes the writing half of this connection.
163     ///
164     /// This method will close the writing portion of this connection, causing
165     /// all future writes to immediately return with an error.
166     ///
167     /// Note that this method affects all cloned handles associated with this
168     /// stream, not just this one handle.
169     pub fn close_write(&mut self) -> IoResult<()> {
170         self.inner.close_write()
171     }
172
173     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
174     ///
175     /// This function will set a timeout for all blocking operations (including
176     /// reads and writes) on this stream. The timeout specified is a relative
177     /// time, in milliseconds, into the future after which point operations will
178     /// time out. This means that the timeout must be reset periodically to keep
179     /// it from expiring. Specifying a value of `None` will clear the timeout
180     /// for this stream.
181     ///
182     /// The timeout on this stream is local to this stream only. Setting a
183     /// timeout does not affect any other cloned instances of this stream, nor
184     /// does the timeout propagated to cloned handles of this stream. Setting
185     /// this timeout will override any specific read or write timeouts
186     /// previously set for this stream.
187     ///
188     /// For clarification on the semantics of interrupting a read and a write,
189     /// take a look at `set_read_timeout` and `set_write_timeout`.
190     #[experimental = "the timeout argument may change in type and value"]
191     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
192         self.inner.set_timeout(timeout_ms)
193     }
194
195     /// Sets the timeout for read operations on this stream.
196     ///
197     /// See documentation in `set_timeout` for the semantics of this read time.
198     /// This will overwrite any previous read timeout set through either this
199     /// function or `set_timeout`.
200     ///
201     /// # Errors
202     ///
203     /// When this timeout expires, if there is no pending read operation, no
204     /// action is taken. Otherwise, the read operation will be scheduled to
205     /// promptly return. If a timeout error is returned, then no data was read
206     /// during the timeout period.
207     #[experimental = "the timeout argument may change in type and value"]
208     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
209         self.inner.set_read_timeout(timeout_ms)
210     }
211
212     /// Sets the timeout for write operations on this stream.
213     ///
214     /// See documentation in `set_timeout` for the semantics of this write time.
215     /// This will overwrite any previous write timeout set through either this
216     /// function or `set_timeout`.
217     ///
218     /// # Errors
219     ///
220     /// When this timeout expires, if there is no pending write operation, no
221     /// action is taken. Otherwise, the pending write operation will be
222     /// scheduled to promptly return. The actual state of the underlying stream
223     /// is not specified.
224     ///
225     /// The write operation may return an error of type `ShortWrite` which
226     /// indicates that the object is known to have written an exact number of
227     /// bytes successfully during the timeout period, and the remaining bytes
228     /// were never written.
229     ///
230     /// If the write operation returns `TimedOut`, then it the timeout primitive
231     /// does not know how many bytes were written as part of the timeout
232     /// operation. It may be the case that bytes continue to be written in an
233     /// asynchronous fashion after the call to write returns.
234     #[experimental = "the timeout argument may change in type and value"]
235     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
236         self.inner.set_write_timeout(timeout_ms)
237     }
238 }
239
240 impl Clone for TcpStream {
241     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
242     /// and writes of this connection.
243     ///
244     /// The underlying TCP stream will not be closed until all handles to the
245     /// stream have been deallocated. All handles will also follow the same
246     /// stream, but two concurrent reads will not receive the same data.
247     /// Instead, the first read will receive the first packet received, and the
248     /// second read will receive the second packet.
249     fn clone(&self) -> TcpStream {
250         TcpStream { inner: self.inner.clone() }
251     }
252 }
253
254 impl Reader for TcpStream {
255     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
256         self.inner.read(buf)
257     }
258 }
259
260 impl Writer for TcpStream {
261     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
262         self.inner.write(buf)
263     }
264 }
265
266 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
267     fn as_inner(&self) -> &TcpStreamImp {
268         &self.inner
269     }
270 }
271
272 /// A structure representing a socket server. This listener is used to create a
273 /// `TcpAcceptor` which can be used to accept sockets on a local port.
274 ///
275 /// # Example
276 ///
277 /// ```rust
278 /// # fn main() { }
279 /// # fn foo() {
280 /// # #![allow(dead_code)]
281 /// use std::io::{TcpListener, TcpStream};
282 /// use std::io::{Acceptor, Listener};
283 /// use std::thread::Thread;
284 ///
285 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
286 ///
287 /// // bind the listener to the specified address
288 /// let mut acceptor = listener.listen().unwrap();
289 ///
290 /// fn handle_client(mut stream: TcpStream) {
291 ///     // ...
292 /// # &mut stream; // silence unused mutability/variable warning
293 /// }
294 /// // accept connections and process them, spawning a new tasks for each one
295 /// for stream in acceptor.incoming() {
296 ///     match stream {
297 ///         Err(e) => { /* connection failed */ }
298 ///         Ok(stream) => {
299 ///             Thread::spawn(move|| {
300 ///                 // connection succeeded
301 ///                 handle_client(stream)
302 ///             });
303 ///         }
304 ///     }
305 /// }
306 ///
307 /// // close the socket server
308 /// drop(acceptor);
309 /// # }
310 /// ```
311 pub struct TcpListener {
312     inner: TcpListenerImp,
313 }
314
315 impl TcpListener {
316     /// Creates a new `TcpListener` which will be bound to the specified address.
317     /// This listener is not ready for accepting connections, `listen` must be called
318     /// on it before that's possible.
319     ///
320     /// Binding with a port number of 0 will request that the OS assigns a port
321     /// to this listener. The port allocated can be queried via the
322     /// `socket_name` function.
323     ///
324     /// The address type can be any implementer of `ToSocketAddr` trait. See its
325     /// documentation for concrete examples.
326     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
327         super::with_addresses(addr, |addr| {
328             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
329         })
330     }
331
332     /// Returns the local socket address of this listener.
333     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
334         self.inner.socket_name()
335     }
336 }
337
338 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
339     fn listen(self) -> IoResult<TcpAcceptor> {
340         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
341     }
342 }
343
344 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
345     fn as_inner(&self) -> &TcpListenerImp {
346         &self.inner
347     }
348 }
349
350 /// The accepting half of a TCP socket server. This structure is created through
351 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
352 /// `TcpStream` instances.
353 pub struct TcpAcceptor {
354     inner: TcpAcceptorImp,
355 }
356
357 impl TcpAcceptor {
358     /// Prevents blocking on all future accepts after `ms` milliseconds have
359     /// elapsed.
360     ///
361     /// This function is used to set a deadline after which this acceptor will
362     /// time out accepting any connections. The argument is the relative
363     /// distance, in milliseconds, to a point in the future after which all
364     /// accepts will fail.
365     ///
366     /// If the argument specified is `None`, then any previously registered
367     /// timeout is cleared.
368     ///
369     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
370     /// any pending connections. All pending connections will be accepted,
371     /// regardless of whether the timeout has expired or not (the accept will
372     /// not block in this case).
373     ///
374     /// # Example
375     ///
376     /// ```no_run
377     /// # #![allow(experimental)]
378     /// use std::io::TcpListener;
379     /// use std::io::{Listener, Acceptor, TimedOut};
380     ///
381     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
382     ///
383     /// // After 100ms have passed, all accepts will fail
384     /// a.set_timeout(Some(100));
385     ///
386     /// match a.accept() {
387     ///     Ok(..) => println!("accepted a socket"),
388     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
389     ///     Err(e) => println!("err: {}", e),
390     /// }
391     ///
392     /// // Reset the timeout and try again
393     /// a.set_timeout(Some(100));
394     /// let socket = a.accept();
395     ///
396     /// // Clear the timeout and block indefinitely waiting for a connection
397     /// a.set_timeout(None);
398     /// let socket = a.accept();
399     /// ```
400     #[experimental = "the type of the argument and name of this function are \
401                       subject to change"]
402     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
403
404     /// Closes the accepting capabilities of this acceptor.
405     ///
406     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
407     /// in that it will affect *all* cloned handles of this acceptor's original
408     /// handle.
409     ///
410     /// Once this function succeeds, all future calls to `accept` will return
411     /// immediately with an error, preventing all future calls to accept. The
412     /// underlying socket will not be relinquished back to the OS until all
413     /// acceptors have been deallocated.
414     ///
415     /// This is useful for waking up a thread in an accept loop to indicate that
416     /// it should exit.
417     ///
418     /// # Example
419     ///
420     /// ```
421     /// # #![allow(experimental)]
422     /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
423     /// use std::thread::Thread;
424     ///
425     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
426     /// let a2 = a.clone();
427     ///
428     /// let _t = Thread::spawn(move|| {
429     ///     let mut a2 = a2;
430     ///     for socket in a2.incoming() {
431     ///         match socket {
432     ///             Ok(s) => { /* handle s */ }
433     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
434     ///             Err(e) => panic!("unexpected error: {}", e),
435     ///         }
436     ///     }
437     /// });
438     ///
439     /// # fn wait_for_sigint() {}
440     /// // Now that our accept loop is running, wait for the program to be
441     /// // requested to exit.
442     /// wait_for_sigint();
443     ///
444     /// // Signal our accept loop to exit
445     /// assert!(a.close_accept().is_ok());
446     /// ```
447     #[experimental]
448     pub fn close_accept(&mut self) -> IoResult<()> {
449         self.inner.close_accept()
450     }
451 }
452
453 impl Acceptor<TcpStream> for TcpAcceptor {
454     fn accept(&mut self) -> IoResult<TcpStream> {
455         self.inner.accept().map(TcpStream::new)
456     }
457 }
458
459 impl Clone for TcpAcceptor {
460     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
461     /// accepts.
462     ///
463     /// The underlying TCP acceptor will not be closed until all handles to the
464     /// acceptor have been deallocated. Incoming connections will be received on
465     /// at most once acceptor, the same connection will not be accepted twice.
466     ///
467     /// The `close_accept` method will shut down *all* acceptors cloned from the
468     /// same original acceptor, whereas the `set_timeout` method only affects
469     /// the selector that it is called on.
470     ///
471     /// This function is useful for creating a handle to invoke `close_accept`
472     /// on to wake up any other task blocked in `accept`.
473     fn clone(&self) -> TcpAcceptor {
474         TcpAcceptor { inner: self.inner.clone() }
475     }
476 }
477
478 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
479     fn as_inner(&self) -> &TcpAcceptorImp {
480         &self.inner
481     }
482 }
483
484 #[cfg(test)]
485 #[allow(experimental)]
486 mod test {
487     use prelude::v1::*;
488
489     use sync::mpsc::channel;
490     use thread::Thread;
491     use io::net::tcp::*;
492     use io::net::ip::*;
493     use io::test::*;
494     use io::{EndOfFile, TimedOut, ShortWrite, IoError};
495     use io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
496     use io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
497     use io::{Acceptor, Listener};
498
499     // FIXME #11530 this fails on android because tests are run as root
500     #[cfg_attr(any(windows, target_os = "android"), ignore)]
501     #[test]
502     fn bind_error() {
503         match TcpListener::bind("0.0.0.0:1") {
504             Ok(..) => panic!(),
505             Err(e) => assert_eq!(e.kind, PermissionDenied),
506         }
507     }
508
509     #[test]
510     fn connect_error() {
511         match TcpStream::connect("0.0.0.0:1") {
512             Ok(..) => panic!(),
513             Err(e) => assert_eq!(e.kind, ConnectionRefused),
514         }
515     }
516
517     #[test]
518     fn listen_ip4_localhost() {
519         let socket_addr = next_test_ip4();
520         let listener = TcpListener::bind(socket_addr);
521         let mut acceptor = listener.listen();
522
523         let _t = Thread::spawn(move|| {
524             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
525             stream.write(&[144]).unwrap();
526         });
527
528         let mut stream = acceptor.accept();
529         let mut buf = [0];
530         stream.read(&mut buf).unwrap();
531         assert!(buf[0] == 144);
532     }
533
534     #[test]
535     fn connect_localhost() {
536         let addr = next_test_ip4();
537         let mut acceptor = TcpListener::bind(addr).listen();
538
539         let _t = Thread::spawn(move|| {
540             let mut stream = TcpStream::connect(("localhost", addr.port));
541             stream.write(&[64]).unwrap();
542         });
543
544         let mut stream = acceptor.accept();
545         let mut buf = [0];
546         stream.read(&mut buf).unwrap();
547         assert!(buf[0] == 64);
548     }
549
550     #[test]
551     fn connect_ip4_loopback() {
552         let addr = next_test_ip4();
553         let mut acceptor = TcpListener::bind(addr).listen();
554
555         let _t = Thread::spawn(move|| {
556             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
557             stream.write(&[44]).unwrap();
558         });
559
560         let mut stream = acceptor.accept();
561         let mut buf = [0];
562         stream.read(&mut buf).unwrap();
563         assert!(buf[0] == 44);
564     }
565
566     #[test]
567     fn connect_ip6_loopback() {
568         let addr = next_test_ip6();
569         let mut acceptor = TcpListener::bind(addr).listen();
570
571         let _t = Thread::spawn(move|| {
572             let mut stream = TcpStream::connect(("::1", addr.port));
573             stream.write(&[66]).unwrap();
574         });
575
576         let mut stream = acceptor.accept();
577         let mut buf = [0];
578         stream.read(&mut buf).unwrap();
579         assert!(buf[0] == 66);
580     }
581
582     #[test]
583     fn smoke_test_ip4() {
584         let addr = next_test_ip4();
585         let mut acceptor = TcpListener::bind(addr).listen();
586
587         let _t = Thread::spawn(move|| {
588             let mut stream = TcpStream::connect(addr);
589             stream.write(&[99]).unwrap();
590         });
591
592         let mut stream = acceptor.accept();
593         let mut buf = [0];
594         stream.read(&mut buf).unwrap();
595         assert!(buf[0] == 99);
596     }
597
598     #[test]
599     fn smoke_test_ip6() {
600         let addr = next_test_ip6();
601         let mut acceptor = TcpListener::bind(addr).listen();
602
603         let _t = Thread::spawn(move|| {
604             let mut stream = TcpStream::connect(addr);
605             stream.write(&[99]).unwrap();
606         });
607
608         let mut stream = acceptor.accept();
609         let mut buf = [0];
610         stream.read(&mut buf).unwrap();
611         assert!(buf[0] == 99);
612     }
613
614     #[test]
615     fn read_eof_ip4() {
616         let addr = next_test_ip4();
617         let mut acceptor = TcpListener::bind(addr).listen();
618
619         let _t = Thread::spawn(move|| {
620             let _stream = TcpStream::connect(addr);
621             // Close
622         });
623
624         let mut stream = acceptor.accept();
625         let mut buf = [0];
626         let nread = stream.read(&mut buf);
627         assert!(nread.is_err());
628     }
629
630     #[test]
631     fn read_eof_ip6() {
632         let addr = next_test_ip6();
633         let mut acceptor = TcpListener::bind(addr).listen();
634
635         let _t = Thread::spawn(move|| {
636             let _stream = TcpStream::connect(addr);
637             // Close
638         });
639
640         let mut stream = acceptor.accept();
641         let mut buf = [0];
642         let nread = stream.read(&mut buf);
643         assert!(nread.is_err());
644     }
645
646     #[test]
647     fn read_eof_twice_ip4() {
648         let addr = next_test_ip4();
649         let mut acceptor = TcpListener::bind(addr).listen();
650
651         let _t = Thread::spawn(move|| {
652             let _stream = TcpStream::connect(addr);
653             // Close
654         });
655
656         let mut stream = acceptor.accept();
657         let mut buf = [0];
658         let nread = stream.read(&mut buf);
659         assert!(nread.is_err());
660
661         match stream.read(&mut buf) {
662             Ok(..) => panic!(),
663             Err(ref e) => {
664                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
665                         "unknown kind: {:?}", e.kind);
666             }
667         }
668     }
669
670     #[test]
671     fn read_eof_twice_ip6() {
672         let addr = next_test_ip6();
673         let mut acceptor = TcpListener::bind(addr).listen();
674
675         let _t = Thread::spawn(move|| {
676             let _stream = TcpStream::connect(addr);
677             // Close
678         });
679
680         let mut stream = acceptor.accept();
681         let mut buf = [0];
682         let nread = stream.read(&mut buf);
683         assert!(nread.is_err());
684
685         match stream.read(&mut buf) {
686             Ok(..) => panic!(),
687             Err(ref e) => {
688                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
689                         "unknown kind: {:?}", e.kind);
690             }
691         }
692     }
693
694     #[test]
695     fn write_close_ip4() {
696         let addr = next_test_ip4();
697         let mut acceptor = TcpListener::bind(addr).listen();
698
699         let (tx, rx) = channel();
700         let _t = Thread::spawn(move|| {
701             drop(TcpStream::connect(addr));
702             tx.send(()).unwrap();
703         });
704
705         let mut stream = acceptor.accept();
706         rx.recv().unwrap();
707         let buf = [0];
708         match stream.write(&buf) {
709             Ok(..) => {}
710             Err(e) => {
711                 assert!(e.kind == ConnectionReset ||
712                         e.kind == BrokenPipe ||
713                         e.kind == ConnectionAborted,
714                         "unknown error: {}", e);
715             }
716         }
717     }
718
719     #[test]
720     fn write_close_ip6() {
721         let addr = next_test_ip6();
722         let mut acceptor = TcpListener::bind(addr).listen();
723
724         let (tx, rx) = channel();
725         let _t = Thread::spawn(move|| {
726             drop(TcpStream::connect(addr));
727             tx.send(()).unwrap();
728         });
729
730         let mut stream = acceptor.accept();
731         rx.recv().unwrap();
732         let buf = [0];
733         match stream.write(&buf) {
734             Ok(..) => {}
735             Err(e) => {
736                 assert!(e.kind == ConnectionReset ||
737                         e.kind == BrokenPipe ||
738                         e.kind == ConnectionAborted,
739                         "unknown error: {}", e);
740             }
741         }
742     }
743
744     #[test]
745     fn multiple_connect_serial_ip4() {
746         let addr = next_test_ip4();
747         let max = 10u;
748         let mut acceptor = TcpListener::bind(addr).listen();
749
750         let _t = Thread::spawn(move|| {
751             for _ in range(0, max) {
752                 let mut stream = TcpStream::connect(addr);
753                 stream.write(&[99]).unwrap();
754             }
755         });
756
757         for ref mut stream in acceptor.incoming().take(max) {
758             let mut buf = [0];
759             stream.read(&mut buf).unwrap();
760             assert_eq!(buf[0], 99);
761         }
762     }
763
764     #[test]
765     fn multiple_connect_serial_ip6() {
766         let addr = next_test_ip6();
767         let max = 10u;
768         let mut acceptor = TcpListener::bind(addr).listen();
769
770         let _t = Thread::spawn(move|| {
771             for _ in range(0, max) {
772                 let mut stream = TcpStream::connect(addr);
773                 stream.write(&[99]).unwrap();
774             }
775         });
776
777         for ref mut stream in acceptor.incoming().take(max) {
778             let mut buf = [0];
779             stream.read(&mut buf).unwrap();
780             assert_eq!(buf[0], 99);
781         }
782     }
783
784     #[test]
785     fn multiple_connect_interleaved_greedy_schedule_ip4() {
786         let addr = next_test_ip4();
787         static MAX: int = 10;
788         let acceptor = TcpListener::bind(addr).listen();
789
790         let _t = Thread::spawn(move|| {
791             let mut acceptor = acceptor;
792             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
793                 // Start another task to handle the connection
794                 let _t = Thread::spawn(move|| {
795                     let mut stream = stream;
796                     let mut buf = [0];
797                     stream.read(&mut buf).unwrap();
798                     assert!(buf[0] == i as u8);
799                     debug!("read");
800                 });
801             }
802         });
803
804         connect(0, addr);
805
806         fn connect(i: int, addr: SocketAddr) {
807             if i == MAX { return }
808
809             let _t = Thread::spawn(move|| {
810                 debug!("connecting");
811                 let mut stream = TcpStream::connect(addr);
812                 // Connect again before writing
813                 connect(i + 1, addr);
814                 debug!("writing");
815                 stream.write(&[i as u8]).unwrap();
816             });
817         }
818     }
819
820     #[test]
821     fn multiple_connect_interleaved_greedy_schedule_ip6() {
822         let addr = next_test_ip6();
823         static MAX: int = 10;
824         let acceptor = TcpListener::bind(addr).listen();
825
826         let _t = Thread::spawn(move|| {
827             let mut acceptor = acceptor;
828             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
829                 // Start another task to handle the connection
830                 let _t = Thread::spawn(move|| {
831                     let mut stream = stream;
832                     let mut buf = [0];
833                     stream.read(&mut buf).unwrap();
834                     assert!(buf[0] == i as u8);
835                     debug!("read");
836                 });
837             }
838         });
839
840         connect(0, addr);
841
842         fn connect(i: int, addr: SocketAddr) {
843             if i == MAX { return }
844
845             let _t = Thread::spawn(move|| {
846                 debug!("connecting");
847                 let mut stream = TcpStream::connect(addr);
848                 // Connect again before writing
849                 connect(i + 1, addr);
850                 debug!("writing");
851                 stream.write(&[i as u8]).unwrap();
852             });
853         }
854     }
855
856     #[test]
857     fn multiple_connect_interleaved_lazy_schedule_ip4() {
858         static MAX: int = 10;
859         let addr = next_test_ip4();
860         let acceptor = TcpListener::bind(addr).listen();
861
862         let _t = Thread::spawn(move|| {
863             let mut acceptor = acceptor;
864             for stream in acceptor.incoming().take(MAX as uint) {
865                 // Start another task to handle the connection
866                 let _t = Thread::spawn(move|| {
867                     let mut stream = stream;
868                     let mut buf = [0];
869                     stream.read(&mut buf).unwrap();
870                     assert!(buf[0] == 99);
871                     debug!("read");
872                 });
873             }
874         });
875
876         connect(0, addr);
877
878         fn connect(i: int, addr: SocketAddr) {
879             if i == MAX { return }
880
881             let _t = Thread::spawn(move|| {
882                 debug!("connecting");
883                 let mut stream = TcpStream::connect(addr);
884                 // Connect again before writing
885                 connect(i + 1, addr);
886                 debug!("writing");
887                 stream.write(&[99]).unwrap();
888             });
889         }
890     }
891
892     #[test]
893     fn multiple_connect_interleaved_lazy_schedule_ip6() {
894         static MAX: int = 10;
895         let addr = next_test_ip6();
896         let acceptor = TcpListener::bind(addr).listen();
897
898         let _t = Thread::spawn(move|| {
899             let mut acceptor = acceptor;
900             for stream in acceptor.incoming().take(MAX as uint) {
901                 // Start another task to handle the connection
902                 let _t = Thread::spawn(move|| {
903                     let mut stream = stream;
904                     let mut buf = [0];
905                     stream.read(&mut buf).unwrap();
906                     assert!(buf[0] == 99);
907                     debug!("read");
908                 });
909             }
910         });
911
912         connect(0, addr);
913
914         fn connect(i: int, addr: SocketAddr) {
915             if i == MAX { return }
916
917             let _t = Thread::spawn(move|| {
918                 debug!("connecting");
919                 let mut stream = TcpStream::connect(addr);
920                 // Connect again before writing
921                 connect(i + 1, addr);
922                 debug!("writing");
923                 stream.write(&[99]).unwrap();
924             });
925         }
926     }
927
928     pub fn socket_name(addr: SocketAddr) {
929         let mut listener = TcpListener::bind(addr).unwrap();
930
931         // Make sure socket_name gives
932         // us the socket we binded to.
933         let so_name = listener.socket_name();
934         assert!(so_name.is_ok());
935         assert_eq!(addr, so_name.unwrap());
936     }
937
938     pub fn peer_name(addr: SocketAddr) {
939         let acceptor = TcpListener::bind(addr).listen();
940         let _t = Thread::spawn(move|| {
941             let mut acceptor = acceptor;
942             acceptor.accept().unwrap();
943         });
944
945         let stream = TcpStream::connect(addr);
946
947         assert!(stream.is_ok());
948         let mut stream = stream.unwrap();
949
950         // Make sure peer_name gives us the
951         // address/port of the peer we've
952         // connected to.
953         let peer_name = stream.peer_name();
954         assert!(peer_name.is_ok());
955         assert_eq!(addr, peer_name.unwrap());
956     }
957
958     #[test]
959     fn socket_and_peer_name_ip4() {
960         peer_name(next_test_ip4());
961         socket_name(next_test_ip4());
962     }
963
964     #[test]
965     fn socket_and_peer_name_ip6() {
966         // FIXME: peer name is not consistent
967         //peer_name(next_test_ip6());
968         socket_name(next_test_ip6());
969     }
970
971     #[test]
972     fn partial_read() {
973         let addr = next_test_ip4();
974         let (tx, rx) = channel();
975         let _t = Thread::spawn(move|| {
976             let mut srv = TcpListener::bind(addr).listen().unwrap();
977             tx.send(()).unwrap();
978             let mut cl = srv.accept().unwrap();
979             cl.write(&[10]).unwrap();
980             let mut b = [0];
981             cl.read(&mut b).unwrap();
982             tx.send(()).unwrap();
983         });
984
985         rx.recv().unwrap();
986         let mut c = TcpStream::connect(addr).unwrap();
987         let mut b = [0; 10];
988         assert_eq!(c.read(&mut b), Ok(1));
989         c.write(&[1]).unwrap();
990         rx.recv().unwrap();
991     }
992
993     #[test]
994     fn double_bind() {
995         let addr = next_test_ip4();
996         let listener = TcpListener::bind(addr).unwrap().listen();
997         assert!(listener.is_ok());
998         match TcpListener::bind(addr).listen() {
999             Ok(..) => panic!(),
1000             Err(e) => {
1001                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1002                         "unknown error: {} {:?}", e, e.kind);
1003             }
1004         }
1005     }
1006
1007     #[test]
1008     fn fast_rebind() {
1009         let addr = next_test_ip4();
1010         let (tx, rx) = channel();
1011
1012         let _t = Thread::spawn(move|| {
1013             rx.recv().unwrap();
1014             let _stream = TcpStream::connect(addr).unwrap();
1015             // Close
1016             rx.recv().unwrap();
1017         });
1018
1019         {
1020             let mut acceptor = TcpListener::bind(addr).listen();
1021             tx.send(()).unwrap();
1022             {
1023                 let _stream = acceptor.accept().unwrap();
1024                 // Close client
1025                 tx.send(()).unwrap();
1026             }
1027             // Close listener
1028         }
1029         let _listener = TcpListener::bind(addr);
1030     }
1031
1032     #[test]
1033     fn tcp_clone_smoke() {
1034         let addr = next_test_ip4();
1035         let mut acceptor = TcpListener::bind(addr).listen();
1036
1037         let _t = Thread::spawn(move|| {
1038             let mut s = TcpStream::connect(addr);
1039             let mut buf = [0, 0];
1040             assert_eq!(s.read(&mut buf), Ok(1));
1041             assert_eq!(buf[0], 1);
1042             s.write(&[2]).unwrap();
1043         });
1044
1045         let mut s1 = acceptor.accept().unwrap();
1046         let s2 = s1.clone();
1047
1048         let (tx1, rx1) = channel();
1049         let (tx2, rx2) = channel();
1050         let _t = Thread::spawn(move|| {
1051             let mut s2 = s2;
1052             rx1.recv().unwrap();
1053             s2.write(&[1]).unwrap();
1054             tx2.send(()).unwrap();
1055         });
1056         tx1.send(()).unwrap();
1057         let mut buf = [0, 0];
1058         assert_eq!(s1.read(&mut buf), Ok(1));
1059         rx2.recv().unwrap();
1060     }
1061
1062     #[test]
1063     fn tcp_clone_two_read() {
1064         let addr = next_test_ip6();
1065         let mut acceptor = TcpListener::bind(addr).listen();
1066         let (tx1, rx) = channel();
1067         let tx2 = tx1.clone();
1068
1069         let _t = Thread::spawn(move|| {
1070             let mut s = TcpStream::connect(addr);
1071             s.write(&[1]).unwrap();
1072             rx.recv().unwrap();
1073             s.write(&[2]).unwrap();
1074             rx.recv().unwrap();
1075         });
1076
1077         let mut s1 = acceptor.accept().unwrap();
1078         let s2 = s1.clone();
1079
1080         let (done, rx) = channel();
1081         let _t = Thread::spawn(move|| {
1082             let mut s2 = s2;
1083             let mut buf = [0, 0];
1084             s2.read(&mut buf).unwrap();
1085             tx2.send(()).unwrap();
1086             done.send(()).unwrap();
1087         });
1088         let mut buf = [0, 0];
1089         s1.read(&mut buf).unwrap();
1090         tx1.send(()).unwrap();
1091
1092         rx.recv().unwrap();
1093     }
1094
1095     #[test]
1096     fn tcp_clone_two_write() {
1097         let addr = next_test_ip4();
1098         let mut acceptor = TcpListener::bind(addr).listen();
1099
1100         let _t = Thread::spawn(move|| {
1101             let mut s = TcpStream::connect(addr);
1102             let mut buf = [0, 1];
1103             s.read(&mut buf).unwrap();
1104             s.read(&mut buf).unwrap();
1105         });
1106
1107         let mut s1 = acceptor.accept().unwrap();
1108         let s2 = s1.clone();
1109
1110         let (done, rx) = channel();
1111         let _t = Thread::spawn(move|| {
1112             let mut s2 = s2;
1113             s2.write(&[1]).unwrap();
1114             done.send(()).unwrap();
1115         });
1116         s1.write(&[2]).unwrap();
1117
1118         rx.recv().unwrap();
1119     }
1120
1121     #[test]
1122     fn shutdown_smoke() {
1123         let addr = next_test_ip4();
1124         let a = TcpListener::bind(addr).unwrap().listen();
1125         let _t = Thread::spawn(move|| {
1126             let mut a = a;
1127             let mut c = a.accept().unwrap();
1128             assert_eq!(c.read_to_end(), Ok(vec!()));
1129             c.write(&[1]).unwrap();
1130         });
1131
1132         let mut s = TcpStream::connect(addr).unwrap();
1133         assert!(s.inner.close_write().is_ok());
1134         assert!(s.write(&[1]).is_err());
1135         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1136     }
1137
1138     #[test]
1139     fn accept_timeout() {
1140         let addr = next_test_ip4();
1141         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1142
1143         a.set_timeout(Some(10));
1144
1145         // Make sure we time out once and future invocations also time out
1146         let err = a.accept().err().unwrap();
1147         assert_eq!(err.kind, TimedOut);
1148         let err = a.accept().err().unwrap();
1149         assert_eq!(err.kind, TimedOut);
1150
1151         // Also make sure that even though the timeout is expired that we will
1152         // continue to receive any pending connections.
1153         //
1154         // FIXME: freebsd apparently never sees the pending connection, but
1155         //        testing manually always works. Need to investigate this
1156         //        flakiness.
1157         if !cfg!(target_os = "freebsd") {
1158             let (tx, rx) = channel();
1159             let _t = Thread::spawn(move|| {
1160                 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1161             });
1162             let _l = rx.recv().unwrap();
1163             for i in range(0i, 1001) {
1164                 match a.accept() {
1165                     Ok(..) => break,
1166                     Err(ref e) if e.kind == TimedOut => {}
1167                     Err(e) => panic!("error: {}", e),
1168                 }
1169                 ::thread::Thread::yield_now();
1170                 if i == 1000 { panic!("should have a pending connection") }
1171             }
1172         }
1173
1174         // Unset the timeout and make sure that this always blocks.
1175         a.set_timeout(None);
1176         let _t = Thread::spawn(move|| {
1177             drop(TcpStream::connect(addr).unwrap());
1178         });
1179         a.accept().unwrap();
1180     }
1181
1182     #[test]
1183     fn close_readwrite_smoke() {
1184         let addr = next_test_ip4();
1185         let a = TcpListener::bind(addr).listen().unwrap();
1186         let (_tx, rx) = channel::<()>();
1187         Thread::spawn(move|| {
1188             let mut a = a;
1189             let _s = a.accept().unwrap();
1190             let _ = rx.recv().unwrap();
1191         });
1192
1193         let mut b = [0];
1194         let mut s = TcpStream::connect(addr).unwrap();
1195         let mut s2 = s.clone();
1196
1197         // closing should prevent reads/writes
1198         s.close_write().unwrap();
1199         assert!(s.write(&[0]).is_err());
1200         s.close_read().unwrap();
1201         assert!(s.read(&mut b).is_err());
1202
1203         // closing should affect previous handles
1204         assert!(s2.write(&[0]).is_err());
1205         assert!(s2.read(&mut b).is_err());
1206
1207         // closing should affect new handles
1208         let mut s3 = s.clone();
1209         assert!(s3.write(&[0]).is_err());
1210         assert!(s3.read(&mut b).is_err());
1211
1212         // make sure these don't die
1213         let _ = s2.close_read();
1214         let _ = s2.close_write();
1215         let _ = s3.close_read();
1216         let _ = s3.close_write();
1217     }
1218
1219     #[test]
1220     fn close_read_wakes_up() {
1221         let addr = next_test_ip4();
1222         let a = TcpListener::bind(addr).listen().unwrap();
1223         let (_tx, rx) = channel::<()>();
1224         Thread::spawn(move|| {
1225             let mut a = a;
1226             let _s = a.accept().unwrap();
1227             let _ = rx.recv().unwrap();
1228         });
1229
1230         let mut s = TcpStream::connect(addr).unwrap();
1231         let s2 = s.clone();
1232         let (tx, rx) = channel();
1233         let _t = Thread::spawn(move|| {
1234             let mut s2 = s2;
1235             assert!(s2.read(&mut [0]).is_err());
1236             tx.send(()).unwrap();
1237         });
1238         // this should wake up the child task
1239         s.close_read().unwrap();
1240
1241         // this test will never finish if the child doesn't wake up
1242         rx.recv().unwrap();
1243     }
1244
1245     #[test]
1246     fn readwrite_timeouts() {
1247         let addr = next_test_ip6();
1248         let mut a = TcpListener::bind(addr).listen().unwrap();
1249         let (tx, rx) = channel::<()>();
1250         Thread::spawn(move|| {
1251             let mut s = TcpStream::connect(addr).unwrap();
1252             rx.recv().unwrap();
1253             assert!(s.write(&[0]).is_ok());
1254             let _ = rx.recv();
1255         });
1256
1257         let mut s = a.accept().unwrap();
1258         s.set_timeout(Some(20));
1259         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1260         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1261
1262         s.set_timeout(Some(20));
1263         for i in range(0i, 1001) {
1264             match s.write(&[0; 128 * 1024]) {
1265                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1266                 Err(IoError { kind: TimedOut, .. }) => break,
1267                 Err(e) => panic!("{}", e),
1268            }
1269            if i == 1000 { panic!("should have filled up?!"); }
1270         }
1271         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1272
1273         tx.send(()).unwrap();
1274         s.set_timeout(None);
1275         assert_eq!(s.read(&mut [0, 0]), Ok(1));
1276     }
1277
1278     #[test]
1279     fn read_timeouts() {
1280         let addr = next_test_ip6();
1281         let mut a = TcpListener::bind(addr).listen().unwrap();
1282         let (tx, rx) = channel::<()>();
1283         Thread::spawn(move|| {
1284             let mut s = TcpStream::connect(addr).unwrap();
1285             rx.recv().unwrap();
1286             let mut amt = 0;
1287             while amt < 100 * 128 * 1024 {
1288                 match s.read(&mut [0;128 * 1024]) {
1289                     Ok(n) => { amt += n; }
1290                     Err(e) => panic!("{}", e),
1291                 }
1292             }
1293             let _ = rx.recv();
1294         });
1295
1296         let mut s = a.accept().unwrap();
1297         s.set_read_timeout(Some(20));
1298         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1299         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1300
1301         tx.send(()).unwrap();
1302         for _ in range(0i, 100) {
1303             assert!(s.write(&[0;128 * 1024]).is_ok());
1304         }
1305     }
1306
1307     #[test]
1308     fn write_timeouts() {
1309         let addr = next_test_ip6();
1310         let mut a = TcpListener::bind(addr).listen().unwrap();
1311         let (tx, rx) = channel::<()>();
1312         Thread::spawn(move|| {
1313             let mut s = TcpStream::connect(addr).unwrap();
1314             rx.recv().unwrap();
1315             assert!(s.write(&[0]).is_ok());
1316             let _ = rx.recv();
1317         });
1318
1319         let mut s = a.accept().unwrap();
1320         s.set_write_timeout(Some(20));
1321         for i in range(0i, 1001) {
1322             match s.write(&[0; 128 * 1024]) {
1323                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1324                 Err(IoError { kind: TimedOut, .. }) => break,
1325                 Err(e) => panic!("{}", e),
1326            }
1327            if i == 1000 { panic!("should have filled up?!"); }
1328         }
1329         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1330
1331         tx.send(()).unwrap();
1332         assert!(s.read(&mut [0]).is_ok());
1333     }
1334
1335     #[test]
1336     fn timeout_concurrent_read() {
1337         let addr = next_test_ip6();
1338         let mut a = TcpListener::bind(addr).listen().unwrap();
1339         let (tx, rx) = channel::<()>();
1340         Thread::spawn(move|| {
1341             let mut s = TcpStream::connect(addr).unwrap();
1342             rx.recv().unwrap();
1343             assert_eq!(s.write(&[0]), Ok(()));
1344             let _ = rx.recv();
1345         });
1346
1347         let mut s = a.accept().unwrap();
1348         let s2 = s.clone();
1349         let (tx2, rx2) = channel();
1350         let _t = Thread::spawn(move|| {
1351             let mut s2 = s2;
1352             assert_eq!(s2.read(&mut [0]), Ok(1));
1353             tx2.send(()).unwrap();
1354         });
1355
1356         s.set_read_timeout(Some(20));
1357         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1358         tx.send(()).unwrap();
1359
1360         rx2.recv().unwrap();
1361     }
1362
1363     #[test]
1364     fn clone_while_reading() {
1365         let addr = next_test_ip6();
1366         let listen = TcpListener::bind(addr);
1367         let mut accept = listen.listen().unwrap();
1368
1369         // Enqueue a task to write to a socket
1370         let (tx, rx) = channel();
1371         let (txdone, rxdone) = channel();
1372         let txdone2 = txdone.clone();
1373         let _t = Thread::spawn(move|| {
1374             let mut tcp = TcpStream::connect(addr).unwrap();
1375             rx.recv().unwrap();
1376             tcp.write_u8(0).unwrap();
1377             txdone2.send(()).unwrap();
1378         });
1379
1380         // Spawn off a reading clone
1381         let tcp = accept.accept().unwrap();
1382         let tcp2 = tcp.clone();
1383         let txdone3 = txdone.clone();
1384         let _t = Thread::spawn(move|| {
1385             let mut tcp2 = tcp2;
1386             tcp2.read_u8().unwrap();
1387             txdone3.send(()).unwrap();
1388         });
1389
1390         // Try to ensure that the reading clone is indeed reading
1391         for _ in range(0i, 50) {
1392             ::thread::Thread::yield_now();
1393         }
1394
1395         // clone the handle again while it's reading, then let it finish the
1396         // read.
1397         let _ = tcp.clone();
1398         tx.send(()).unwrap();
1399         rxdone.recv().unwrap();
1400         rxdone.recv().unwrap();
1401     }
1402
1403     #[test]
1404     fn clone_accept_smoke() {
1405         let addr = next_test_ip4();
1406         let l = TcpListener::bind(addr);
1407         let mut a = l.listen().unwrap();
1408         let mut a2 = a.clone();
1409
1410         let _t = Thread::spawn(move|| {
1411             let _ = TcpStream::connect(addr);
1412         });
1413         let _t = Thread::spawn(move|| {
1414             let _ = TcpStream::connect(addr);
1415         });
1416
1417         assert!(a.accept().is_ok());
1418         assert!(a2.accept().is_ok());
1419     }
1420
1421     #[test]
1422     fn clone_accept_concurrent() {
1423         let addr = next_test_ip4();
1424         let l = TcpListener::bind(addr);
1425         let a = l.listen().unwrap();
1426         let a2 = a.clone();
1427
1428         let (tx, rx) = channel();
1429         let tx2 = tx.clone();
1430
1431         let _t = Thread::spawn(move|| {
1432             let mut a = a;
1433             tx.send(a.accept()).unwrap();
1434         });
1435         let _t = Thread::spawn(move|| {
1436             let mut a = a2;
1437             tx2.send(a.accept()).unwrap();
1438         });
1439
1440         let _t = Thread::spawn(move|| {
1441             let _ = TcpStream::connect(addr);
1442         });
1443         let _t = Thread::spawn(move|| {
1444             let _ = TcpStream::connect(addr);
1445         });
1446
1447         assert!(rx.recv().unwrap().is_ok());
1448         assert!(rx.recv().unwrap().is_ok());
1449     }
1450
1451     #[test]
1452     fn close_accept_smoke() {
1453         let addr = next_test_ip4();
1454         let l = TcpListener::bind(addr);
1455         let mut a = l.listen().unwrap();
1456
1457         a.close_accept().unwrap();
1458         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1459     }
1460
1461     #[test]
1462     fn close_accept_concurrent() {
1463         let addr = next_test_ip4();
1464         let l = TcpListener::bind(addr);
1465         let a = l.listen().unwrap();
1466         let mut a2 = a.clone();
1467
1468         let (tx, rx) = channel();
1469         let _t = Thread::spawn(move|| {
1470             let mut a = a;
1471             tx.send(a.accept()).unwrap();
1472         });
1473         a2.close_accept().unwrap();
1474
1475         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
1476     }
1477 }