]> git.lizzy.rs Git - rust.git/blob - src/libstd/old_io/net/tcp.rs
openbsd: adapt connect_error test
[rust.git] / src / libstd / old_io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use old_io::IoResult;
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
26 use option::Option;
27 use option::Option::{None, Some};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 use sys_common;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// The socket will be closed when the value is dropped.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// use std::old_io::TcpStream;
45 ///
46 /// {
47 ///     let mut stream = TcpStream::connect("127.0.0.1:34254");
48 ///
49 ///     // ignore the Result
50 ///     let _ = stream.write(&[1]);
51 ///
52 ///     let mut buf = [0];
53 ///     let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
55 /// ```
56 pub struct TcpStream {
57     inner: TcpStreamImp,
58 }
59
60 impl TcpStream {
61     fn new(s: TcpStreamImp) -> TcpStream {
62         TcpStream { inner: s }
63     }
64
65     /// Open a TCP connection to a remote host.
66     ///
67     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
68     /// trait can be supplied for the address; see this trait documentation for
69     /// concrete examples.
70     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71         super::with_addresses(addr, |addr| {
72             TcpStreamImp::connect(addr, None).map(TcpStream::new)
73         })
74     }
75
76     /// Creates a TCP connection to a remote socket address, timing out after
77     /// the specified duration.
78     ///
79     /// This is the same as the `connect` method, except that if the timeout
80     /// specified elapses before a connection is made an error will be
81     /// returned. The error's kind will be `TimedOut`.
82     ///
83     /// Same as the `connect` method, `addr` argument type can be anything which
84     /// implements `ToSocketAddr` trait.
85     ///
86     /// If a `timeout` with zero or negative duration is specified then
87     /// the function returns `Err`, with the error kind set to `TimedOut`.
88     #[unstable(feature = "io",
89                reason = "the timeout argument may eventually change types")]
90     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
91                                             timeout: Duration) -> IoResult<TcpStream> {
92         if timeout <= Duration::milliseconds(0) {
93             return Err(standard_error(TimedOut));
94         }
95
96         super::with_addresses(addr, |addr| {
97             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
98                 .map(TcpStream::new)
99         })
100     }
101
102     /// Returns the socket address of the remote peer of this TCP connection.
103     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
104         self.inner.peer_name()
105     }
106
107     /// Returns the socket address of the local half of this TCP connection.
108     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
109         self.inner.socket_name()
110     }
111
112     /// Sets the nodelay flag on this connection to the boolean specified
113     #[unstable(feature = "io")]
114     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
115         self.inner.set_nodelay(nodelay)
116     }
117
118     /// Sets the keepalive timeout to the timeout specified.
119     ///
120     /// If the value specified is `None`, then the keepalive flag is cleared on
121     /// this connection. Otherwise, the keepalive timeout will be set to the
122     /// specified time, in seconds.
123     #[unstable(feature = "io")]
124     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
125         self.inner.set_keepalive(delay_in_seconds)
126     }
127
128     /// Closes the reading half of this connection.
129     ///
130     /// This method will close the reading portion of this connection, causing
131     /// all pending and future reads to immediately return with an error.
132     ///
133     /// # Example
134     ///
135     /// ```no_run
136     /// # #![allow(unused_must_use)]
137     /// use std::old_io::timer;
138     /// use std::old_io::TcpStream;
139     /// use std::time::Duration;
140     /// use std::thread::Thread;
141     ///
142     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
143     /// let stream2 = stream.clone();
144     ///
145     /// let _t = Thread::spawn(move|| {
146     ///     // close this stream after one second
147     ///     timer::sleep(Duration::seconds(1));
148     ///     let mut stream = stream2;
149     ///     stream.close_read();
150     /// });
151     ///
152     /// // wait for some data, will get canceled after one second
153     /// let mut buf = [0];
154     /// stream.read(&mut buf);
155     /// ```
156     ///
157     /// Note that this method affects all cloned handles associated with this
158     /// stream, not just this one handle.
159     pub fn close_read(&mut self) -> IoResult<()> {
160         self.inner.close_read()
161     }
162
163     /// Closes the writing half of this connection.
164     ///
165     /// This method will close the writing portion of this connection, causing
166     /// all future writes to immediately return with an error.
167     ///
168     /// Note that this method affects all cloned handles associated with this
169     /// stream, not just this one handle.
170     pub fn close_write(&mut self) -> IoResult<()> {
171         self.inner.close_write()
172     }
173
174     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
175     ///
176     /// This function will set a timeout for all blocking operations (including
177     /// reads and writes) on this stream. The timeout specified is a relative
178     /// time, in milliseconds, into the future after which point operations will
179     /// time out. This means that the timeout must be reset periodically to keep
180     /// it from expiring. Specifying a value of `None` will clear the timeout
181     /// for this stream.
182     ///
183     /// The timeout on this stream is local to this stream only. Setting a
184     /// timeout does not affect any other cloned instances of this stream, nor
185     /// does the timeout propagated to cloned handles of this stream. Setting
186     /// this timeout will override any specific read or write timeouts
187     /// previously set for this stream.
188     ///
189     /// For clarification on the semantics of interrupting a read and a write,
190     /// take a look at `set_read_timeout` and `set_write_timeout`.
191     #[unstable(feature = "io",
192                reason = "the timeout argument may change in type and value")]
193     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
194         self.inner.set_timeout(timeout_ms)
195     }
196
197     /// Sets the timeout for read operations on this stream.
198     ///
199     /// See documentation in `set_timeout` for the semantics of this read time.
200     /// This will overwrite any previous read timeout set through either this
201     /// function or `set_timeout`.
202     ///
203     /// # Errors
204     ///
205     /// When this timeout expires, if there is no pending read operation, no
206     /// action is taken. Otherwise, the read operation will be scheduled to
207     /// promptly return. If a timeout error is returned, then no data was read
208     /// during the timeout period.
209     #[unstable(feature = "io",
210                reason = "the timeout argument may change in type and value")]
211     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
212         self.inner.set_read_timeout(timeout_ms)
213     }
214
215     /// Sets the timeout for write operations on this stream.
216     ///
217     /// See documentation in `set_timeout` for the semantics of this write time.
218     /// This will overwrite any previous write timeout set through either this
219     /// function or `set_timeout`.
220     ///
221     /// # Errors
222     ///
223     /// When this timeout expires, if there is no pending write operation, no
224     /// action is taken. Otherwise, the pending write operation will be
225     /// scheduled to promptly return. The actual state of the underlying stream
226     /// is not specified.
227     ///
228     /// The write operation may return an error of type `ShortWrite` which
229     /// indicates that the object is known to have written an exact number of
230     /// bytes successfully during the timeout period, and the remaining bytes
231     /// were never written.
232     ///
233     /// If the write operation returns `TimedOut`, then it the timeout primitive
234     /// does not know how many bytes were written as part of the timeout
235     /// operation. It may be the case that bytes continue to be written in an
236     /// asynchronous fashion after the call to write returns.
237     #[unstable(feature = "io",
238                reason = "the timeout argument may change in type and value")]
239     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
240         self.inner.set_write_timeout(timeout_ms)
241     }
242 }
243
244 impl Clone for TcpStream {
245     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
246     /// and writes of this connection.
247     ///
248     /// The underlying TCP stream will not be closed until all handles to the
249     /// stream have been deallocated. All handles will also follow the same
250     /// stream, but two concurrent reads will not receive the same data.
251     /// Instead, the first read will receive the first packet received, and the
252     /// second read will receive the second packet.
253     fn clone(&self) -> TcpStream {
254         TcpStream { inner: self.inner.clone() }
255     }
256 }
257
258 impl Reader for TcpStream {
259     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
260         self.inner.read(buf)
261     }
262 }
263
264 impl Writer for TcpStream {
265     fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
266         self.inner.write(buf)
267     }
268 }
269
270 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
271     fn as_inner(&self) -> &TcpStreamImp {
272         &self.inner
273     }
274 }
275
276 /// A structure representing a socket server. This listener is used to create a
277 /// `TcpAcceptor` which can be used to accept sockets on a local port.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// # fn foo() {
283 /// use std::old_io::{TcpListener, TcpStream};
284 /// use std::old_io::{Acceptor, Listener};
285 /// use std::thread::Thread;
286 ///
287 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
288 ///
289 /// // bind the listener to the specified address
290 /// let mut acceptor = listener.listen().unwrap();
291 ///
292 /// fn handle_client(mut stream: TcpStream) {
293 ///     // ...
294 /// # &mut stream; // silence unused mutability/variable warning
295 /// }
296 /// // accept connections and process them, spawning a new tasks for each one
297 /// for stream in acceptor.incoming() {
298 ///     match stream {
299 ///         Err(e) => { /* connection failed */ }
300 ///         Ok(stream) => {
301 ///             Thread::spawn(move|| {
302 ///                 // connection succeeded
303 ///                 handle_client(stream)
304 ///             });
305 ///         }
306 ///     }
307 /// }
308 ///
309 /// // close the socket server
310 /// drop(acceptor);
311 /// # }
312 /// ```
313 pub struct TcpListener {
314     inner: TcpListenerImp,
315 }
316
317 impl TcpListener {
318     /// Creates a new `TcpListener` which will be bound to the specified address.
319     /// This listener is not ready for accepting connections, `listen` must be called
320     /// on it before that's possible.
321     ///
322     /// Binding with a port number of 0 will request that the OS assigns a port
323     /// to this listener. The port allocated can be queried via the
324     /// `socket_name` function.
325     ///
326     /// The address type can be any implementer of `ToSocketAddr` trait. See its
327     /// documentation for concrete examples.
328     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
329         super::with_addresses(addr, |addr| {
330             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
331         })
332     }
333
334     /// Returns the local socket address of this listener.
335     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
336         self.inner.socket_name()
337     }
338 }
339
340 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
341     fn listen(self) -> IoResult<TcpAcceptor> {
342         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
343     }
344 }
345
346 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
347     fn as_inner(&self) -> &TcpListenerImp {
348         &self.inner
349     }
350 }
351
352 /// The accepting half of a TCP socket server. This structure is created through
353 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
354 /// `TcpStream` instances.
355 pub struct TcpAcceptor {
356     inner: TcpAcceptorImp,
357 }
358
359 impl TcpAcceptor {
360     /// Prevents blocking on all future accepts after `ms` milliseconds have
361     /// elapsed.
362     ///
363     /// This function is used to set a deadline after which this acceptor will
364     /// time out accepting any connections. The argument is the relative
365     /// distance, in milliseconds, to a point in the future after which all
366     /// accepts will fail.
367     ///
368     /// If the argument specified is `None`, then any previously registered
369     /// timeout is cleared.
370     ///
371     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
372     /// any pending connections. All pending connections will be accepted,
373     /// regardless of whether the timeout has expired or not (the accept will
374     /// not block in this case).
375     ///
376     /// # Example
377     ///
378     /// ```no_run
379     /// use std::old_io::TcpListener;
380     /// use std::old_io::{Listener, Acceptor, TimedOut};
381     ///
382     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
383     ///
384     /// // After 100ms have passed, all accepts will fail
385     /// a.set_timeout(Some(100));
386     ///
387     /// match a.accept() {
388     ///     Ok(..) => println!("accepted a socket"),
389     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
390     ///     Err(e) => println!("err: {}", e),
391     /// }
392     ///
393     /// // Reset the timeout and try again
394     /// a.set_timeout(Some(100));
395     /// let socket = a.accept();
396     ///
397     /// // Clear the timeout and block indefinitely waiting for a connection
398     /// a.set_timeout(None);
399     /// let socket = a.accept();
400     /// ```
401     #[unstable(feature = "io",
402                reason = "the type of the argument and name of this function are \
403                          subject to change")]
404     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
405
406     /// Closes the accepting capabilities of this acceptor.
407     ///
408     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
409     /// in that it will affect *all* cloned handles of this acceptor's original
410     /// handle.
411     ///
412     /// Once this function succeeds, all future calls to `accept` will return
413     /// immediately with an error, preventing all future calls to accept. The
414     /// underlying socket will not be relinquished back to the OS until all
415     /// acceptors have been deallocated.
416     ///
417     /// This is useful for waking up a thread in an accept loop to indicate that
418     /// it should exit.
419     ///
420     /// # Example
421     ///
422     /// ```
423     /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
424     /// use std::thread::Thread;
425     ///
426     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
427     /// let a2 = a.clone();
428     ///
429     /// let _t = Thread::spawn(move|| {
430     ///     let mut a2 = a2;
431     ///     for socket in a2.incoming() {
432     ///         match socket {
433     ///             Ok(s) => { /* handle s */ }
434     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
435     ///             Err(e) => panic!("unexpected error: {}", e),
436     ///         }
437     ///     }
438     /// });
439     ///
440     /// # fn wait_for_sigint() {}
441     /// // Now that our accept loop is running, wait for the program to be
442     /// // requested to exit.
443     /// wait_for_sigint();
444     ///
445     /// // Signal our accept loop to exit
446     /// assert!(a.close_accept().is_ok());
447     /// ```
448     #[unstable(feature = "io")]
449     pub fn close_accept(&mut self) -> IoResult<()> {
450         self.inner.close_accept()
451     }
452 }
453
454 impl Acceptor<TcpStream> for TcpAcceptor {
455     fn accept(&mut self) -> IoResult<TcpStream> {
456         self.inner.accept().map(TcpStream::new)
457     }
458 }
459
460 impl Clone for TcpAcceptor {
461     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
462     /// accepts.
463     ///
464     /// The underlying TCP acceptor will not be closed until all handles to the
465     /// acceptor have been deallocated. Incoming connections will be received on
466     /// at most once acceptor, the same connection will not be accepted twice.
467     ///
468     /// The `close_accept` method will shut down *all* acceptors cloned from the
469     /// same original acceptor, whereas the `set_timeout` method only affects
470     /// the selector that it is called on.
471     ///
472     /// This function is useful for creating a handle to invoke `close_accept`
473     /// on to wake up any other task blocked in `accept`.
474     fn clone(&self) -> TcpAcceptor {
475         TcpAcceptor { inner: self.inner.clone() }
476     }
477 }
478
479 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
480     fn as_inner(&self) -> &TcpAcceptorImp {
481         &self.inner
482     }
483 }
484
485 #[cfg(test)]
486 mod test {
487     use prelude::v1::*;
488
489     use sync::mpsc::channel;
490     use thread::Thread;
491     use old_io::net::tcp::*;
492     use old_io::net::ip::*;
493     use old_io::test::*;
494     use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
495     use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
496     use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
497     use old_io::{InvalidInput};
498     use old_io::{Acceptor, Listener};
499
500     // FIXME #11530 this fails on android because tests are run as root
501     #[cfg_attr(any(windows, target_os = "android"), ignore)]
502     #[test]
503     fn bind_error() {
504         match TcpListener::bind("0.0.0.0:1") {
505             Ok(..) => panic!(),
506             Err(e) => assert_eq!(e.kind, PermissionDenied),
507         }
508     }
509
510     #[test]
511     fn connect_error() {
512         match TcpStream::connect("0.0.0.0:1") {
513             Ok(..) => panic!(),
514             Err(e) => assert!((e.kind == ConnectionRefused)
515                               || (e.kind == InvalidInput)),
516         }
517     }
518
519     #[test]
520     fn listen_ip4_localhost() {
521         let socket_addr = next_test_ip4();
522         let listener = TcpListener::bind(socket_addr);
523         let mut acceptor = listener.listen();
524
525         let _t = Thread::spawn(move|| {
526             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
527             stream.write(&[144]).unwrap();
528         });
529
530         let mut stream = acceptor.accept();
531         let mut buf = [0];
532         stream.read(&mut buf).unwrap();
533         assert!(buf[0] == 144);
534     }
535
536     #[test]
537     fn connect_localhost() {
538         let addr = next_test_ip4();
539         let mut acceptor = TcpListener::bind(addr).listen();
540
541         let _t = Thread::spawn(move|| {
542             let mut stream = TcpStream::connect(("localhost", addr.port));
543             stream.write(&[64]).unwrap();
544         });
545
546         let mut stream = acceptor.accept();
547         let mut buf = [0];
548         stream.read(&mut buf).unwrap();
549         assert!(buf[0] == 64);
550     }
551
552     #[test]
553     fn connect_ip4_loopback() {
554         let addr = next_test_ip4();
555         let mut acceptor = TcpListener::bind(addr).listen();
556
557         let _t = Thread::spawn(move|| {
558             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
559             stream.write(&[44]).unwrap();
560         });
561
562         let mut stream = acceptor.accept();
563         let mut buf = [0];
564         stream.read(&mut buf).unwrap();
565         assert!(buf[0] == 44);
566     }
567
568     #[test]
569     fn connect_ip6_loopback() {
570         let addr = next_test_ip6();
571         let mut acceptor = TcpListener::bind(addr).listen();
572
573         let _t = Thread::spawn(move|| {
574             let mut stream = TcpStream::connect(("::1", addr.port));
575             stream.write(&[66]).unwrap();
576         });
577
578         let mut stream = acceptor.accept();
579         let mut buf = [0];
580         stream.read(&mut buf).unwrap();
581         assert!(buf[0] == 66);
582     }
583
584     #[test]
585     fn smoke_test_ip4() {
586         let addr = next_test_ip4();
587         let mut acceptor = TcpListener::bind(addr).listen();
588
589         let _t = Thread::spawn(move|| {
590             let mut stream = TcpStream::connect(addr);
591             stream.write(&[99]).unwrap();
592         });
593
594         let mut stream = acceptor.accept();
595         let mut buf = [0];
596         stream.read(&mut buf).unwrap();
597         assert!(buf[0] == 99);
598     }
599
600     #[test]
601     fn smoke_test_ip6() {
602         let addr = next_test_ip6();
603         let mut acceptor = TcpListener::bind(addr).listen();
604
605         let _t = Thread::spawn(move|| {
606             let mut stream = TcpStream::connect(addr);
607             stream.write(&[99]).unwrap();
608         });
609
610         let mut stream = acceptor.accept();
611         let mut buf = [0];
612         stream.read(&mut buf).unwrap();
613         assert!(buf[0] == 99);
614     }
615
616     #[test]
617     fn read_eof_ip4() {
618         let addr = next_test_ip4();
619         let mut acceptor = TcpListener::bind(addr).listen();
620
621         let _t = Thread::spawn(move|| {
622             let _stream = TcpStream::connect(addr);
623             // Close
624         });
625
626         let mut stream = acceptor.accept();
627         let mut buf = [0];
628         let nread = stream.read(&mut buf);
629         assert!(nread.is_err());
630     }
631
632     #[test]
633     fn read_eof_ip6() {
634         let addr = next_test_ip6();
635         let mut acceptor = TcpListener::bind(addr).listen();
636
637         let _t = Thread::spawn(move|| {
638             let _stream = TcpStream::connect(addr);
639             // Close
640         });
641
642         let mut stream = acceptor.accept();
643         let mut buf = [0];
644         let nread = stream.read(&mut buf);
645         assert!(nread.is_err());
646     }
647
648     #[test]
649     fn read_eof_twice_ip4() {
650         let addr = next_test_ip4();
651         let mut acceptor = TcpListener::bind(addr).listen();
652
653         let _t = Thread::spawn(move|| {
654             let _stream = TcpStream::connect(addr);
655             // Close
656         });
657
658         let mut stream = acceptor.accept();
659         let mut buf = [0];
660         let nread = stream.read(&mut buf);
661         assert!(nread.is_err());
662
663         match stream.read(&mut buf) {
664             Ok(..) => panic!(),
665             Err(ref e) => {
666                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
667                         "unknown kind: {:?}", e.kind);
668             }
669         }
670     }
671
672     #[test]
673     fn read_eof_twice_ip6() {
674         let addr = next_test_ip6();
675         let mut acceptor = TcpListener::bind(addr).listen();
676
677         let _t = Thread::spawn(move|| {
678             let _stream = TcpStream::connect(addr);
679             // Close
680         });
681
682         let mut stream = acceptor.accept();
683         let mut buf = [0];
684         let nread = stream.read(&mut buf);
685         assert!(nread.is_err());
686
687         match stream.read(&mut buf) {
688             Ok(..) => panic!(),
689             Err(ref e) => {
690                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
691                         "unknown kind: {:?}", e.kind);
692             }
693         }
694     }
695
696     #[test]
697     fn write_close_ip4() {
698         let addr = next_test_ip4();
699         let mut acceptor = TcpListener::bind(addr).listen();
700
701         let (tx, rx) = channel();
702         let _t = Thread::spawn(move|| {
703             drop(TcpStream::connect(addr));
704             tx.send(()).unwrap();
705         });
706
707         let mut stream = acceptor.accept();
708         rx.recv().unwrap();
709         let buf = [0];
710         match stream.write(&buf) {
711             Ok(..) => {}
712             Err(e) => {
713                 assert!(e.kind == ConnectionReset ||
714                         e.kind == BrokenPipe ||
715                         e.kind == ConnectionAborted,
716                         "unknown error: {}", e);
717             }
718         }
719     }
720
721     #[test]
722     fn write_close_ip6() {
723         let addr = next_test_ip6();
724         let mut acceptor = TcpListener::bind(addr).listen();
725
726         let (tx, rx) = channel();
727         let _t = Thread::spawn(move|| {
728             drop(TcpStream::connect(addr));
729             tx.send(()).unwrap();
730         });
731
732         let mut stream = acceptor.accept();
733         rx.recv().unwrap();
734         let buf = [0];
735         match stream.write(&buf) {
736             Ok(..) => {}
737             Err(e) => {
738                 assert!(e.kind == ConnectionReset ||
739                         e.kind == BrokenPipe ||
740                         e.kind == ConnectionAborted,
741                         "unknown error: {}", e);
742             }
743         }
744     }
745
746     #[test]
747     fn multiple_connect_serial_ip4() {
748         let addr = next_test_ip4();
749         let max = 10u;
750         let mut acceptor = TcpListener::bind(addr).listen();
751
752         let _t = Thread::spawn(move|| {
753             for _ in 0..max {
754                 let mut stream = TcpStream::connect(addr);
755                 stream.write(&[99]).unwrap();
756             }
757         });
758
759         for ref mut stream in acceptor.incoming().take(max) {
760             let mut buf = [0];
761             stream.read(&mut buf).unwrap();
762             assert_eq!(buf[0], 99);
763         }
764     }
765
766     #[test]
767     fn multiple_connect_serial_ip6() {
768         let addr = next_test_ip6();
769         let max = 10u;
770         let mut acceptor = TcpListener::bind(addr).listen();
771
772         let _t = Thread::spawn(move|| {
773             for _ in 0..max {
774                 let mut stream = TcpStream::connect(addr);
775                 stream.write(&[99]).unwrap();
776             }
777         });
778
779         for ref mut stream in acceptor.incoming().take(max) {
780             let mut buf = [0];
781             stream.read(&mut buf).unwrap();
782             assert_eq!(buf[0], 99);
783         }
784     }
785
786     #[test]
787     fn multiple_connect_interleaved_greedy_schedule_ip4() {
788         let addr = next_test_ip4();
789         static MAX: int = 10;
790         let acceptor = TcpListener::bind(addr).listen();
791
792         let _t = Thread::spawn(move|| {
793             let mut acceptor = acceptor;
794             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
795                 // Start another task to handle the connection
796                 let _t = Thread::spawn(move|| {
797                     let mut stream = stream;
798                     let mut buf = [0];
799                     stream.read(&mut buf).unwrap();
800                     assert!(buf[0] == i as u8);
801                     debug!("read");
802                 });
803             }
804         });
805
806         connect(0, addr);
807
808         fn connect(i: int, addr: SocketAddr) {
809             if i == MAX { return }
810
811             let _t = Thread::spawn(move|| {
812                 debug!("connecting");
813                 let mut stream = TcpStream::connect(addr);
814                 // Connect again before writing
815                 connect(i + 1, addr);
816                 debug!("writing");
817                 stream.write(&[i as u8]).unwrap();
818             });
819         }
820     }
821
822     #[test]
823     fn multiple_connect_interleaved_greedy_schedule_ip6() {
824         let addr = next_test_ip6();
825         static MAX: int = 10;
826         let acceptor = TcpListener::bind(addr).listen();
827
828         let _t = Thread::spawn(move|| {
829             let mut acceptor = acceptor;
830             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
831                 // Start another task to handle the connection
832                 let _t = Thread::spawn(move|| {
833                     let mut stream = stream;
834                     let mut buf = [0];
835                     stream.read(&mut buf).unwrap();
836                     assert!(buf[0] == i as u8);
837                     debug!("read");
838                 });
839             }
840         });
841
842         connect(0, addr);
843
844         fn connect(i: int, addr: SocketAddr) {
845             if i == MAX { return }
846
847             let _t = Thread::spawn(move|| {
848                 debug!("connecting");
849                 let mut stream = TcpStream::connect(addr);
850                 // Connect again before writing
851                 connect(i + 1, addr);
852                 debug!("writing");
853                 stream.write(&[i as u8]).unwrap();
854             });
855         }
856     }
857
858     #[test]
859     fn multiple_connect_interleaved_lazy_schedule_ip4() {
860         static MAX: int = 10;
861         let addr = next_test_ip4();
862         let acceptor = TcpListener::bind(addr).listen();
863
864         let _t = Thread::spawn(move|| {
865             let mut acceptor = acceptor;
866             for stream in acceptor.incoming().take(MAX as uint) {
867                 // Start another task to handle the connection
868                 let _t = Thread::spawn(move|| {
869                     let mut stream = stream;
870                     let mut buf = [0];
871                     stream.read(&mut buf).unwrap();
872                     assert!(buf[0] == 99);
873                     debug!("read");
874                 });
875             }
876         });
877
878         connect(0, addr);
879
880         fn connect(i: int, addr: SocketAddr) {
881             if i == MAX { return }
882
883             let _t = Thread::spawn(move|| {
884                 debug!("connecting");
885                 let mut stream = TcpStream::connect(addr);
886                 // Connect again before writing
887                 connect(i + 1, addr);
888                 debug!("writing");
889                 stream.write(&[99]).unwrap();
890             });
891         }
892     }
893
894     #[test]
895     fn multiple_connect_interleaved_lazy_schedule_ip6() {
896         static MAX: int = 10;
897         let addr = next_test_ip6();
898         let acceptor = TcpListener::bind(addr).listen();
899
900         let _t = Thread::spawn(move|| {
901             let mut acceptor = acceptor;
902             for stream in acceptor.incoming().take(MAX as uint) {
903                 // Start another task to handle the connection
904                 let _t = Thread::spawn(move|| {
905                     let mut stream = stream;
906                     let mut buf = [0];
907                     stream.read(&mut buf).unwrap();
908                     assert!(buf[0] == 99);
909                     debug!("read");
910                 });
911             }
912         });
913
914         connect(0, addr);
915
916         fn connect(i: int, addr: SocketAddr) {
917             if i == MAX { return }
918
919             let _t = Thread::spawn(move|| {
920                 debug!("connecting");
921                 let mut stream = TcpStream::connect(addr);
922                 // Connect again before writing
923                 connect(i + 1, addr);
924                 debug!("writing");
925                 stream.write(&[99]).unwrap();
926             });
927         }
928     }
929
930     pub fn socket_name(addr: SocketAddr) {
931         let mut listener = TcpListener::bind(addr).unwrap();
932
933         // Make sure socket_name gives
934         // us the socket we binded to.
935         let so_name = listener.socket_name();
936         assert!(so_name.is_ok());
937         assert_eq!(addr, so_name.unwrap());
938     }
939
940     pub fn peer_name(addr: SocketAddr) {
941         let acceptor = TcpListener::bind(addr).listen();
942         let _t = Thread::spawn(move|| {
943             let mut acceptor = acceptor;
944             acceptor.accept().unwrap();
945         });
946
947         let stream = TcpStream::connect(addr);
948
949         assert!(stream.is_ok());
950         let mut stream = stream.unwrap();
951
952         // Make sure peer_name gives us the
953         // address/port of the peer we've
954         // connected to.
955         let peer_name = stream.peer_name();
956         assert!(peer_name.is_ok());
957         assert_eq!(addr, peer_name.unwrap());
958     }
959
960     #[test]
961     fn socket_and_peer_name_ip4() {
962         peer_name(next_test_ip4());
963         socket_name(next_test_ip4());
964     }
965
966     #[test]
967     fn socket_and_peer_name_ip6() {
968         // FIXME: peer name is not consistent
969         //peer_name(next_test_ip6());
970         socket_name(next_test_ip6());
971     }
972
973     #[test]
974     fn partial_read() {
975         let addr = next_test_ip4();
976         let (tx, rx) = channel();
977         let _t = Thread::spawn(move|| {
978             let mut srv = TcpListener::bind(addr).listen().unwrap();
979             tx.send(()).unwrap();
980             let mut cl = srv.accept().unwrap();
981             cl.write(&[10]).unwrap();
982             let mut b = [0];
983             cl.read(&mut b).unwrap();
984             tx.send(()).unwrap();
985         });
986
987         rx.recv().unwrap();
988         let mut c = TcpStream::connect(addr).unwrap();
989         let mut b = [0; 10];
990         assert_eq!(c.read(&mut b), Ok(1));
991         c.write(&[1]).unwrap();
992         rx.recv().unwrap();
993     }
994
995     #[test]
996     fn double_bind() {
997         let addr = next_test_ip4();
998         let listener = TcpListener::bind(addr).unwrap().listen();
999         assert!(listener.is_ok());
1000         match TcpListener::bind(addr).listen() {
1001             Ok(..) => panic!(),
1002             Err(e) => {
1003                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1004                         "unknown error: {} {:?}", e, e.kind);
1005             }
1006         }
1007     }
1008
1009     #[test]
1010     fn fast_rebind() {
1011         let addr = next_test_ip4();
1012         let (tx, rx) = channel();
1013
1014         let _t = Thread::spawn(move|| {
1015             rx.recv().unwrap();
1016             let _stream = TcpStream::connect(addr).unwrap();
1017             // Close
1018             rx.recv().unwrap();
1019         });
1020
1021         {
1022             let mut acceptor = TcpListener::bind(addr).listen();
1023             tx.send(()).unwrap();
1024             {
1025                 let _stream = acceptor.accept().unwrap();
1026                 // Close client
1027                 tx.send(()).unwrap();
1028             }
1029             // Close listener
1030         }
1031         let _listener = TcpListener::bind(addr);
1032     }
1033
1034     #[test]
1035     fn tcp_clone_smoke() {
1036         let addr = next_test_ip4();
1037         let mut acceptor = TcpListener::bind(addr).listen();
1038
1039         let _t = Thread::spawn(move|| {
1040             let mut s = TcpStream::connect(addr);
1041             let mut buf = [0, 0];
1042             assert_eq!(s.read(&mut buf), Ok(1));
1043             assert_eq!(buf[0], 1);
1044             s.write(&[2]).unwrap();
1045         });
1046
1047         let mut s1 = acceptor.accept().unwrap();
1048         let s2 = s1.clone();
1049
1050         let (tx1, rx1) = channel();
1051         let (tx2, rx2) = channel();
1052         let _t = Thread::spawn(move|| {
1053             let mut s2 = s2;
1054             rx1.recv().unwrap();
1055             s2.write(&[1]).unwrap();
1056             tx2.send(()).unwrap();
1057         });
1058         tx1.send(()).unwrap();
1059         let mut buf = [0, 0];
1060         assert_eq!(s1.read(&mut buf), Ok(1));
1061         rx2.recv().unwrap();
1062     }
1063
1064     #[test]
1065     fn tcp_clone_two_read() {
1066         let addr = next_test_ip6();
1067         let mut acceptor = TcpListener::bind(addr).listen();
1068         let (tx1, rx) = channel();
1069         let tx2 = tx1.clone();
1070
1071         let _t = Thread::spawn(move|| {
1072             let mut s = TcpStream::connect(addr);
1073             s.write(&[1]).unwrap();
1074             rx.recv().unwrap();
1075             s.write(&[2]).unwrap();
1076             rx.recv().unwrap();
1077         });
1078
1079         let mut s1 = acceptor.accept().unwrap();
1080         let s2 = s1.clone();
1081
1082         let (done, rx) = channel();
1083         let _t = Thread::spawn(move|| {
1084             let mut s2 = s2;
1085             let mut buf = [0, 0];
1086             s2.read(&mut buf).unwrap();
1087             tx2.send(()).unwrap();
1088             done.send(()).unwrap();
1089         });
1090         let mut buf = [0, 0];
1091         s1.read(&mut buf).unwrap();
1092         tx1.send(()).unwrap();
1093
1094         rx.recv().unwrap();
1095     }
1096
1097     #[test]
1098     fn tcp_clone_two_write() {
1099         let addr = next_test_ip4();
1100         let mut acceptor = TcpListener::bind(addr).listen();
1101
1102         let _t = Thread::spawn(move|| {
1103             let mut s = TcpStream::connect(addr);
1104             let mut buf = [0, 1];
1105             s.read(&mut buf).unwrap();
1106             s.read(&mut buf).unwrap();
1107         });
1108
1109         let mut s1 = acceptor.accept().unwrap();
1110         let s2 = s1.clone();
1111
1112         let (done, rx) = channel();
1113         let _t = Thread::spawn(move|| {
1114             let mut s2 = s2;
1115             s2.write(&[1]).unwrap();
1116             done.send(()).unwrap();
1117         });
1118         s1.write(&[2]).unwrap();
1119
1120         rx.recv().unwrap();
1121     }
1122
1123     #[test]
1124     fn shutdown_smoke() {
1125         let addr = next_test_ip4();
1126         let a = TcpListener::bind(addr).unwrap().listen();
1127         let _t = Thread::spawn(move|| {
1128             let mut a = a;
1129             let mut c = a.accept().unwrap();
1130             assert_eq!(c.read_to_end(), Ok(vec!()));
1131             c.write(&[1]).unwrap();
1132         });
1133
1134         let mut s = TcpStream::connect(addr).unwrap();
1135         assert!(s.inner.close_write().is_ok());
1136         assert!(s.write(&[1]).is_err());
1137         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1138     }
1139
1140     #[test]
1141     fn accept_timeout() {
1142         let addr = next_test_ip4();
1143         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1144
1145         a.set_timeout(Some(10));
1146
1147         // Make sure we time out once and future invocations also time out
1148         let err = a.accept().err().unwrap();
1149         assert_eq!(err.kind, TimedOut);
1150         let err = a.accept().err().unwrap();
1151         assert_eq!(err.kind, TimedOut);
1152
1153         // Also make sure that even though the timeout is expired that we will
1154         // continue to receive any pending connections.
1155         //
1156         // FIXME: freebsd apparently never sees the pending connection, but
1157         //        testing manually always works. Need to investigate this
1158         //        flakiness.
1159         if !cfg!(target_os = "freebsd") {
1160             let (tx, rx) = channel();
1161             let _t = Thread::spawn(move|| {
1162                 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1163             });
1164             let _l = rx.recv().unwrap();
1165             for i in 0i32..1001 {
1166                 match a.accept() {
1167                     Ok(..) => break,
1168                     Err(ref e) if e.kind == TimedOut => {}
1169                     Err(e) => panic!("error: {}", e),
1170                 }
1171                 ::thread::Thread::yield_now();
1172                 if i == 1000 { panic!("should have a pending connection") }
1173             }
1174         }
1175
1176         // Unset the timeout and make sure that this always blocks.
1177         a.set_timeout(None);
1178         let _t = Thread::spawn(move|| {
1179             drop(TcpStream::connect(addr).unwrap());
1180         });
1181         a.accept().unwrap();
1182     }
1183
1184     #[test]
1185     fn close_readwrite_smoke() {
1186         let addr = next_test_ip4();
1187         let a = TcpListener::bind(addr).listen().unwrap();
1188         let (_tx, rx) = channel::<()>();
1189         Thread::spawn(move|| {
1190             let mut a = a;
1191             let _s = a.accept().unwrap();
1192             let _ = rx.recv().unwrap();
1193         });
1194
1195         let mut b = [0];
1196         let mut s = TcpStream::connect(addr).unwrap();
1197         let mut s2 = s.clone();
1198
1199         // closing should prevent reads/writes
1200         s.close_write().unwrap();
1201         assert!(s.write(&[0]).is_err());
1202         s.close_read().unwrap();
1203         assert!(s.read(&mut b).is_err());
1204
1205         // closing should affect previous handles
1206         assert!(s2.write(&[0]).is_err());
1207         assert!(s2.read(&mut b).is_err());
1208
1209         // closing should affect new handles
1210         let mut s3 = s.clone();
1211         assert!(s3.write(&[0]).is_err());
1212         assert!(s3.read(&mut b).is_err());
1213
1214         // make sure these don't die
1215         let _ = s2.close_read();
1216         let _ = s2.close_write();
1217         let _ = s3.close_read();
1218         let _ = s3.close_write();
1219     }
1220
1221     #[test]
1222     fn close_read_wakes_up() {
1223         let addr = next_test_ip4();
1224         let a = TcpListener::bind(addr).listen().unwrap();
1225         let (_tx, rx) = channel::<()>();
1226         Thread::spawn(move|| {
1227             let mut a = a;
1228             let _s = a.accept().unwrap();
1229             let _ = rx.recv().unwrap();
1230         });
1231
1232         let mut s = TcpStream::connect(addr).unwrap();
1233         let s2 = s.clone();
1234         let (tx, rx) = channel();
1235         let _t = Thread::spawn(move|| {
1236             let mut s2 = s2;
1237             assert!(s2.read(&mut [0]).is_err());
1238             tx.send(()).unwrap();
1239         });
1240         // this should wake up the child task
1241         s.close_read().unwrap();
1242
1243         // this test will never finish if the child doesn't wake up
1244         rx.recv().unwrap();
1245     }
1246
1247     #[test]
1248     fn readwrite_timeouts() {
1249         let addr = next_test_ip6();
1250         let mut a = TcpListener::bind(addr).listen().unwrap();
1251         let (tx, rx) = channel::<()>();
1252         Thread::spawn(move|| {
1253             let mut s = TcpStream::connect(addr).unwrap();
1254             rx.recv().unwrap();
1255             assert!(s.write(&[0]).is_ok());
1256             let _ = rx.recv();
1257         });
1258
1259         let mut s = a.accept().unwrap();
1260         s.set_timeout(Some(20));
1261         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1262         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1263
1264         s.set_timeout(Some(20));
1265         for i in 0i32..1001 {
1266             match s.write(&[0; 128 * 1024]) {
1267                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1268                 Err(IoError { kind: TimedOut, .. }) => break,
1269                 Err(e) => panic!("{}", e),
1270            }
1271            if i == 1000 { panic!("should have filled up?!"); }
1272         }
1273         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1274
1275         tx.send(()).unwrap();
1276         s.set_timeout(None);
1277         assert_eq!(s.read(&mut [0, 0]), Ok(1));
1278     }
1279
1280     #[test]
1281     fn read_timeouts() {
1282         let addr = next_test_ip6();
1283         let mut a = TcpListener::bind(addr).listen().unwrap();
1284         let (tx, rx) = channel::<()>();
1285         Thread::spawn(move|| {
1286             let mut s = TcpStream::connect(addr).unwrap();
1287             rx.recv().unwrap();
1288             let mut amt = 0;
1289             while amt < 100 * 128 * 1024 {
1290                 match s.read(&mut [0;128 * 1024]) {
1291                     Ok(n) => { amt += n; }
1292                     Err(e) => panic!("{}", e),
1293                 }
1294             }
1295             let _ = rx.recv();
1296         });
1297
1298         let mut s = a.accept().unwrap();
1299         s.set_read_timeout(Some(20));
1300         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1301         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1302
1303         tx.send(()).unwrap();
1304         for _ in 0..100 {
1305             assert!(s.write(&[0;128 * 1024]).is_ok());
1306         }
1307     }
1308
1309     #[test]
1310     fn write_timeouts() {
1311         let addr = next_test_ip6();
1312         let mut a = TcpListener::bind(addr).listen().unwrap();
1313         let (tx, rx) = channel::<()>();
1314         Thread::spawn(move|| {
1315             let mut s = TcpStream::connect(addr).unwrap();
1316             rx.recv().unwrap();
1317             assert!(s.write(&[0]).is_ok());
1318             let _ = rx.recv();
1319         });
1320
1321         let mut s = a.accept().unwrap();
1322         s.set_write_timeout(Some(20));
1323         for i in 0i32..1001 {
1324             match s.write(&[0; 128 * 1024]) {
1325                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1326                 Err(IoError { kind: TimedOut, .. }) => break,
1327                 Err(e) => panic!("{}", e),
1328            }
1329            if i == 1000 { panic!("should have filled up?!"); }
1330         }
1331         assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1332
1333         tx.send(()).unwrap();
1334         assert!(s.read(&mut [0]).is_ok());
1335     }
1336
1337     #[test]
1338     fn timeout_concurrent_read() {
1339         let addr = next_test_ip6();
1340         let mut a = TcpListener::bind(addr).listen().unwrap();
1341         let (tx, rx) = channel::<()>();
1342         Thread::spawn(move|| {
1343             let mut s = TcpStream::connect(addr).unwrap();
1344             rx.recv().unwrap();
1345             assert_eq!(s.write(&[0]), Ok(()));
1346             let _ = rx.recv();
1347         });
1348
1349         let mut s = a.accept().unwrap();
1350         let s2 = s.clone();
1351         let (tx2, rx2) = channel();
1352         let _t = Thread::spawn(move|| {
1353             let mut s2 = s2;
1354             assert_eq!(s2.read(&mut [0]), Ok(1));
1355             tx2.send(()).unwrap();
1356         });
1357
1358         s.set_read_timeout(Some(20));
1359         assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1360         tx.send(()).unwrap();
1361
1362         rx2.recv().unwrap();
1363     }
1364
1365     #[test]
1366     fn clone_while_reading() {
1367         let addr = next_test_ip6();
1368         let listen = TcpListener::bind(addr);
1369         let mut accept = listen.listen().unwrap();
1370
1371         // Enqueue a task to write to a socket
1372         let (tx, rx) = channel();
1373         let (txdone, rxdone) = channel();
1374         let txdone2 = txdone.clone();
1375         let _t = Thread::spawn(move|| {
1376             let mut tcp = TcpStream::connect(addr).unwrap();
1377             rx.recv().unwrap();
1378             tcp.write_u8(0).unwrap();
1379             txdone2.send(()).unwrap();
1380         });
1381
1382         // Spawn off a reading clone
1383         let tcp = accept.accept().unwrap();
1384         let tcp2 = tcp.clone();
1385         let txdone3 = txdone.clone();
1386         let _t = Thread::spawn(move|| {
1387             let mut tcp2 = tcp2;
1388             tcp2.read_u8().unwrap();
1389             txdone3.send(()).unwrap();
1390         });
1391
1392         // Try to ensure that the reading clone is indeed reading
1393         for _ in 0..50 {
1394             ::thread::Thread::yield_now();
1395         }
1396
1397         // clone the handle again while it's reading, then let it finish the
1398         // read.
1399         let _ = tcp.clone();
1400         tx.send(()).unwrap();
1401         rxdone.recv().unwrap();
1402         rxdone.recv().unwrap();
1403     }
1404
1405     #[test]
1406     fn clone_accept_smoke() {
1407         let addr = next_test_ip4();
1408         let l = TcpListener::bind(addr);
1409         let mut a = l.listen().unwrap();
1410         let mut a2 = a.clone();
1411
1412         let _t = Thread::spawn(move|| {
1413             let _ = TcpStream::connect(addr);
1414         });
1415         let _t = Thread::spawn(move|| {
1416             let _ = TcpStream::connect(addr);
1417         });
1418
1419         assert!(a.accept().is_ok());
1420         assert!(a2.accept().is_ok());
1421     }
1422
1423     #[test]
1424     fn clone_accept_concurrent() {
1425         let addr = next_test_ip4();
1426         let l = TcpListener::bind(addr);
1427         let a = l.listen().unwrap();
1428         let a2 = a.clone();
1429
1430         let (tx, rx) = channel();
1431         let tx2 = tx.clone();
1432
1433         let _t = Thread::spawn(move|| {
1434             let mut a = a;
1435             tx.send(a.accept()).unwrap();
1436         });
1437         let _t = Thread::spawn(move|| {
1438             let mut a = a2;
1439             tx2.send(a.accept()).unwrap();
1440         });
1441
1442         let _t = Thread::spawn(move|| {
1443             let _ = TcpStream::connect(addr);
1444         });
1445         let _t = Thread::spawn(move|| {
1446             let _ = TcpStream::connect(addr);
1447         });
1448
1449         assert!(rx.recv().unwrap().is_ok());
1450         assert!(rx.recv().unwrap().is_ok());
1451     }
1452
1453     #[test]
1454     fn close_accept_smoke() {
1455         let addr = next_test_ip4();
1456         let l = TcpListener::bind(addr);
1457         let mut a = l.listen().unwrap();
1458
1459         a.close_accept().unwrap();
1460         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1461     }
1462
1463     #[test]
1464     fn close_accept_concurrent() {
1465         let addr = next_test_ip4();
1466         let l = TcpListener::bind(addr);
1467         let a = l.listen().unwrap();
1468         let mut a2 = a.clone();
1469
1470         let (tx, rx) = channel();
1471         let _t = Thread::spawn(move|| {
1472             let mut a = a;
1473             tx.send(a.accept()).unwrap();
1474         });
1475         a2.close_accept().unwrap();
1476
1477         assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
1478     }
1479 }