]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
std: Fix a flaky test on OSX 10.10
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use iter::Iterator;
23 use result::Err;
24 use io::net::ip::{SocketAddr, ToSocketAddr};
25 use io::{Reader, Writer, Listener, Acceptor};
26 use io::{standard_error, TimedOut};
27 use option::{None, Some, Option};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 /// A structure which represents a TCP stream between a local socket and a
35 /// remote socket.
36 ///
37 /// # Example
38 ///
39 /// ```no_run
40 /// # #![allow(unused_must_use)]
41 /// use std::io::TcpStream;
42 ///
43 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
44 ///
45 /// stream.write([1]);
46 /// let mut buf = [0];
47 /// stream.read(buf);
48 /// drop(stream); // close the connection
49 /// ```
50 pub struct TcpStream {
51     inner: TcpStreamImp,
52 }
53
54 impl TcpStream {
55     fn new(s: TcpStreamImp) -> TcpStream {
56         TcpStream { inner: s }
57     }
58
59     /// Open a TCP connection to a remote host.
60     ///
61     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
62     /// trait can be supplied for the address; see this trait documentation for
63     /// concrete examples.
64     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
65         super::with_addresses(addr, |addr| {
66             TcpStreamImp::connect(addr, None).map(TcpStream::new)
67         })
68     }
69
70     /// Creates a TCP connection to a remote socket address, timing out after
71     /// the specified duration.
72     ///
73     /// This is the same as the `connect` method, except that if the timeout
74     /// specified elapses before a connection is made an error will be
75     /// returned. The error's kind will be `TimedOut`.
76     ///
77     /// Same as the `connect` method, `addr` argument type can be anything which
78     /// implements `ToSocketAddr` trait.
79     ///
80     /// If a `timeout` with zero or negative duration is specified then
81     /// the function returns `Err`, with the error kind set to `TimedOut`.
82     #[experimental = "the timeout argument may eventually change types"]
83     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
84                                             timeout: Duration) -> IoResult<TcpStream> {
85         if timeout <= Duration::milliseconds(0) {
86             return Err(standard_error(TimedOut));
87         }
88
89         super::with_addresses(addr, |addr| {
90             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
91                 .map(TcpStream::new)
92         })
93     }
94
95     /// Returns the socket address of the remote peer of this TCP connection.
96     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
97         self.inner.peer_name()
98     }
99
100     /// Returns the socket address of the local half of this TCP connection.
101     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
102         self.inner.socket_name()
103     }
104
105     /// Sets the nodelay flag on this connection to the boolean specified
106     #[experimental]
107     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
108         self.inner.set_nodelay(nodelay)
109     }
110
111     /// Sets the keepalive timeout to the timeout specified.
112     ///
113     /// If the value specified is `None`, then the keepalive flag is cleared on
114     /// this connection. Otherwise, the keepalive timeout will be set to the
115     /// specified time, in seconds.
116     #[experimental]
117     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
118         self.inner.set_keepalive(delay_in_seconds)
119     }
120
121     /// Closes the reading half of this connection.
122     ///
123     /// This method will close the reading portion of this connection, causing
124     /// all pending and future reads to immediately return with an error.
125     ///
126     /// # Example
127     ///
128     /// ```no_run
129     /// # #![allow(unused_must_use)]
130     /// use std::io::timer;
131     /// use std::io::TcpStream;
132     /// use std::time::Duration;
133     ///
134     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
135     /// let stream2 = stream.clone();
136     ///
137     /// spawn(proc() {
138     ///     // close this stream after one second
139     ///     timer::sleep(Duration::seconds(1));
140     ///     let mut stream = stream2;
141     ///     stream.close_read();
142     /// });
143     ///
144     /// // wait for some data, will get canceled after one second
145     /// let mut buf = [0];
146     /// stream.read(buf);
147     /// ```
148     ///
149     /// Note that this method affects all cloned handles associated with this
150     /// stream, not just this one handle.
151     pub fn close_read(&mut self) -> IoResult<()> {
152         self.inner.close_read()
153     }
154
155     /// Closes the writing half of this connection.
156     ///
157     /// This method will close the writing portion of this connection, causing
158     /// all future writes to immediately return with an error.
159     ///
160     /// Note that this method affects all cloned handles associated with this
161     /// stream, not just this one handle.
162     pub fn close_write(&mut self) -> IoResult<()> {
163         self.inner.close_write()
164     }
165
166     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
167     ///
168     /// This function will set a timeout for all blocking operations (including
169     /// reads and writes) on this stream. The timeout specified is a relative
170     /// time, in milliseconds, into the future after which point operations will
171     /// time out. This means that the timeout must be reset periodically to keep
172     /// it from expiring. Specifying a value of `None` will clear the timeout
173     /// for this stream.
174     ///
175     /// The timeout on this stream is local to this stream only. Setting a
176     /// timeout does not affect any other cloned instances of this stream, nor
177     /// does the timeout propagated to cloned handles of this stream. Setting
178     /// this timeout will override any specific read or write timeouts
179     /// previously set for this stream.
180     ///
181     /// For clarification on the semantics of interrupting a read and a write,
182     /// take a look at `set_read_timeout` and `set_write_timeout`.
183     #[experimental = "the timeout argument may change in type and value"]
184     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
185         self.inner.set_timeout(timeout_ms)
186     }
187
188     /// Sets the timeout for read operations on this stream.
189     ///
190     /// See documentation in `set_timeout` for the semantics of this read time.
191     /// This will overwrite any previous read timeout set through either this
192     /// function or `set_timeout`.
193     ///
194     /// # Errors
195     ///
196     /// When this timeout expires, if there is no pending read operation, no
197     /// action is taken. Otherwise, the read operation will be scheduled to
198     /// promptly return. If a timeout error is returned, then no data was read
199     /// during the timeout period.
200     #[experimental = "the timeout argument may change in type and value"]
201     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
202         self.inner.set_read_timeout(timeout_ms)
203     }
204
205     /// Sets the timeout for write operations on this stream.
206     ///
207     /// See documentation in `set_timeout` for the semantics of this write time.
208     /// This will overwrite any previous write timeout set through either this
209     /// function or `set_timeout`.
210     ///
211     /// # Errors
212     ///
213     /// When this timeout expires, if there is no pending write operation, no
214     /// action is taken. Otherwise, the pending write operation will be
215     /// scheduled to promptly return. The actual state of the underlying stream
216     /// is not specified.
217     ///
218     /// The write operation may return an error of type `ShortWrite` which
219     /// indicates that the object is known to have written an exact number of
220     /// bytes successfully during the timeout period, and the remaining bytes
221     /// were never written.
222     ///
223     /// If the write operation returns `TimedOut`, then it the timeout primitive
224     /// does not know how many bytes were written as part of the timeout
225     /// operation. It may be the case that bytes continue to be written in an
226     /// asynchronous fashion after the call to write returns.
227     #[experimental = "the timeout argument may change in type and value"]
228     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
229         self.inner.set_write_timeout(timeout_ms)
230     }
231 }
232
233 impl Clone for TcpStream {
234     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
235     /// and writes of this connection.
236     ///
237     /// The underlying TCP stream will not be closed until all handles to the
238     /// stream have been deallocated. All handles will also follow the same
239     /// stream, but two concurrent reads will not receive the same data.
240     /// Instead, the first read will receive the first packet received, and the
241     /// second read will receive the second packet.
242     fn clone(&self) -> TcpStream {
243         TcpStream { inner: self.inner.clone() }
244     }
245 }
246
247 impl Reader for TcpStream {
248     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
249         self.inner.read(buf)
250     }
251 }
252
253 impl Writer for TcpStream {
254     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
255         self.inner.write(buf)
256     }
257 }
258
259 /// A structure representing a socket server. This listener is used to create a
260 /// `TcpAcceptor` which can be used to accept sockets on a local port.
261 ///
262 /// # Example
263 ///
264 /// ```rust
265 /// # fn main() { }
266 /// # fn foo() {
267 /// # #![allow(dead_code)]
268 /// use std::io::{TcpListener, TcpStream};
269 /// use std::io::{Acceptor, Listener};
270 ///
271 /// let listener = TcpListener::bind("127.0.0.1:80");
272 ///
273 /// // bind the listener to the specified address
274 /// let mut acceptor = listener.listen();
275 ///
276 /// fn handle_client(mut stream: TcpStream) {
277 ///     // ...
278 /// # &mut stream; // silence unused mutability/variable warning
279 /// }
280 /// // accept connections and process them, spawning a new tasks for each one
281 /// for stream in acceptor.incoming() {
282 ///     match stream {
283 ///         Err(e) => { /* connection failed */ }
284 ///         Ok(stream) => spawn(proc() {
285 ///             // connection succeeded
286 ///             handle_client(stream)
287 ///         })
288 ///     }
289 /// }
290 ///
291 /// // close the socket server
292 /// drop(acceptor);
293 /// # }
294 /// ```
295 pub struct TcpListener {
296     inner: TcpListenerImp,
297 }
298
299 impl TcpListener {
300     /// Creates a new `TcpListener` which will be bound to the specified address.
301     /// This listener is not ready for accepting connections, `listen` must be called
302     /// on it before that's possible.
303     ///
304     /// Binding with a port number of 0 will request that the OS assigns a port
305     /// to this listener. The port allocated can be queried via the
306     /// `socket_name` function.
307     ///
308     /// The address type can be any implementor of `ToSocketAddr` trait. See its
309     /// documentation for concrete examples.
310     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
311         super::with_addresses(addr, |addr| {
312             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
313         })
314     }
315
316     /// Returns the local socket address of this listener.
317     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
318         self.inner.socket_name()
319     }
320 }
321
322 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
323     fn listen(self) -> IoResult<TcpAcceptor> {
324         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
325     }
326 }
327
328 /// The accepting half of a TCP socket server. This structure is created through
329 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
330 /// `TcpStream` instances.
331 pub struct TcpAcceptor {
332     inner: TcpAcceptorImp,
333 }
334
335 impl TcpAcceptor {
336     /// Prevents blocking on all future accepts after `ms` milliseconds have
337     /// elapsed.
338     ///
339     /// This function is used to set a deadline after which this acceptor will
340     /// time out accepting any connections. The argument is the relative
341     /// distance, in milliseconds, to a point in the future after which all
342     /// accepts will fail.
343     ///
344     /// If the argument specified is `None`, then any previously registered
345     /// timeout is cleared.
346     ///
347     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
348     /// any pending connections. All pending connections will be accepted,
349     /// regardless of whether the timeout has expired or not (the accept will
350     /// not block in this case).
351     ///
352     /// # Example
353     ///
354     /// ```no_run
355     /// # #![allow(experimental)]
356     /// use std::io::TcpListener;
357     /// use std::io::{Listener, Acceptor, TimedOut};
358     ///
359     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
360     ///
361     /// // After 100ms have passed, all accepts will fail
362     /// a.set_timeout(Some(100));
363     ///
364     /// match a.accept() {
365     ///     Ok(..) => println!("accepted a socket"),
366     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
367     ///     Err(e) => println!("err: {}", e),
368     /// }
369     ///
370     /// // Reset the timeout and try again
371     /// a.set_timeout(Some(100));
372     /// let socket = a.accept();
373     ///
374     /// // Clear the timeout and block indefinitely waiting for a connection
375     /// a.set_timeout(None);
376     /// let socket = a.accept();
377     /// ```
378     #[experimental = "the type of the argument and name of this function are \
379                       subject to change"]
380     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
381
382     /// Closes the accepting capabilities of this acceptor.
383     ///
384     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
385     /// in that it will affect *all* cloned handles of this acceptor's original
386     /// handle.
387     ///
388     /// Once this function succeeds, all future calls to `accept` will return
389     /// immediately with an error, preventing all future calls to accept. The
390     /// underlying socket will not be relinquished back to the OS until all
391     /// acceptors have been deallocated.
392     ///
393     /// This is useful for waking up a thread in an accept loop to indicate that
394     /// it should exit.
395     ///
396     /// # Example
397     ///
398     /// ```
399     /// # #![allow(experimental)]
400     /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
401     ///
402     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
403     /// let a2 = a.clone();
404     ///
405     /// spawn(proc() {
406     ///     let mut a2 = a2;
407     ///     for socket in a2.incoming() {
408     ///         match socket {
409     ///             Ok(s) => { /* handle s */ }
410     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
411     ///             Err(e) => panic!("unexpected error: {}", e),
412     ///         }
413     ///     }
414     /// });
415     ///
416     /// # fn wait_for_sigint() {}
417     /// // Now that our accept loop is running, wait for the program to be
418     /// // requested to exit.
419     /// wait_for_sigint();
420     ///
421     /// // Signal our accept loop to exit
422     /// assert!(a.close_accept().is_ok());
423     /// ```
424     #[experimental]
425     pub fn close_accept(&mut self) -> IoResult<()> {
426         self.inner.close_accept()
427     }
428 }
429
430 impl Acceptor<TcpStream> for TcpAcceptor {
431     fn accept(&mut self) -> IoResult<TcpStream> {
432         self.inner.accept().map(TcpStream::new)
433     }
434 }
435
436 impl Clone for TcpAcceptor {
437     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
438     /// accepts.
439     ///
440     /// The underlying TCP acceptor will not be closed until all handles to the
441     /// acceptor have been deallocated. Incoming connections will be received on
442     /// at most once acceptor, the same connection will not be accepted twice.
443     ///
444     /// The `close_accept` method will shut down *all* acceptors cloned from the
445     /// same original acceptor, whereas the `set_timeout` method only affects
446     /// the selector that it is called on.
447     ///
448     /// This function is useful for creating a handle to invoke `close_accept`
449     /// on to wake up any other task blocked in `accept`.
450     fn clone(&self) -> TcpAcceptor {
451         TcpAcceptor { inner: self.inner.clone() }
452     }
453 }
454
455 #[cfg(test)]
456 #[allow(experimental)]
457 mod test {
458     use io::net::tcp::*;
459     use io::net::ip::*;
460     use io::*;
461     use io::test::*;
462     use prelude::*;
463
464     // FIXME #11530 this fails on android because tests are run as root
465     #[cfg_attr(any(windows, target_os = "android"), ignore)]
466     #[test]
467     fn bind_error() {
468         match TcpListener::bind("0.0.0.0:1") {
469             Ok(..) => panic!(),
470             Err(e) => assert_eq!(e.kind, PermissionDenied),
471         }
472     }
473
474     #[test]
475     fn connect_error() {
476         match TcpStream::connect("0.0.0.0:1") {
477             Ok(..) => panic!(),
478             Err(e) => assert_eq!(e.kind, ConnectionRefused),
479         }
480     }
481
482     #[test]
483     fn listen_ip4_localhost() {
484         let socket_addr = next_test_ip4();
485         let listener = TcpListener::bind(socket_addr);
486         let mut acceptor = listener.listen();
487
488         spawn(proc() {
489             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
490             stream.write([144]).unwrap();
491         });
492
493         let mut stream = acceptor.accept();
494         let mut buf = [0];
495         stream.read(buf).unwrap();
496         assert!(buf[0] == 144);
497     }
498
499     #[test]
500     fn connect_localhost() {
501         let addr = next_test_ip4();
502         let mut acceptor = TcpListener::bind(addr).listen();
503
504         spawn(proc() {
505             let mut stream = TcpStream::connect(("localhost", addr.port));
506             stream.write([64]).unwrap();
507         });
508
509         let mut stream = acceptor.accept();
510         let mut buf = [0];
511         stream.read(buf).unwrap();
512         assert!(buf[0] == 64);
513     }
514
515     #[test]
516     fn connect_ip4_loopback() {
517         let addr = next_test_ip4();
518         let mut acceptor = TcpListener::bind(addr).listen();
519
520         spawn(proc() {
521             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
522             stream.write([44]).unwrap();
523         });
524
525         let mut stream = acceptor.accept();
526         let mut buf = [0];
527         stream.read(buf).unwrap();
528         assert!(buf[0] == 44);
529     }
530
531     #[test]
532     fn connect_ip6_loopback() {
533         let addr = next_test_ip6();
534         let mut acceptor = TcpListener::bind(addr).listen();
535
536         spawn(proc() {
537             let mut stream = TcpStream::connect(("::1", addr.port));
538             stream.write([66]).unwrap();
539         });
540
541         let mut stream = acceptor.accept();
542         let mut buf = [0];
543         stream.read(buf).unwrap();
544         assert!(buf[0] == 66);
545     }
546
547     #[test]
548     fn smoke_test_ip4() {
549         let addr = next_test_ip4();
550         let mut acceptor = TcpListener::bind(addr).listen();
551
552         spawn(proc() {
553             let mut stream = TcpStream::connect(addr);
554             stream.write([99]).unwrap();
555         });
556
557         let mut stream = acceptor.accept();
558         let mut buf = [0];
559         stream.read(buf).unwrap();
560         assert!(buf[0] == 99);
561     }
562
563     #[test]
564     fn smoke_test_ip6() {
565         let addr = next_test_ip6();
566         let mut acceptor = TcpListener::bind(addr).listen();
567
568         spawn(proc() {
569             let mut stream = TcpStream::connect(addr);
570             stream.write([99]).unwrap();
571         });
572
573         let mut stream = acceptor.accept();
574         let mut buf = [0];
575         stream.read(buf).unwrap();
576         assert!(buf[0] == 99);
577     }
578
579     #[test]
580     fn read_eof_ip4() {
581         let addr = next_test_ip4();
582         let mut acceptor = TcpListener::bind(addr).listen();
583
584         spawn(proc() {
585             let _stream = TcpStream::connect(addr);
586             // Close
587         });
588
589         let mut stream = acceptor.accept();
590         let mut buf = [0];
591         let nread = stream.read(buf);
592         assert!(nread.is_err());
593     }
594
595     #[test]
596     fn read_eof_ip6() {
597         let addr = next_test_ip6();
598         let mut acceptor = TcpListener::bind(addr).listen();
599
600         spawn(proc() {
601             let _stream = TcpStream::connect(addr);
602             // Close
603         });
604
605         let mut stream = acceptor.accept();
606         let mut buf = [0];
607         let nread = stream.read(buf);
608         assert!(nread.is_err());
609     }
610
611     #[test]
612     fn read_eof_twice_ip4() {
613         let addr = next_test_ip4();
614         let mut acceptor = TcpListener::bind(addr).listen();
615
616         spawn(proc() {
617             let _stream = TcpStream::connect(addr);
618             // Close
619         });
620
621         let mut stream = acceptor.accept();
622         let mut buf = [0];
623         let nread = stream.read(buf);
624         assert!(nread.is_err());
625
626         match stream.read(buf) {
627             Ok(..) => panic!(),
628             Err(ref e) => {
629                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
630                         "unknown kind: {}", e.kind);
631             }
632         }
633     }
634
635     #[test]
636     fn read_eof_twice_ip6() {
637         let addr = next_test_ip6();
638         let mut acceptor = TcpListener::bind(addr).listen();
639
640         spawn(proc() {
641             let _stream = TcpStream::connect(addr);
642             // Close
643         });
644
645         let mut stream = acceptor.accept();
646         let mut buf = [0];
647         let nread = stream.read(buf);
648         assert!(nread.is_err());
649
650         match stream.read(buf) {
651             Ok(..) => panic!(),
652             Err(ref e) => {
653                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
654                         "unknown kind: {}", e.kind);
655             }
656         }
657     }
658
659     #[test]
660     fn write_close_ip4() {
661         let addr = next_test_ip4();
662         let mut acceptor = TcpListener::bind(addr).listen();
663
664         let (tx, rx) = channel();
665         spawn(proc() {
666             drop(TcpStream::connect(addr));
667             tx.send(());
668         });
669
670         let mut stream = acceptor.accept();
671         rx.recv();
672         let buf = [0];
673         match stream.write(buf) {
674             Ok(..) => {}
675             Err(e) => {
676                 assert!(e.kind == ConnectionReset ||
677                         e.kind == BrokenPipe ||
678                         e.kind == ConnectionAborted,
679                         "unknown error: {}", e);
680             }
681         }
682     }
683
684     #[test]
685     fn write_close_ip6() {
686         let addr = next_test_ip6();
687         let mut acceptor = TcpListener::bind(addr).listen();
688
689         let (tx, rx) = channel();
690         spawn(proc() {
691             drop(TcpStream::connect(addr));
692             tx.send(());
693         });
694
695         let mut stream = acceptor.accept();
696         rx.recv();
697         let buf = [0];
698         match stream.write(buf) {
699             Ok(..) => {}
700             Err(e) => {
701                 assert!(e.kind == ConnectionReset ||
702                         e.kind == BrokenPipe ||
703                         e.kind == ConnectionAborted,
704                         "unknown error: {}", e);
705             }
706         }
707     }
708
709     #[test]
710     fn multiple_connect_serial_ip4() {
711         let addr = next_test_ip4();
712         let max = 10u;
713         let mut acceptor = TcpListener::bind(addr).listen();
714
715         spawn(proc() {
716             for _ in range(0, max) {
717                 let mut stream = TcpStream::connect(addr);
718                 stream.write([99]).unwrap();
719             }
720         });
721
722         for ref mut stream in acceptor.incoming().take(max) {
723             let mut buf = [0];
724             stream.read(buf).unwrap();
725             assert_eq!(buf[0], 99);
726         }
727     }
728
729     #[test]
730     fn multiple_connect_serial_ip6() {
731         let addr = next_test_ip6();
732         let max = 10u;
733         let mut acceptor = TcpListener::bind(addr).listen();
734
735         spawn(proc() {
736             for _ in range(0, max) {
737                 let mut stream = TcpStream::connect(addr);
738                 stream.write([99]).unwrap();
739             }
740         });
741
742         for ref mut stream in acceptor.incoming().take(max) {
743             let mut buf = [0];
744             stream.read(buf).unwrap();
745             assert_eq!(buf[0], 99);
746         }
747     }
748
749     #[test]
750     fn multiple_connect_interleaved_greedy_schedule_ip4() {
751         let addr = next_test_ip4();
752         static MAX: int = 10;
753         let acceptor = TcpListener::bind(addr).listen();
754
755         spawn(proc() {
756             let mut acceptor = acceptor;
757             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
758                 // Start another task to handle the connection
759                 spawn(proc() {
760                     let mut stream = stream;
761                     let mut buf = [0];
762                     stream.read(buf).unwrap();
763                     assert!(buf[0] == i as u8);
764                     debug!("read");
765                 });
766             }
767         });
768
769         connect(0, addr);
770
771         fn connect(i: int, addr: SocketAddr) {
772             if i == MAX { return }
773
774             spawn(proc() {
775                 debug!("connecting");
776                 let mut stream = TcpStream::connect(addr);
777                 // Connect again before writing
778                 connect(i + 1, addr);
779                 debug!("writing");
780                 stream.write([i as u8]).unwrap();
781             });
782         }
783     }
784
785     #[test]
786     fn multiple_connect_interleaved_greedy_schedule_ip6() {
787         let addr = next_test_ip6();
788         static MAX: int = 10;
789         let acceptor = TcpListener::bind(addr).listen();
790
791         spawn(proc() {
792             let mut acceptor = acceptor;
793             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
794                 // Start another task to handle the connection
795                 spawn(proc() {
796                     let mut stream = stream;
797                     let mut buf = [0];
798                     stream.read(buf).unwrap();
799                     assert!(buf[0] == i as u8);
800                     debug!("read");
801                 });
802             }
803         });
804
805         connect(0, addr);
806
807         fn connect(i: int, addr: SocketAddr) {
808             if i == MAX { return }
809
810             spawn(proc() {
811                 debug!("connecting");
812                 let mut stream = TcpStream::connect(addr);
813                 // Connect again before writing
814                 connect(i + 1, addr);
815                 debug!("writing");
816                 stream.write([i as u8]).unwrap();
817             });
818         }
819     }
820
821     #[test]
822     fn multiple_connect_interleaved_lazy_schedule_ip4() {
823         static MAX: int = 10;
824         let addr = next_test_ip4();
825         let acceptor = TcpListener::bind(addr).listen();
826
827         spawn(proc() {
828             let mut acceptor = acceptor;
829             for stream in acceptor.incoming().take(MAX as uint) {
830                 // Start another task to handle the connection
831                 spawn(proc() {
832                     let mut stream = stream;
833                     let mut buf = [0];
834                     stream.read(buf).unwrap();
835                     assert!(buf[0] == 99);
836                     debug!("read");
837                 });
838             }
839         });
840
841         connect(0, addr);
842
843         fn connect(i: int, addr: SocketAddr) {
844             if i == MAX { return }
845
846             spawn(proc() {
847                 debug!("connecting");
848                 let mut stream = TcpStream::connect(addr);
849                 // Connect again before writing
850                 connect(i + 1, addr);
851                 debug!("writing");
852                 stream.write([99]).unwrap();
853             });
854         }
855     }
856
857     #[test]
858     fn multiple_connect_interleaved_lazy_schedule_ip6() {
859         static MAX: int = 10;
860         let addr = next_test_ip6();
861         let acceptor = TcpListener::bind(addr).listen();
862
863         spawn(proc() {
864             let mut acceptor = acceptor;
865             for stream in acceptor.incoming().take(MAX as uint) {
866                 // Start another task to handle the connection
867                 spawn(proc() {
868                     let mut stream = stream;
869                     let mut buf = [0];
870                     stream.read(buf).unwrap();
871                     assert!(buf[0] == 99);
872                     debug!("read");
873                 });
874             }
875         });
876
877         connect(0, addr);
878
879         fn connect(i: int, addr: SocketAddr) {
880             if i == MAX { return }
881
882             spawn(proc() {
883                 debug!("connecting");
884                 let mut stream = TcpStream::connect(addr);
885                 // Connect again before writing
886                 connect(i + 1, addr);
887                 debug!("writing");
888                 stream.write([99]).unwrap();
889             });
890         }
891     }
892
893     pub fn socket_name(addr: SocketAddr) {
894         let mut listener = TcpListener::bind(addr).unwrap();
895
896         // Make sure socket_name gives
897         // us the socket we binded to.
898         let so_name = listener.socket_name();
899         assert!(so_name.is_ok());
900         assert_eq!(addr, so_name.unwrap());
901     }
902
903     pub fn peer_name(addr: SocketAddr) {
904         let acceptor = TcpListener::bind(addr).listen();
905         spawn(proc() {
906             let mut acceptor = acceptor;
907             acceptor.accept().unwrap();
908         });
909
910         let stream = TcpStream::connect(addr);
911
912         assert!(stream.is_ok());
913         let mut stream = stream.unwrap();
914
915         // Make sure peer_name gives us the
916         // address/port of the peer we've
917         // connected to.
918         let peer_name = stream.peer_name();
919         assert!(peer_name.is_ok());
920         assert_eq!(addr, peer_name.unwrap());
921     }
922
923     #[test]
924     fn socket_and_peer_name_ip4() {
925         peer_name(next_test_ip4());
926         socket_name(next_test_ip4());
927     }
928
929     #[test]
930     fn socket_and_peer_name_ip6() {
931         // FIXME: peer name is not consistent
932         //peer_name(next_test_ip6());
933         socket_name(next_test_ip6());
934     }
935
936     #[test]
937     fn partial_read() {
938         let addr = next_test_ip4();
939         let (tx, rx) = channel();
940         spawn(proc() {
941             let mut srv = TcpListener::bind(addr).listen().unwrap();
942             tx.send(());
943             let mut cl = srv.accept().unwrap();
944             cl.write([10]).unwrap();
945             let mut b = [0];
946             cl.read(b).unwrap();
947             tx.send(());
948         });
949
950         rx.recv();
951         let mut c = TcpStream::connect(addr).unwrap();
952         let mut b = [0, ..10];
953         assert_eq!(c.read(b), Ok(1));
954         c.write([1]).unwrap();
955         rx.recv();
956     }
957
958     #[test]
959     fn double_bind() {
960         let addr = next_test_ip4();
961         let listener = TcpListener::bind(addr).unwrap().listen();
962         assert!(listener.is_ok());
963         match TcpListener::bind(addr).listen() {
964             Ok(..) => panic!(),
965             Err(e) => {
966                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
967                         "unknown error: {} {}", e, e.kind);
968             }
969         }
970     }
971
972     #[test]
973     fn fast_rebind() {
974         let addr = next_test_ip4();
975         let (tx, rx) = channel();
976
977         spawn(proc() {
978             rx.recv();
979             let _stream = TcpStream::connect(addr).unwrap();
980             // Close
981             rx.recv();
982         });
983
984         {
985             let mut acceptor = TcpListener::bind(addr).listen();
986             tx.send(());
987             {
988                 let _stream = acceptor.accept().unwrap();
989                 // Close client
990                 tx.send(());
991             }
992             // Close listener
993         }
994         let _listener = TcpListener::bind(addr);
995     }
996
997     #[test]
998     fn tcp_clone_smoke() {
999         let addr = next_test_ip4();
1000         let mut acceptor = TcpListener::bind(addr).listen();
1001
1002         spawn(proc() {
1003             let mut s = TcpStream::connect(addr);
1004             let mut buf = [0, 0];
1005             assert_eq!(s.read(buf), Ok(1));
1006             assert_eq!(buf[0], 1);
1007             s.write([2]).unwrap();
1008         });
1009
1010         let mut s1 = acceptor.accept().unwrap();
1011         let s2 = s1.clone();
1012
1013         let (tx1, rx1) = channel();
1014         let (tx2, rx2) = channel();
1015         spawn(proc() {
1016             let mut s2 = s2;
1017             rx1.recv();
1018             s2.write([1]).unwrap();
1019             tx2.send(());
1020         });
1021         tx1.send(());
1022         let mut buf = [0, 0];
1023         assert_eq!(s1.read(buf), Ok(1));
1024         rx2.recv();
1025     }
1026
1027     #[test]
1028     fn tcp_clone_two_read() {
1029         let addr = next_test_ip6();
1030         let mut acceptor = TcpListener::bind(addr).listen();
1031         let (tx1, rx) = channel();
1032         let tx2 = tx1.clone();
1033
1034         spawn(proc() {
1035             let mut s = TcpStream::connect(addr);
1036             s.write([1]).unwrap();
1037             rx.recv();
1038             s.write([2]).unwrap();
1039             rx.recv();
1040         });
1041
1042         let mut s1 = acceptor.accept().unwrap();
1043         let s2 = s1.clone();
1044
1045         let (done, rx) = channel();
1046         spawn(proc() {
1047             let mut s2 = s2;
1048             let mut buf = [0, 0];
1049             s2.read(buf).unwrap();
1050             tx2.send(());
1051             done.send(());
1052         });
1053         let mut buf = [0, 0];
1054         s1.read(buf).unwrap();
1055         tx1.send(());
1056
1057         rx.recv();
1058     }
1059
1060     #[test]
1061     fn tcp_clone_two_write() {
1062         let addr = next_test_ip4();
1063         let mut acceptor = TcpListener::bind(addr).listen();
1064
1065         spawn(proc() {
1066             let mut s = TcpStream::connect(addr);
1067             let mut buf = [0, 1];
1068             s.read(buf).unwrap();
1069             s.read(buf).unwrap();
1070         });
1071
1072         let mut s1 = acceptor.accept().unwrap();
1073         let s2 = s1.clone();
1074
1075         let (done, rx) = channel();
1076         spawn(proc() {
1077             let mut s2 = s2;
1078             s2.write([1]).unwrap();
1079             done.send(());
1080         });
1081         s1.write([2]).unwrap();
1082
1083         rx.recv();
1084     }
1085
1086     #[test]
1087     fn shutdown_smoke() {
1088         let addr = next_test_ip4();
1089         let a = TcpListener::bind(addr).unwrap().listen();
1090         spawn(proc() {
1091             let mut a = a;
1092             let mut c = a.accept().unwrap();
1093             assert_eq!(c.read_to_end(), Ok(vec!()));
1094             c.write([1]).unwrap();
1095         });
1096
1097         let mut s = TcpStream::connect(addr).unwrap();
1098         assert!(s.inner.close_write().is_ok());
1099         assert!(s.write([1]).is_err());
1100         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1101     }
1102
1103     #[test]
1104     fn accept_timeout() {
1105         let addr = next_test_ip4();
1106         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1107
1108         a.set_timeout(Some(10));
1109
1110         // Make sure we time out once and future invocations also time out
1111         let err = a.accept().err().unwrap();
1112         assert_eq!(err.kind, TimedOut);
1113         let err = a.accept().err().unwrap();
1114         assert_eq!(err.kind, TimedOut);
1115
1116         // Also make sure that even though the timeout is expired that we will
1117         // continue to receive any pending connections.
1118         //
1119         // FIXME: freebsd apparently never sees the pending connection, but
1120         //        testing manually always works. Need to investigate this
1121         //        flakiness.
1122         if !cfg!(target_os = "freebsd") {
1123             let (tx, rx) = channel();
1124             spawn(proc() {
1125                 tx.send(TcpStream::connect(addr).unwrap());
1126             });
1127             let _l = rx.recv();
1128             for i in range(0i, 1001) {
1129                 match a.accept() {
1130                     Ok(..) => break,
1131                     Err(ref e) if e.kind == TimedOut => {}
1132                     Err(e) => panic!("error: {}", e),
1133                 }
1134                 ::task::deschedule();
1135                 if i == 1000 { panic!("should have a pending connection") }
1136             }
1137         }
1138
1139         // Unset the timeout and make sure that this always blocks.
1140         a.set_timeout(None);
1141         spawn(proc() {
1142             drop(TcpStream::connect(addr).unwrap());
1143         });
1144         a.accept().unwrap();
1145     }
1146
1147     #[test]
1148     fn close_readwrite_smoke() {
1149         let addr = next_test_ip4();
1150         let a = TcpListener::bind(addr).listen().unwrap();
1151         let (_tx, rx) = channel::<()>();
1152         spawn(proc() {
1153             let mut a = a;
1154             let _s = a.accept().unwrap();
1155             let _ = rx.recv_opt();
1156         });
1157
1158         let mut b = [0];
1159         let mut s = TcpStream::connect(addr).unwrap();
1160         let mut s2 = s.clone();
1161
1162         // closing should prevent reads/writes
1163         s.close_write().unwrap();
1164         assert!(s.write([0]).is_err());
1165         s.close_read().unwrap();
1166         assert!(s.read(b).is_err());
1167
1168         // closing should affect previous handles
1169         assert!(s2.write([0]).is_err());
1170         assert!(s2.read(b).is_err());
1171
1172         // closing should affect new handles
1173         let mut s3 = s.clone();
1174         assert!(s3.write([0]).is_err());
1175         assert!(s3.read(b).is_err());
1176
1177         // make sure these don't die
1178         let _ = s2.close_read();
1179         let _ = s2.close_write();
1180         let _ = s3.close_read();
1181         let _ = s3.close_write();
1182     }
1183
1184     #[test]
1185     fn close_read_wakes_up() {
1186         let addr = next_test_ip4();
1187         let a = TcpListener::bind(addr).listen().unwrap();
1188         let (_tx, rx) = channel::<()>();
1189         spawn(proc() {
1190             let mut a = a;
1191             let _s = a.accept().unwrap();
1192             let _ = rx.recv_opt();
1193         });
1194
1195         let mut s = TcpStream::connect(addr).unwrap();
1196         let s2 = s.clone();
1197         let (tx, rx) = channel();
1198         spawn(proc() {
1199             let mut s2 = s2;
1200             assert!(s2.read([0]).is_err());
1201             tx.send(());
1202         });
1203         // this should wake up the child task
1204         s.close_read().unwrap();
1205
1206         // this test will never finish if the child doesn't wake up
1207         rx.recv();
1208     }
1209
1210     #[test]
1211     fn readwrite_timeouts() {
1212         let addr = next_test_ip6();
1213         let mut a = TcpListener::bind(addr).listen().unwrap();
1214         let (tx, rx) = channel::<()>();
1215         spawn(proc() {
1216             let mut s = TcpStream::connect(addr).unwrap();
1217             rx.recv();
1218             assert!(s.write([0]).is_ok());
1219             let _ = rx.recv_opt();
1220         });
1221
1222         let mut s = a.accept().unwrap();
1223         s.set_timeout(Some(20));
1224         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1225         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1226
1227         s.set_timeout(Some(20));
1228         for i in range(0i, 1001) {
1229             match s.write([0, .. 128 * 1024]) {
1230                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1231                 Err(IoError { kind: TimedOut, .. }) => break,
1232                 Err(e) => panic!("{}", e),
1233            }
1234            if i == 1000 { panic!("should have filled up?!"); }
1235         }
1236         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1237
1238         tx.send(());
1239         s.set_timeout(None);
1240         assert_eq!(s.read([0, 0]), Ok(1));
1241     }
1242
1243     #[test]
1244     fn read_timeouts() {
1245         let addr = next_test_ip6();
1246         let mut a = TcpListener::bind(addr).listen().unwrap();
1247         let (tx, rx) = channel::<()>();
1248         spawn(proc() {
1249             let mut s = TcpStream::connect(addr).unwrap();
1250             rx.recv();
1251             let mut amt = 0;
1252             while amt < 100 * 128 * 1024 {
1253                 match s.read([0, ..128 * 1024]) {
1254                     Ok(n) => { amt += n; }
1255                     Err(e) => panic!("{}", e),
1256                 }
1257             }
1258             let _ = rx.recv_opt();
1259         });
1260
1261         let mut s = a.accept().unwrap();
1262         s.set_read_timeout(Some(20));
1263         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1264         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1265
1266         tx.send(());
1267         for _ in range(0i, 100) {
1268             assert!(s.write([0, ..128 * 1024]).is_ok());
1269         }
1270     }
1271
1272     #[test]
1273     fn write_timeouts() {
1274         let addr = next_test_ip6();
1275         let mut a = TcpListener::bind(addr).listen().unwrap();
1276         let (tx, rx) = channel::<()>();
1277         spawn(proc() {
1278             let mut s = TcpStream::connect(addr).unwrap();
1279             rx.recv();
1280             assert!(s.write([0]).is_ok());
1281             let _ = rx.recv_opt();
1282         });
1283
1284         let mut s = a.accept().unwrap();
1285         s.set_write_timeout(Some(20));
1286         for i in range(0i, 1001) {
1287             match s.write([0, .. 128 * 1024]) {
1288                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1289                 Err(IoError { kind: TimedOut, .. }) => break,
1290                 Err(e) => panic!("{}", e),
1291            }
1292            if i == 1000 { panic!("should have filled up?!"); }
1293         }
1294         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1295
1296         tx.send(());
1297         assert!(s.read([0]).is_ok());
1298     }
1299
1300     #[test]
1301     fn timeout_concurrent_read() {
1302         let addr = next_test_ip6();
1303         let mut a = TcpListener::bind(addr).listen().unwrap();
1304         let (tx, rx) = channel::<()>();
1305         spawn(proc() {
1306             let mut s = TcpStream::connect(addr).unwrap();
1307             rx.recv();
1308             assert_eq!(s.write([0]), Ok(()));
1309             let _ = rx.recv_opt();
1310         });
1311
1312         let mut s = a.accept().unwrap();
1313         let s2 = s.clone();
1314         let (tx2, rx2) = channel();
1315         spawn(proc() {
1316             let mut s2 = s2;
1317             assert_eq!(s2.read([0]), Ok(1));
1318             tx2.send(());
1319         });
1320
1321         s.set_read_timeout(Some(20));
1322         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1323         tx.send(());
1324
1325         rx2.recv();
1326     }
1327
1328     #[test]
1329     fn clone_while_reading() {
1330         let addr = next_test_ip6();
1331         let listen = TcpListener::bind(addr);
1332         let mut accept = listen.listen().unwrap();
1333
1334         // Enqueue a task to write to a socket
1335         let (tx, rx) = channel();
1336         let (txdone, rxdone) = channel();
1337         let txdone2 = txdone.clone();
1338         spawn(proc() {
1339             let mut tcp = TcpStream::connect(addr).unwrap();
1340             rx.recv();
1341             tcp.write_u8(0).unwrap();
1342             txdone2.send(());
1343         });
1344
1345         // Spawn off a reading clone
1346         let tcp = accept.accept().unwrap();
1347         let tcp2 = tcp.clone();
1348         let txdone3 = txdone.clone();
1349         spawn(proc() {
1350             let mut tcp2 = tcp2;
1351             tcp2.read_u8().unwrap();
1352             txdone3.send(());
1353         });
1354
1355         // Try to ensure that the reading clone is indeed reading
1356         for _ in range(0i, 50) {
1357             ::task::deschedule();
1358         }
1359
1360         // clone the handle again while it's reading, then let it finish the
1361         // read.
1362         let _ = tcp.clone();
1363         tx.send(());
1364         rxdone.recv();
1365         rxdone.recv();
1366     }
1367
1368     #[test]
1369     fn clone_accept_smoke() {
1370         let addr = next_test_ip4();
1371         let l = TcpListener::bind(addr);
1372         let mut a = l.listen().unwrap();
1373         let mut a2 = a.clone();
1374
1375         spawn(proc() {
1376             let _ = TcpStream::connect(addr);
1377         });
1378         spawn(proc() {
1379             let _ = TcpStream::connect(addr);
1380         });
1381
1382         assert!(a.accept().is_ok());
1383         assert!(a2.accept().is_ok());
1384     }
1385
1386     #[test]
1387     fn clone_accept_concurrent() {
1388         let addr = next_test_ip4();
1389         let l = TcpListener::bind(addr);
1390         let a = l.listen().unwrap();
1391         let a2 = a.clone();
1392
1393         let (tx, rx) = channel();
1394         let tx2 = tx.clone();
1395
1396         spawn(proc() { let mut a = a; tx.send(a.accept()) });
1397         spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1398
1399         spawn(proc() {
1400             let _ = TcpStream::connect(addr);
1401         });
1402         spawn(proc() {
1403             let _ = TcpStream::connect(addr);
1404         });
1405
1406         assert!(rx.recv().is_ok());
1407         assert!(rx.recv().is_ok());
1408     }
1409
1410     #[test]
1411     fn close_accept_smoke() {
1412         let addr = next_test_ip4();
1413         let l = TcpListener::bind(addr);
1414         let mut a = l.listen().unwrap();
1415
1416         a.close_accept().unwrap();
1417         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1418     }
1419
1420     #[test]
1421     fn close_accept_concurrent() {
1422         let addr = next_test_ip4();
1423         let l = TcpListener::bind(addr);
1424         let a = l.listen().unwrap();
1425         let mut a2 = a.clone();
1426
1427         let (tx, rx) = channel();
1428         spawn(proc() {
1429             let mut a = a;
1430             tx.send(a.accept());
1431         });
1432         a2.close_accept().unwrap();
1433
1434         assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
1435     }
1436 }