]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
2545e07cbb5c2bd1dddbbe5a3697502abeb1ffdd
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use iter::Iterator;
23 use result::Err;
24 use io::net::ip::{SocketAddr, ToSocketAddr};
25 use io::{Reader, Writer, Listener, Acceptor};
26 use io::{standard_error, TimedOut};
27 use option::{None, Some, Option};
28 use time::Duration;
29
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
33
34 /// A structure which represents a TCP stream between a local socket and a
35 /// remote socket.
36 ///
37 /// # Example
38 ///
39 /// ```no_run
40 /// # #![allow(unused_must_use)]
41 /// use std::io::TcpStream;
42 ///
43 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
44 ///
45 /// stream.write([1]);
46 /// let mut buf = [0];
47 /// stream.read(buf);
48 /// drop(stream); // close the connection
49 /// ```
50 pub struct TcpStream {
51     inner: TcpStreamImp,
52 }
53
54 impl TcpStream {
55     fn new(s: TcpStreamImp) -> TcpStream {
56         TcpStream { inner: s }
57     }
58
59     /// Open a TCP connection to a remote host.
60     ///
61     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
62     /// trait can be supplied for the address; see this trait documentation for
63     /// concrete examples.
64     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
65         super::with_addresses(addr, |addr| {
66             TcpStreamImp::connect(addr, None).map(TcpStream::new)
67         })
68     }
69
70     /// Creates a TCP connection to a remote socket address, timing out after
71     /// the specified duration.
72     ///
73     /// This is the same as the `connect` method, except that if the timeout
74     /// specified elapses before a connection is made an error will be
75     /// returned. The error's kind will be `TimedOut`.
76     ///
77     /// Same as the `connect` method, `addr` argument type can be anything which
78     /// implements `ToSocketAddr` trait.
79     ///
80     /// If a `timeout` with zero or negative duration is specified then
81     /// the function returns `Err`, with the error kind set to `TimedOut`.
82     #[experimental = "the timeout argument may eventually change types"]
83     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
84                                             timeout: Duration) -> IoResult<TcpStream> {
85         if timeout <= Duration::milliseconds(0) {
86             return Err(standard_error(TimedOut));
87         }
88
89         super::with_addresses(addr, |addr| {
90             TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
91                 .map(TcpStream::new)
92         })
93     }
94
95     /// Returns the socket address of the remote peer of this TCP connection.
96     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
97         self.inner.peer_name()
98     }
99
100     /// Returns the socket address of the local half of this TCP connection.
101     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
102         self.inner.socket_name()
103     }
104
105     /// Sets the nodelay flag on this connection to the boolean specified
106     #[experimental]
107     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
108         self.inner.set_nodelay(nodelay)
109     }
110
111     /// Sets the keepalive timeout to the timeout specified.
112     ///
113     /// If the value specified is `None`, then the keepalive flag is cleared on
114     /// this connection. Otherwise, the keepalive timeout will be set to the
115     /// specified time, in seconds.
116     #[experimental]
117     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
118         self.inner.set_keepalive(delay_in_seconds)
119     }
120
121     /// Closes the reading half of this connection.
122     ///
123     /// This method will close the reading portion of this connection, causing
124     /// all pending and future reads to immediately return with an error.
125     ///
126     /// # Example
127     ///
128     /// ```no_run
129     /// # #![allow(unused_must_use)]
130     /// use std::io::timer;
131     /// use std::io::TcpStream;
132     /// use std::time::Duration;
133     ///
134     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
135     /// let stream2 = stream.clone();
136     ///
137     /// spawn(proc() {
138     ///     // close this stream after one second
139     ///     timer::sleep(Duration::seconds(1));
140     ///     let mut stream = stream2;
141     ///     stream.close_read();
142     /// });
143     ///
144     /// // wait for some data, will get canceled after one second
145     /// let mut buf = [0];
146     /// stream.read(buf);
147     /// ```
148     ///
149     /// Note that this method affects all cloned handles associated with this
150     /// stream, not just this one handle.
151     pub fn close_read(&mut self) -> IoResult<()> {
152         self.inner.close_read()
153     }
154
155     /// Closes the writing half of this connection.
156     ///
157     /// This method will close the writing portion of this connection, causing
158     /// all future writes to immediately return with an error.
159     ///
160     /// Note that this method affects all cloned handles associated with this
161     /// stream, not just this one handle.
162     pub fn close_write(&mut self) -> IoResult<()> {
163         self.inner.close_write()
164     }
165
166     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
167     ///
168     /// This function will set a timeout for all blocking operations (including
169     /// reads and writes) on this stream. The timeout specified is a relative
170     /// time, in milliseconds, into the future after which point operations will
171     /// time out. This means that the timeout must be reset periodically to keep
172     /// it from expiring. Specifying a value of `None` will clear the timeout
173     /// for this stream.
174     ///
175     /// The timeout on this stream is local to this stream only. Setting a
176     /// timeout does not affect any other cloned instances of this stream, nor
177     /// does the timeout propagated to cloned handles of this stream. Setting
178     /// this timeout will override any specific read or write timeouts
179     /// previously set for this stream.
180     ///
181     /// For clarification on the semantics of interrupting a read and a write,
182     /// take a look at `set_read_timeout` and `set_write_timeout`.
183     #[experimental = "the timeout argument may change in type and value"]
184     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
185         self.inner.set_timeout(timeout_ms)
186     }
187
188     /// Sets the timeout for read operations on this stream.
189     ///
190     /// See documentation in `set_timeout` for the semantics of this read time.
191     /// This will overwrite any previous read timeout set through either this
192     /// function or `set_timeout`.
193     ///
194     /// # Errors
195     ///
196     /// When this timeout expires, if there is no pending read operation, no
197     /// action is taken. Otherwise, the read operation will be scheduled to
198     /// promptly return. If a timeout error is returned, then no data was read
199     /// during the timeout period.
200     #[experimental = "the timeout argument may change in type and value"]
201     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
202         self.inner.set_read_timeout(timeout_ms)
203     }
204
205     /// Sets the timeout for write operations on this stream.
206     ///
207     /// See documentation in `set_timeout` for the semantics of this write time.
208     /// This will overwrite any previous write timeout set through either this
209     /// function or `set_timeout`.
210     ///
211     /// # Errors
212     ///
213     /// When this timeout expires, if there is no pending write operation, no
214     /// action is taken. Otherwise, the pending write operation will be
215     /// scheduled to promptly return. The actual state of the underlying stream
216     /// is not specified.
217     ///
218     /// The write operation may return an error of type `ShortWrite` which
219     /// indicates that the object is known to have written an exact number of
220     /// bytes successfully during the timeout period, and the remaining bytes
221     /// were never written.
222     ///
223     /// If the write operation returns `TimedOut`, then it the timeout primitive
224     /// does not know how many bytes were written as part of the timeout
225     /// operation. It may be the case that bytes continue to be written in an
226     /// asynchronous fashion after the call to write returns.
227     #[experimental = "the timeout argument may change in type and value"]
228     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
229         self.inner.set_write_timeout(timeout_ms)
230     }
231 }
232
233 impl Clone for TcpStream {
234     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
235     /// and writes of this connection.
236     ///
237     /// The underlying TCP stream will not be closed until all handles to the
238     /// stream have been deallocated. All handles will also follow the same
239     /// stream, but two concurrent reads will not receive the same data.
240     /// Instead, the first read will receive the first packet received, and the
241     /// second read will receive the second packet.
242     fn clone(&self) -> TcpStream {
243         TcpStream { inner: self.inner.clone() }
244     }
245 }
246
247 impl Reader for TcpStream {
248     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
249         self.inner.read(buf)
250     }
251 }
252
253 impl Writer for TcpStream {
254     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
255         self.inner.write(buf)
256     }
257 }
258
259 /// A structure representing a socket server. This listener is used to create a
260 /// `TcpAcceptor` which can be used to accept sockets on a local port.
261 ///
262 /// # Example
263 ///
264 /// ```rust
265 /// # fn main() { }
266 /// # fn foo() {
267 /// # #![allow(dead_code)]
268 /// use std::io::{TcpListener, TcpStream};
269 /// use std::io::{Acceptor, Listener};
270 ///
271 /// let listener = TcpListener::bind("127.0.0.1:80");
272 ///
273 /// // bind the listener to the specified address
274 /// let mut acceptor = listener.listen();
275 ///
276 /// fn handle_client(mut stream: TcpStream) {
277 ///     // ...
278 /// # &mut stream; // silence unused mutability/variable warning
279 /// }
280 /// // accept connections and process them, spawning a new tasks for each one
281 /// for stream in acceptor.incoming() {
282 ///     match stream {
283 ///         Err(e) => { /* connection failed */ }
284 ///         Ok(stream) => spawn(proc() {
285 ///             // connection succeeded
286 ///             handle_client(stream)
287 ///         })
288 ///     }
289 /// }
290 ///
291 /// // close the socket server
292 /// drop(acceptor);
293 /// # }
294 /// ```
295 pub struct TcpListener {
296     inner: TcpListenerImp,
297 }
298
299 impl TcpListener {
300     /// Creates a new `TcpListener` which will be bound to the specified address.
301     /// This listener is not ready for accepting connections, `listen` must be called
302     /// on it before that's possible.
303     ///
304     /// Binding with a port number of 0 will request that the OS assigns a port
305     /// to this listener. The port allocated can be queried via the
306     /// `socket_name` function.
307     ///
308     /// The address type can be any implementor of `ToSocketAddr` trait. See its
309     /// documentation for concrete examples.
310     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
311         super::with_addresses(addr, |addr| {
312             TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
313         })
314     }
315
316     /// Returns the local socket address of this listener.
317     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
318         self.inner.socket_name()
319     }
320 }
321
322 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
323     fn listen(self) -> IoResult<TcpAcceptor> {
324         self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
325     }
326 }
327
328 /// The accepting half of a TCP socket server. This structure is created through
329 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
330 /// `TcpStream` instances.
331 pub struct TcpAcceptor {
332     inner: TcpAcceptorImp,
333 }
334
335 impl TcpAcceptor {
336     /// Prevents blocking on all future accepts after `ms` milliseconds have
337     /// elapsed.
338     ///
339     /// This function is used to set a deadline after which this acceptor will
340     /// time out accepting any connections. The argument is the relative
341     /// distance, in milliseconds, to a point in the future after which all
342     /// accepts will fail.
343     ///
344     /// If the argument specified is `None`, then any previously registered
345     /// timeout is cleared.
346     ///
347     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
348     /// any pending connections. All pending connections will be accepted,
349     /// regardless of whether the timeout has expired or not (the accept will
350     /// not block in this case).
351     ///
352     /// # Example
353     ///
354     /// ```no_run
355     /// # #![allow(experimental)]
356     /// use std::io::TcpListener;
357     /// use std::io::{Listener, Acceptor, TimedOut};
358     ///
359     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
360     ///
361     /// // After 100ms have passed, all accepts will fail
362     /// a.set_timeout(Some(100));
363     ///
364     /// match a.accept() {
365     ///     Ok(..) => println!("accepted a socket"),
366     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
367     ///     Err(e) => println!("err: {}", e),
368     /// }
369     ///
370     /// // Reset the timeout and try again
371     /// a.set_timeout(Some(100));
372     /// let socket = a.accept();
373     ///
374     /// // Clear the timeout and block indefinitely waiting for a connection
375     /// a.set_timeout(None);
376     /// let socket = a.accept();
377     /// ```
378     #[experimental = "the type of the argument and name of this function are \
379                       subject to change"]
380     pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
381
382     /// Closes the accepting capabilities of this acceptor.
383     ///
384     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
385     /// in that it will affect *all* cloned handles of this acceptor's original
386     /// handle.
387     ///
388     /// Once this function succeeds, all future calls to `accept` will return
389     /// immediately with an error, preventing all future calls to accept. The
390     /// underlying socket will not be relinquished back to the OS until all
391     /// acceptors have been deallocated.
392     ///
393     /// This is useful for waking up a thread in an accept loop to indicate that
394     /// it should exit.
395     ///
396     /// # Example
397     ///
398     /// ```
399     /// # #![allow(experimental)]
400     /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
401     ///
402     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
403     /// let a2 = a.clone();
404     ///
405     /// spawn(proc() {
406     ///     let mut a2 = a2;
407     ///     for socket in a2.incoming() {
408     ///         match socket {
409     ///             Ok(s) => { /* handle s */ }
410     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
411     ///             Err(e) => panic!("unexpected error: {}", e),
412     ///         }
413     ///     }
414     /// });
415     ///
416     /// # fn wait_for_sigint() {}
417     /// // Now that our accept loop is running, wait for the program to be
418     /// // requested to exit.
419     /// wait_for_sigint();
420     ///
421     /// // Signal our accept loop to exit
422     /// assert!(a.close_accept().is_ok());
423     /// ```
424     #[experimental]
425     pub fn close_accept(&mut self) -> IoResult<()> {
426         self.inner.close_accept()
427     }
428 }
429
430 impl Acceptor<TcpStream> for TcpAcceptor {
431     fn accept(&mut self) -> IoResult<TcpStream> {
432         self.inner.accept().map(TcpStream::new)
433     }
434 }
435
436 impl Clone for TcpAcceptor {
437     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
438     /// accepts.
439     ///
440     /// The underlying TCP acceptor will not be closed until all handles to the
441     /// acceptor have been deallocated. Incoming connections will be received on
442     /// at most once acceptor, the same connection will not be accepted twice.
443     ///
444     /// The `close_accept` method will shut down *all* acceptors cloned from the
445     /// same original acceptor, whereas the `set_timeout` method only affects
446     /// the selector that it is called on.
447     ///
448     /// This function is useful for creating a handle to invoke `close_accept`
449     /// on to wake up any other task blocked in `accept`.
450     fn clone(&self) -> TcpAcceptor {
451         TcpAcceptor { inner: self.inner.clone() }
452     }
453 }
454
455 #[cfg(test)]
456 #[allow(experimental)]
457 mod test {
458     use io::net::tcp::*;
459     use io::net::ip::*;
460     use io::*;
461     use io::test::*;
462     use prelude::*;
463
464     // FIXME #11530 this fails on android because tests are run as root
465     #[cfg_attr(any(windows, target_os = "android"), ignore)]
466     #[test]
467     fn bind_error() {
468         match TcpListener::bind("0.0.0.0:1") {
469             Ok(..) => panic!(),
470             Err(e) => assert_eq!(e.kind, PermissionDenied),
471         }
472     }
473
474     #[test]
475     fn connect_error() {
476         match TcpStream::connect("0.0.0.0:1") {
477             Ok(..) => panic!(),
478             Err(e) => assert_eq!(e.kind, ConnectionRefused),
479         }
480     }
481
482     #[test]
483     fn listen_ip4_localhost() {
484         let socket_addr = next_test_ip4();
485         let listener = TcpListener::bind(socket_addr);
486         let mut acceptor = listener.listen();
487
488         spawn(proc() {
489             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
490             stream.write([144]).unwrap();
491         });
492
493         let mut stream = acceptor.accept();
494         let mut buf = [0];
495         stream.read(buf).unwrap();
496         assert!(buf[0] == 144);
497     }
498
499     #[test]
500     fn connect_localhost() {
501         let addr = next_test_ip4();
502         let mut acceptor = TcpListener::bind(addr).listen();
503
504         spawn(proc() {
505             let mut stream = TcpStream::connect(("localhost", addr.port));
506             stream.write([64]).unwrap();
507         });
508
509         let mut stream = acceptor.accept();
510         let mut buf = [0];
511         stream.read(buf).unwrap();
512         assert!(buf[0] == 64);
513     }
514
515     #[test]
516     fn connect_ip4_loopback() {
517         let addr = next_test_ip4();
518         let mut acceptor = TcpListener::bind(addr).listen();
519
520         spawn(proc() {
521             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
522             stream.write([44]).unwrap();
523         });
524
525         let mut stream = acceptor.accept();
526         let mut buf = [0];
527         stream.read(buf).unwrap();
528         assert!(buf[0] == 44);
529     }
530
531     #[test]
532     fn connect_ip6_loopback() {
533         let addr = next_test_ip6();
534         let mut acceptor = TcpListener::bind(addr).listen();
535
536         spawn(proc() {
537             let mut stream = TcpStream::connect(("::1", addr.port));
538             stream.write([66]).unwrap();
539         });
540
541         let mut stream = acceptor.accept();
542         let mut buf = [0];
543         stream.read(buf).unwrap();
544         assert!(buf[0] == 66);
545     }
546
547     #[test]
548     fn smoke_test_ip4() {
549         let addr = next_test_ip4();
550         let mut acceptor = TcpListener::bind(addr).listen();
551
552         spawn(proc() {
553             let mut stream = TcpStream::connect(addr);
554             stream.write([99]).unwrap();
555         });
556
557         let mut stream = acceptor.accept();
558         let mut buf = [0];
559         stream.read(buf).unwrap();
560         assert!(buf[0] == 99);
561     }
562
563     #[test]
564     fn smoke_test_ip6() {
565         let addr = next_test_ip6();
566         let mut acceptor = TcpListener::bind(addr).listen();
567
568         spawn(proc() {
569             let mut stream = TcpStream::connect(addr);
570             stream.write([99]).unwrap();
571         });
572
573         let mut stream = acceptor.accept();
574         let mut buf = [0];
575         stream.read(buf).unwrap();
576         assert!(buf[0] == 99);
577     }
578
579     #[test]
580     fn read_eof_ip4() {
581         let addr = next_test_ip4();
582         let mut acceptor = TcpListener::bind(addr).listen();
583
584         spawn(proc() {
585             let _stream = TcpStream::connect(addr);
586             // Close
587         });
588
589         let mut stream = acceptor.accept();
590         let mut buf = [0];
591         let nread = stream.read(buf);
592         assert!(nread.is_err());
593     }
594
595     #[test]
596     fn read_eof_ip6() {
597         let addr = next_test_ip6();
598         let mut acceptor = TcpListener::bind(addr).listen();
599
600         spawn(proc() {
601             let _stream = TcpStream::connect(addr);
602             // Close
603         });
604
605         let mut stream = acceptor.accept();
606         let mut buf = [0];
607         let nread = stream.read(buf);
608         assert!(nread.is_err());
609     }
610
611     #[test]
612     fn read_eof_twice_ip4() {
613         let addr = next_test_ip4();
614         let mut acceptor = TcpListener::bind(addr).listen();
615
616         spawn(proc() {
617             let _stream = TcpStream::connect(addr);
618             // Close
619         });
620
621         let mut stream = acceptor.accept();
622         let mut buf = [0];
623         let nread = stream.read(buf);
624         assert!(nread.is_err());
625
626         match stream.read(buf) {
627             Ok(..) => panic!(),
628             Err(ref e) => {
629                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
630                         "unknown kind: {}", e.kind);
631             }
632         }
633     }
634
635     #[test]
636     fn read_eof_twice_ip6() {
637         let addr = next_test_ip6();
638         let mut acceptor = TcpListener::bind(addr).listen();
639
640         spawn(proc() {
641             let _stream = TcpStream::connect(addr);
642             // Close
643         });
644
645         let mut stream = acceptor.accept();
646         let mut buf = [0];
647         let nread = stream.read(buf);
648         assert!(nread.is_err());
649
650         match stream.read(buf) {
651             Ok(..) => panic!(),
652             Err(ref e) => {
653                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
654                         "unknown kind: {}", e.kind);
655             }
656         }
657     }
658
659     #[test]
660     fn write_close_ip4() {
661         let addr = next_test_ip4();
662         let mut acceptor = TcpListener::bind(addr).listen();
663
664         spawn(proc() {
665             let _stream = TcpStream::connect(addr);
666             // Close
667         });
668
669         let mut stream = acceptor.accept();
670         let buf = [0];
671         loop {
672             match stream.write(buf) {
673                 Ok(..) => {}
674                 Err(e) => {
675                     assert!(e.kind == ConnectionReset ||
676                             e.kind == BrokenPipe ||
677                             e.kind == ConnectionAborted,
678                             "unknown error: {}", e);
679                     break;
680                 }
681             }
682         }
683     }
684
685     #[test]
686     fn write_close_ip6() {
687         let addr = next_test_ip6();
688         let mut acceptor = TcpListener::bind(addr).listen();
689
690         spawn(proc() {
691             let _stream = TcpStream::connect(addr);
692             // Close
693         });
694
695         let mut stream = acceptor.accept();
696         let buf = [0];
697         loop {
698             match stream.write(buf) {
699                 Ok(..) => {}
700                 Err(e) => {
701                     assert!(e.kind == ConnectionReset ||
702                             e.kind == BrokenPipe ||
703                             e.kind == ConnectionAborted,
704                             "unknown error: {}", e);
705                     break;
706                 }
707             }
708         }
709     }
710
711     #[test]
712     fn multiple_connect_serial_ip4() {
713         let addr = next_test_ip4();
714         let max = 10u;
715         let mut acceptor = TcpListener::bind(addr).listen();
716
717         spawn(proc() {
718             for _ in range(0, max) {
719                 let mut stream = TcpStream::connect(addr);
720                 stream.write([99]).unwrap();
721             }
722         });
723
724         for ref mut stream in acceptor.incoming().take(max) {
725             let mut buf = [0];
726             stream.read(buf).unwrap();
727             assert_eq!(buf[0], 99);
728         }
729     }
730
731     #[test]
732     fn multiple_connect_serial_ip6() {
733         let addr = next_test_ip6();
734         let max = 10u;
735         let mut acceptor = TcpListener::bind(addr).listen();
736
737         spawn(proc() {
738             for _ in range(0, max) {
739                 let mut stream = TcpStream::connect(addr);
740                 stream.write([99]).unwrap();
741             }
742         });
743
744         for ref mut stream in acceptor.incoming().take(max) {
745             let mut buf = [0];
746             stream.read(buf).unwrap();
747             assert_eq!(buf[0], 99);
748         }
749     }
750
751     #[test]
752     fn multiple_connect_interleaved_greedy_schedule_ip4() {
753         let addr = next_test_ip4();
754         static MAX: int = 10;
755         let acceptor = TcpListener::bind(addr).listen();
756
757         spawn(proc() {
758             let mut acceptor = acceptor;
759             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
760                 // Start another task to handle the connection
761                 spawn(proc() {
762                     let mut stream = stream;
763                     let mut buf = [0];
764                     stream.read(buf).unwrap();
765                     assert!(buf[0] == i as u8);
766                     debug!("read");
767                 });
768             }
769         });
770
771         connect(0, addr);
772
773         fn connect(i: int, addr: SocketAddr) {
774             if i == MAX { return }
775
776             spawn(proc() {
777                 debug!("connecting");
778                 let mut stream = TcpStream::connect(addr);
779                 // Connect again before writing
780                 connect(i + 1, addr);
781                 debug!("writing");
782                 stream.write([i as u8]).unwrap();
783             });
784         }
785     }
786
787     #[test]
788     fn multiple_connect_interleaved_greedy_schedule_ip6() {
789         let addr = next_test_ip6();
790         static MAX: int = 10;
791         let acceptor = TcpListener::bind(addr).listen();
792
793         spawn(proc() {
794             let mut acceptor = acceptor;
795             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
796                 // Start another task to handle the connection
797                 spawn(proc() {
798                     let mut stream = stream;
799                     let mut buf = [0];
800                     stream.read(buf).unwrap();
801                     assert!(buf[0] == i as u8);
802                     debug!("read");
803                 });
804             }
805         });
806
807         connect(0, addr);
808
809         fn connect(i: int, addr: SocketAddr) {
810             if i == MAX { return }
811
812             spawn(proc() {
813                 debug!("connecting");
814                 let mut stream = TcpStream::connect(addr);
815                 // Connect again before writing
816                 connect(i + 1, addr);
817                 debug!("writing");
818                 stream.write([i as u8]).unwrap();
819             });
820         }
821     }
822
823     #[test]
824     fn multiple_connect_interleaved_lazy_schedule_ip4() {
825         static MAX: int = 10;
826         let addr = next_test_ip4();
827         let acceptor = TcpListener::bind(addr).listen();
828
829         spawn(proc() {
830             let mut acceptor = acceptor;
831             for stream in acceptor.incoming().take(MAX as uint) {
832                 // Start another task to handle the connection
833                 spawn(proc() {
834                     let mut stream = stream;
835                     let mut buf = [0];
836                     stream.read(buf).unwrap();
837                     assert!(buf[0] == 99);
838                     debug!("read");
839                 });
840             }
841         });
842
843         connect(0, addr);
844
845         fn connect(i: int, addr: SocketAddr) {
846             if i == MAX { return }
847
848             spawn(proc() {
849                 debug!("connecting");
850                 let mut stream = TcpStream::connect(addr);
851                 // Connect again before writing
852                 connect(i + 1, addr);
853                 debug!("writing");
854                 stream.write([99]).unwrap();
855             });
856         }
857     }
858
859     #[test]
860     fn multiple_connect_interleaved_lazy_schedule_ip6() {
861         static MAX: int = 10;
862         let addr = next_test_ip6();
863         let acceptor = TcpListener::bind(addr).listen();
864
865         spawn(proc() {
866             let mut acceptor = acceptor;
867             for stream in acceptor.incoming().take(MAX as uint) {
868                 // Start another task to handle the connection
869                 spawn(proc() {
870                     let mut stream = stream;
871                     let mut buf = [0];
872                     stream.read(buf).unwrap();
873                     assert!(buf[0] == 99);
874                     debug!("read");
875                 });
876             }
877         });
878
879         connect(0, addr);
880
881         fn connect(i: int, addr: SocketAddr) {
882             if i == MAX { return }
883
884             spawn(proc() {
885                 debug!("connecting");
886                 let mut stream = TcpStream::connect(addr);
887                 // Connect again before writing
888                 connect(i + 1, addr);
889                 debug!("writing");
890                 stream.write([99]).unwrap();
891             });
892         }
893     }
894
895     pub fn socket_name(addr: SocketAddr) {
896         let mut listener = TcpListener::bind(addr).unwrap();
897
898         // Make sure socket_name gives
899         // us the socket we binded to.
900         let so_name = listener.socket_name();
901         assert!(so_name.is_ok());
902         assert_eq!(addr, so_name.unwrap());
903     }
904
905     pub fn peer_name(addr: SocketAddr) {
906         let acceptor = TcpListener::bind(addr).listen();
907         spawn(proc() {
908             let mut acceptor = acceptor;
909             acceptor.accept().unwrap();
910         });
911
912         let stream = TcpStream::connect(addr);
913
914         assert!(stream.is_ok());
915         let mut stream = stream.unwrap();
916
917         // Make sure peer_name gives us the
918         // address/port of the peer we've
919         // connected to.
920         let peer_name = stream.peer_name();
921         assert!(peer_name.is_ok());
922         assert_eq!(addr, peer_name.unwrap());
923     }
924
925     #[test]
926     fn socket_and_peer_name_ip4() {
927         peer_name(next_test_ip4());
928         socket_name(next_test_ip4());
929     }
930
931     #[test]
932     fn socket_and_peer_name_ip6() {
933         // FIXME: peer name is not consistent
934         //peer_name(next_test_ip6());
935         socket_name(next_test_ip6());
936     }
937
938     #[test]
939     fn partial_read() {
940         let addr = next_test_ip4();
941         let (tx, rx) = channel();
942         spawn(proc() {
943             let mut srv = TcpListener::bind(addr).listen().unwrap();
944             tx.send(());
945             let mut cl = srv.accept().unwrap();
946             cl.write([10]).unwrap();
947             let mut b = [0];
948             cl.read(b).unwrap();
949             tx.send(());
950         });
951
952         rx.recv();
953         let mut c = TcpStream::connect(addr).unwrap();
954         let mut b = [0, ..10];
955         assert_eq!(c.read(b), Ok(1));
956         c.write([1]).unwrap();
957         rx.recv();
958     }
959
960     #[test]
961     fn double_bind() {
962         let addr = next_test_ip4();
963         let listener = TcpListener::bind(addr).unwrap().listen();
964         assert!(listener.is_ok());
965         match TcpListener::bind(addr).listen() {
966             Ok(..) => panic!(),
967             Err(e) => {
968                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
969                         "unknown error: {} {}", e, e.kind);
970             }
971         }
972     }
973
974     #[test]
975     fn fast_rebind() {
976         let addr = next_test_ip4();
977         let (tx, rx) = channel();
978
979         spawn(proc() {
980             rx.recv();
981             let _stream = TcpStream::connect(addr).unwrap();
982             // Close
983             rx.recv();
984         });
985
986         {
987             let mut acceptor = TcpListener::bind(addr).listen();
988             tx.send(());
989             {
990                 let _stream = acceptor.accept().unwrap();
991                 // Close client
992                 tx.send(());
993             }
994             // Close listener
995         }
996         let _listener = TcpListener::bind(addr);
997     }
998
999     #[test]
1000     fn tcp_clone_smoke() {
1001         let addr = next_test_ip4();
1002         let mut acceptor = TcpListener::bind(addr).listen();
1003
1004         spawn(proc() {
1005             let mut s = TcpStream::connect(addr);
1006             let mut buf = [0, 0];
1007             assert_eq!(s.read(buf), Ok(1));
1008             assert_eq!(buf[0], 1);
1009             s.write([2]).unwrap();
1010         });
1011
1012         let mut s1 = acceptor.accept().unwrap();
1013         let s2 = s1.clone();
1014
1015         let (tx1, rx1) = channel();
1016         let (tx2, rx2) = channel();
1017         spawn(proc() {
1018             let mut s2 = s2;
1019             rx1.recv();
1020             s2.write([1]).unwrap();
1021             tx2.send(());
1022         });
1023         tx1.send(());
1024         let mut buf = [0, 0];
1025         assert_eq!(s1.read(buf), Ok(1));
1026         rx2.recv();
1027     }
1028
1029     #[test]
1030     fn tcp_clone_two_read() {
1031         let addr = next_test_ip6();
1032         let mut acceptor = TcpListener::bind(addr).listen();
1033         let (tx1, rx) = channel();
1034         let tx2 = tx1.clone();
1035
1036         spawn(proc() {
1037             let mut s = TcpStream::connect(addr);
1038             s.write([1]).unwrap();
1039             rx.recv();
1040             s.write([2]).unwrap();
1041             rx.recv();
1042         });
1043
1044         let mut s1 = acceptor.accept().unwrap();
1045         let s2 = s1.clone();
1046
1047         let (done, rx) = channel();
1048         spawn(proc() {
1049             let mut s2 = s2;
1050             let mut buf = [0, 0];
1051             s2.read(buf).unwrap();
1052             tx2.send(());
1053             done.send(());
1054         });
1055         let mut buf = [0, 0];
1056         s1.read(buf).unwrap();
1057         tx1.send(());
1058
1059         rx.recv();
1060     }
1061
1062     #[test]
1063     fn tcp_clone_two_write() {
1064         let addr = next_test_ip4();
1065         let mut acceptor = TcpListener::bind(addr).listen();
1066
1067         spawn(proc() {
1068             let mut s = TcpStream::connect(addr);
1069             let mut buf = [0, 1];
1070             s.read(buf).unwrap();
1071             s.read(buf).unwrap();
1072         });
1073
1074         let mut s1 = acceptor.accept().unwrap();
1075         let s2 = s1.clone();
1076
1077         let (done, rx) = channel();
1078         spawn(proc() {
1079             let mut s2 = s2;
1080             s2.write([1]).unwrap();
1081             done.send(());
1082         });
1083         s1.write([2]).unwrap();
1084
1085         rx.recv();
1086     }
1087
1088     #[test]
1089     fn shutdown_smoke() {
1090         let addr = next_test_ip4();
1091         let a = TcpListener::bind(addr).unwrap().listen();
1092         spawn(proc() {
1093             let mut a = a;
1094             let mut c = a.accept().unwrap();
1095             assert_eq!(c.read_to_end(), Ok(vec!()));
1096             c.write([1]).unwrap();
1097         });
1098
1099         let mut s = TcpStream::connect(addr).unwrap();
1100         assert!(s.inner.close_write().is_ok());
1101         assert!(s.write([1]).is_err());
1102         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1103     }
1104
1105     #[test]
1106     fn accept_timeout() {
1107         let addr = next_test_ip4();
1108         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1109
1110         a.set_timeout(Some(10));
1111
1112         // Make sure we time out once and future invocations also time out
1113         let err = a.accept().err().unwrap();
1114         assert_eq!(err.kind, TimedOut);
1115         let err = a.accept().err().unwrap();
1116         assert_eq!(err.kind, TimedOut);
1117
1118         // Also make sure that even though the timeout is expired that we will
1119         // continue to receive any pending connections.
1120         //
1121         // FIXME: freebsd apparently never sees the pending connection, but
1122         //        testing manually always works. Need to investigate this
1123         //        flakiness.
1124         if !cfg!(target_os = "freebsd") {
1125             let (tx, rx) = channel();
1126             spawn(proc() {
1127                 tx.send(TcpStream::connect(addr).unwrap());
1128             });
1129             let _l = rx.recv();
1130             for i in range(0i, 1001) {
1131                 match a.accept() {
1132                     Ok(..) => break,
1133                     Err(ref e) if e.kind == TimedOut => {}
1134                     Err(e) => panic!("error: {}", e),
1135                 }
1136                 ::task::deschedule();
1137                 if i == 1000 { panic!("should have a pending connection") }
1138             }
1139         }
1140
1141         // Unset the timeout and make sure that this always blocks.
1142         a.set_timeout(None);
1143         spawn(proc() {
1144             drop(TcpStream::connect(addr).unwrap());
1145         });
1146         a.accept().unwrap();
1147     }
1148
1149     #[test]
1150     fn close_readwrite_smoke() {
1151         let addr = next_test_ip4();
1152         let a = TcpListener::bind(addr).listen().unwrap();
1153         let (_tx, rx) = channel::<()>();
1154         spawn(proc() {
1155             let mut a = a;
1156             let _s = a.accept().unwrap();
1157             let _ = rx.recv_opt();
1158         });
1159
1160         let mut b = [0];
1161         let mut s = TcpStream::connect(addr).unwrap();
1162         let mut s2 = s.clone();
1163
1164         // closing should prevent reads/writes
1165         s.close_write().unwrap();
1166         assert!(s.write([0]).is_err());
1167         s.close_read().unwrap();
1168         assert!(s.read(b).is_err());
1169
1170         // closing should affect previous handles
1171         assert!(s2.write([0]).is_err());
1172         assert!(s2.read(b).is_err());
1173
1174         // closing should affect new handles
1175         let mut s3 = s.clone();
1176         assert!(s3.write([0]).is_err());
1177         assert!(s3.read(b).is_err());
1178
1179         // make sure these don't die
1180         let _ = s2.close_read();
1181         let _ = s2.close_write();
1182         let _ = s3.close_read();
1183         let _ = s3.close_write();
1184     }
1185
1186     #[test]
1187     fn close_read_wakes_up() {
1188         let addr = next_test_ip4();
1189         let a = TcpListener::bind(addr).listen().unwrap();
1190         let (_tx, rx) = channel::<()>();
1191         spawn(proc() {
1192             let mut a = a;
1193             let _s = a.accept().unwrap();
1194             let _ = rx.recv_opt();
1195         });
1196
1197         let mut s = TcpStream::connect(addr).unwrap();
1198         let s2 = s.clone();
1199         let (tx, rx) = channel();
1200         spawn(proc() {
1201             let mut s2 = s2;
1202             assert!(s2.read([0]).is_err());
1203             tx.send(());
1204         });
1205         // this should wake up the child task
1206         s.close_read().unwrap();
1207
1208         // this test will never finish if the child doesn't wake up
1209         rx.recv();
1210     }
1211
1212     #[test]
1213     fn readwrite_timeouts() {
1214         let addr = next_test_ip6();
1215         let mut a = TcpListener::bind(addr).listen().unwrap();
1216         let (tx, rx) = channel::<()>();
1217         spawn(proc() {
1218             let mut s = TcpStream::connect(addr).unwrap();
1219             rx.recv();
1220             assert!(s.write([0]).is_ok());
1221             let _ = rx.recv_opt();
1222         });
1223
1224         let mut s = a.accept().unwrap();
1225         s.set_timeout(Some(20));
1226         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1227         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1228
1229         s.set_timeout(Some(20));
1230         for i in range(0i, 1001) {
1231             match s.write([0, .. 128 * 1024]) {
1232                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1233                 Err(IoError { kind: TimedOut, .. }) => break,
1234                 Err(e) => panic!("{}", e),
1235            }
1236            if i == 1000 { panic!("should have filled up?!"); }
1237         }
1238         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1239
1240         tx.send(());
1241         s.set_timeout(None);
1242         assert_eq!(s.read([0, 0]), Ok(1));
1243     }
1244
1245     #[test]
1246     fn read_timeouts() {
1247         let addr = next_test_ip6();
1248         let mut a = TcpListener::bind(addr).listen().unwrap();
1249         let (tx, rx) = channel::<()>();
1250         spawn(proc() {
1251             let mut s = TcpStream::connect(addr).unwrap();
1252             rx.recv();
1253             let mut amt = 0;
1254             while amt < 100 * 128 * 1024 {
1255                 match s.read([0, ..128 * 1024]) {
1256                     Ok(n) => { amt += n; }
1257                     Err(e) => panic!("{}", e),
1258                 }
1259             }
1260             let _ = rx.recv_opt();
1261         });
1262
1263         let mut s = a.accept().unwrap();
1264         s.set_read_timeout(Some(20));
1265         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1266         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1267
1268         tx.send(());
1269         for _ in range(0i, 100) {
1270             assert!(s.write([0, ..128 * 1024]).is_ok());
1271         }
1272     }
1273
1274     #[test]
1275     fn write_timeouts() {
1276         let addr = next_test_ip6();
1277         let mut a = TcpListener::bind(addr).listen().unwrap();
1278         let (tx, rx) = channel::<()>();
1279         spawn(proc() {
1280             let mut s = TcpStream::connect(addr).unwrap();
1281             rx.recv();
1282             assert!(s.write([0]).is_ok());
1283             let _ = rx.recv_opt();
1284         });
1285
1286         let mut s = a.accept().unwrap();
1287         s.set_write_timeout(Some(20));
1288         for i in range(0i, 1001) {
1289             match s.write([0, .. 128 * 1024]) {
1290                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1291                 Err(IoError { kind: TimedOut, .. }) => break,
1292                 Err(e) => panic!("{}", e),
1293            }
1294            if i == 1000 { panic!("should have filled up?!"); }
1295         }
1296         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1297
1298         tx.send(());
1299         assert!(s.read([0]).is_ok());
1300     }
1301
1302     #[test]
1303     fn timeout_concurrent_read() {
1304         let addr = next_test_ip6();
1305         let mut a = TcpListener::bind(addr).listen().unwrap();
1306         let (tx, rx) = channel::<()>();
1307         spawn(proc() {
1308             let mut s = TcpStream::connect(addr).unwrap();
1309             rx.recv();
1310             assert_eq!(s.write([0]), Ok(()));
1311             let _ = rx.recv_opt();
1312         });
1313
1314         let mut s = a.accept().unwrap();
1315         let s2 = s.clone();
1316         let (tx2, rx2) = channel();
1317         spawn(proc() {
1318             let mut s2 = s2;
1319             assert_eq!(s2.read([0]), Ok(1));
1320             tx2.send(());
1321         });
1322
1323         s.set_read_timeout(Some(20));
1324         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1325         tx.send(());
1326
1327         rx2.recv();
1328     }
1329
1330     #[test]
1331     fn clone_while_reading() {
1332         let addr = next_test_ip6();
1333         let listen = TcpListener::bind(addr);
1334         let mut accept = listen.listen().unwrap();
1335
1336         // Enqueue a task to write to a socket
1337         let (tx, rx) = channel();
1338         let (txdone, rxdone) = channel();
1339         let txdone2 = txdone.clone();
1340         spawn(proc() {
1341             let mut tcp = TcpStream::connect(addr).unwrap();
1342             rx.recv();
1343             tcp.write_u8(0).unwrap();
1344             txdone2.send(());
1345         });
1346
1347         // Spawn off a reading clone
1348         let tcp = accept.accept().unwrap();
1349         let tcp2 = tcp.clone();
1350         let txdone3 = txdone.clone();
1351         spawn(proc() {
1352             let mut tcp2 = tcp2;
1353             tcp2.read_u8().unwrap();
1354             txdone3.send(());
1355         });
1356
1357         // Try to ensure that the reading clone is indeed reading
1358         for _ in range(0i, 50) {
1359             ::task::deschedule();
1360         }
1361
1362         // clone the handle again while it's reading, then let it finish the
1363         // read.
1364         let _ = tcp.clone();
1365         tx.send(());
1366         rxdone.recv();
1367         rxdone.recv();
1368     }
1369
1370     #[test]
1371     fn clone_accept_smoke() {
1372         let addr = next_test_ip4();
1373         let l = TcpListener::bind(addr);
1374         let mut a = l.listen().unwrap();
1375         let mut a2 = a.clone();
1376
1377         spawn(proc() {
1378             let _ = TcpStream::connect(addr);
1379         });
1380         spawn(proc() {
1381             let _ = TcpStream::connect(addr);
1382         });
1383
1384         assert!(a.accept().is_ok());
1385         assert!(a2.accept().is_ok());
1386     }
1387
1388     #[test]
1389     fn clone_accept_concurrent() {
1390         let addr = next_test_ip4();
1391         let l = TcpListener::bind(addr);
1392         let a = l.listen().unwrap();
1393         let a2 = a.clone();
1394
1395         let (tx, rx) = channel();
1396         let tx2 = tx.clone();
1397
1398         spawn(proc() { let mut a = a; tx.send(a.accept()) });
1399         spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1400
1401         spawn(proc() {
1402             let _ = TcpStream::connect(addr);
1403         });
1404         spawn(proc() {
1405             let _ = TcpStream::connect(addr);
1406         });
1407
1408         assert!(rx.recv().is_ok());
1409         assert!(rx.recv().is_ok());
1410     }
1411
1412     #[test]
1413     fn close_accept_smoke() {
1414         let addr = next_test_ip4();
1415         let l = TcpListener::bind(addr);
1416         let mut a = l.listen().unwrap();
1417
1418         a.close_accept().unwrap();
1419         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1420     }
1421
1422     #[test]
1423     fn close_accept_concurrent() {
1424         let addr = next_test_ip4();
1425         let l = TcpListener::bind(addr);
1426         let a = l.listen().unwrap();
1427         let mut a2 = a.clone();
1428
1429         let (tx, rx) = channel();
1430         spawn(proc() {
1431             let mut a = a;
1432             tx.send(a.accept());
1433         });
1434         a2.close_accept().unwrap();
1435
1436         assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
1437     }
1438 }