]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
librustc: Remove the fallback to `int` from typechecking.
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use iter::Iterator;
23 use slice::ImmutableVector;
24 use result::{Ok,Err};
25 use io::net::addrinfo::get_host_addresses;
26 use io::net::ip::SocketAddr;
27 use io::{IoError, ConnectionFailed, InvalidInput};
28 use io::{Reader, Writer, Listener, Acceptor};
29 use from_str::FromStr;
30 use kinds::Send;
31 use option::{None, Some, Option};
32 use owned::Box;
33 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
34 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
35 use rt::rtio;
36
37 /// A structure which represents a TCP stream between a local socket and a
38 /// remote socket.
39 ///
40 /// # Example
41 ///
42 /// ```no_run
43 /// # #![allow(unused_must_use)]
44 /// use std::io::net::tcp::TcpStream;
45 ///
46 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
47 ///
48 /// stream.write([1]);
49 /// let mut buf = [0];
50 /// stream.read(buf);
51 /// drop(stream); // close the connection
52 /// ```
53 pub struct TcpStream {
54     obj: Box<RtioTcpStream + Send>,
55 }
56
57 impl TcpStream {
58     fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
59         TcpStream { obj: s }
60     }
61
62     /// Open a TCP connection to a remote host by hostname or IP address.
63     ///
64     /// `host` can be a hostname or IP address string. If no error is
65     /// encountered, then `Ok(stream)` is returned.
66     pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
67         let addresses = match FromStr::from_str(host) {
68             Some(addr) => vec!(addr),
69             None => try!(get_host_addresses(host))
70         };
71         let mut err = IoError {
72             kind: ConnectionFailed,
73             desc: "no addresses found for hostname",
74             detail: None
75         };
76         for addr in addresses.iter() {
77             let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
78             let result = LocalIo::maybe_raise(|io| {
79                 io.tcp_connect(addr, None).map(TcpStream::new)
80             });
81             match result {
82                 Ok(stream) => {
83                     return Ok(stream)
84                 }
85                 Err(connect_err) => {
86                     err = IoError::from_rtio_error(connect_err)
87                 }
88             }
89         }
90         Err(err)
91     }
92
93     /// Creates a TCP connection to a remote socket address, timing out after
94     /// the specified number of milliseconds.
95     ///
96     /// This is the same as the `connect` method, except that if the timeout
97     /// specified (in milliseconds) elapses before a connection is made an error
98     /// will be returned. The error's kind will be `TimedOut`.
99     ///
100     /// Note that the `addr` argument may one day be split into a separate host
101     /// and port, similar to the API seen in `connect`.
102     #[experimental = "the timeout argument may eventually change types"]
103     pub fn connect_timeout(addr: SocketAddr,
104                            timeout_ms: u64) -> IoResult<TcpStream> {
105         let SocketAddr { ip, port } = addr;
106         let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
107         LocalIo::maybe_raise(|io| {
108             io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
109         }).map_err(IoError::from_rtio_error)
110     }
111
112     /// Returns the socket address of the remote peer of this TCP connection.
113     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
114         match self.obj.peer_name() {
115             Ok(rtio::SocketAddr { ip, port }) => {
116                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
117             }
118             Err(e) => Err(IoError::from_rtio_error(e)),
119         }
120     }
121
122     /// Returns the socket address of the local half of this TCP connection.
123     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
124         match self.obj.socket_name() {
125             Ok(rtio::SocketAddr { ip, port }) => {
126                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
127             }
128             Err(e) => Err(IoError::from_rtio_error(e)),
129         }
130     }
131
132     /// Sets the nodelay flag on this connection to the boolean specified
133     #[experimental]
134     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
135         if nodelay {
136             self.obj.nodelay()
137         } else {
138             self.obj.control_congestion()
139         }.map_err(IoError::from_rtio_error)
140     }
141
142     /// Sets the keepalive timeout to the timeout specified.
143     ///
144     /// If the value specified is `None`, then the keepalive flag is cleared on
145     /// this connection. Otherwise, the keepalive timeout will be set to the
146     /// specified time, in seconds.
147     #[experimental]
148     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
149         match delay_in_seconds {
150             Some(i) => self.obj.keepalive(i),
151             None => self.obj.letdie(),
152         }.map_err(IoError::from_rtio_error)
153     }
154
155     /// Closes the reading half of this connection.
156     ///
157     /// This method will close the reading portion of this connection, causing
158     /// all pending and future reads to immediately return with an error.
159     ///
160     /// # Example
161     ///
162     /// ```no_run
163     /// # #![allow(unused_must_use)]
164     /// use std::io::timer;
165     /// use std::io::net::tcp::TcpStream;
166     ///
167     /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
168     /// let stream2 = stream.clone();
169     ///
170     /// spawn(proc() {
171     ///     // close this stream after one second
172     ///     timer::sleep(1000);
173     ///     let mut stream = stream2;
174     ///     stream.close_read();
175     /// });
176     ///
177     /// // wait for some data, will get canceled after one second
178     /// let mut buf = [0];
179     /// stream.read(buf);
180     /// ```
181     ///
182     /// Note that this method affects all cloned handles associated with this
183     /// stream, not just this one handle.
184     pub fn close_read(&mut self) -> IoResult<()> {
185         self.obj.close_read().map_err(IoError::from_rtio_error)
186     }
187
188     /// Closes the writing half of this connection.
189     ///
190     /// This method will close the writing portion of this connection, causing
191     /// all future writes to immediately return with an error.
192     ///
193     /// Note that this method affects all cloned handles associated with this
194     /// stream, not just this one handle.
195     pub fn close_write(&mut self) -> IoResult<()> {
196         self.obj.close_write().map_err(IoError::from_rtio_error)
197     }
198
199     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
200     ///
201     /// This function will set a timeout for all blocking operations (including
202     /// reads and writes) on this stream. The timeout specified is a relative
203     /// time, in milliseconds, into the future after which point operations will
204     /// time out. This means that the timeout must be reset periodically to keep
205     /// it from expiring. Specifying a value of `None` will clear the timeout
206     /// for this stream.
207     ///
208     /// The timeout on this stream is local to this stream only. Setting a
209     /// timeout does not affect any other cloned instances of this stream, nor
210     /// does the timeout propagated to cloned handles of this stream. Setting
211     /// this timeout will override any specific read or write timeouts
212     /// previously set for this stream.
213     ///
214     /// For clarification on the semantics of interrupting a read and a write,
215     /// take a look at `set_read_timeout` and `set_write_timeout`.
216     #[experimental = "the timeout argument may change in type and value"]
217     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
218         self.obj.set_timeout(timeout_ms)
219     }
220
221     /// Sets the timeout for read operations on this stream.
222     ///
223     /// See documentation in `set_timeout` for the semantics of this read time.
224     /// This will overwrite any previous read timeout set through either this
225     /// function or `set_timeout`.
226     ///
227     /// # Errors
228     ///
229     /// When this timeout expires, if there is no pending read operation, no
230     /// action is taken. Otherwise, the read operation will be scheduled to
231     /// promptly return. If a timeout error is returned, then no data was read
232     /// during the timeout period.
233     #[experimental = "the timeout argument may change in type and value"]
234     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
235         self.obj.set_read_timeout(timeout_ms)
236     }
237
238     /// Sets the timeout for write operations on this stream.
239     ///
240     /// See documentation in `set_timeout` for the semantics of this write time.
241     /// This will overwrite any previous write timeout set through either this
242     /// function or `set_timeout`.
243     ///
244     /// # Errors
245     ///
246     /// When this timeout expires, if there is no pending write operation, no
247     /// action is taken. Otherwise, the pending write operation will be
248     /// scheduled to promptly return. The actual state of the underlying stream
249     /// is not specified.
250     ///
251     /// The write operation may return an error of type `ShortWrite` which
252     /// indicates that the object is known to have written an exact number of
253     /// bytes successfully during the timeout period, and the remaining bytes
254     /// were never written.
255     ///
256     /// If the write operation returns `TimedOut`, then it the timeout primitive
257     /// does not know how many bytes were written as part of the timeout
258     /// operation. It may be the case that bytes continue to be written in an
259     /// asynchronous fashion after the call to write returns.
260     #[experimental = "the timeout argument may change in type and value"]
261     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
262         self.obj.set_write_timeout(timeout_ms)
263     }
264 }
265
266 impl Clone for TcpStream {
267     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
268     /// and writes of this connection.
269     ///
270     /// The underlying TCP stream will not be closed until all handles to the
271     /// stream have been deallocated. All handles will also follow the same
272     /// stream, but two concurrent reads will not receive the same data.
273     /// Instead, the first read will receive the first packet received, and the
274     /// second read will receive the second packet.
275     fn clone(&self) -> TcpStream {
276         TcpStream { obj: self.obj.clone() }
277     }
278 }
279
280 impl Reader for TcpStream {
281     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
282         self.obj.read(buf).map_err(IoError::from_rtio_error)
283     }
284 }
285
286 impl Writer for TcpStream {
287     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
288         self.obj.write(buf).map_err(IoError::from_rtio_error)
289     }
290 }
291
292 /// A structure representing a socket server. This listener is used to create a
293 /// `TcpAcceptor` which can be used to accept sockets on a local port.
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// # fn main() { }
299 /// # fn foo() {
300 /// # #![allow(dead_code)]
301 /// use std::io::{TcpListener, TcpStream};
302 /// use std::io::{Acceptor, Listener};
303 ///
304 /// let listener = TcpListener::bind("127.0.0.1", 80);
305 ///
306 /// // bind the listener to the specified address
307 /// let mut acceptor = listener.listen();
308 ///
309 /// fn handle_client(mut stream: TcpStream) {
310 ///     // ...
311 /// # &mut stream; // silence unused mutability/variable warning
312 /// }
313 /// // accept connections and process them, spawning a new tasks for each one
314 /// for stream in acceptor.incoming() {
315 ///     match stream {
316 ///         Err(e) => { /* connection failed */ }
317 ///         Ok(stream) => spawn(proc() {
318 ///             // connection succeeded
319 ///             handle_client(stream)
320 ///         })
321 ///     }
322 /// }
323 ///
324 /// // close the socket server
325 /// drop(acceptor);
326 /// # }
327 /// ```
328 pub struct TcpListener {
329     obj: Box<RtioTcpListener + Send>,
330 }
331
332 impl TcpListener {
333     /// Creates a new `TcpListener` which will be bound to the specified IP
334     /// and port. This listener is not ready for accepting connections,
335     /// `listen` must be called on it before that's possible.
336     ///
337     /// Binding with a port number of 0 will request that the OS assigns a port
338     /// to this listener. The port allocated can be queried via the
339     /// `socket_name` function.
340     pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
341         match FromStr::from_str(addr) {
342             Some(ip) => {
343                 let addr = rtio::SocketAddr{
344                     ip: super::to_rtio(ip),
345                     port: port,
346                 };
347                 LocalIo::maybe_raise(|io| {
348                     io.tcp_bind(addr).map(|l| TcpListener { obj: l })
349                 }).map_err(IoError::from_rtio_error)
350             }
351             None => {
352                 Err(IoError{
353                     kind: InvalidInput,
354                     desc: "invalid IP address specified",
355                     detail: None
356                 })
357             }
358         }
359     }
360
361     /// Returns the local socket address of this listener.
362     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
363         match self.obj.socket_name() {
364             Ok(rtio::SocketAddr { ip, port }) => {
365                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
366             }
367             Err(e) => Err(IoError::from_rtio_error(e)),
368         }
369     }
370 }
371
372 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
373     fn listen(self) -> IoResult<TcpAcceptor> {
374         match self.obj.listen() {
375             Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
376             Err(e) => Err(IoError::from_rtio_error(e)),
377         }
378     }
379 }
380
381 /// The accepting half of a TCP socket server. This structure is created through
382 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
383 /// `TcpStream` instances.
384 pub struct TcpAcceptor {
385     obj: Box<RtioTcpAcceptor + Send>,
386 }
387
388 impl TcpAcceptor {
389     /// Prevents blocking on all future accepts after `ms` milliseconds have
390     /// elapsed.
391     ///
392     /// This function is used to set a deadline after which this acceptor will
393     /// time out accepting any connections. The argument is the relative
394     /// distance, in milliseconds, to a point in the future after which all
395     /// accepts will fail.
396     ///
397     /// If the argument specified is `None`, then any previously registered
398     /// timeout is cleared.
399     ///
400     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
401     /// any pending connections. All pending connections will be accepted,
402     /// regardless of whether the timeout has expired or not (the accept will
403     /// not block in this case).
404     ///
405     /// # Example
406     ///
407     /// ```no_run
408     /// # #![allow(experimental)]
409     /// use std::io::net::tcp::TcpListener;
410     /// use std::io::{Listener, Acceptor, TimedOut};
411     ///
412     /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
413     ///
414     /// // After 100ms have passed, all accepts will fail
415     /// a.set_timeout(Some(100));
416     ///
417     /// match a.accept() {
418     ///     Ok(..) => println!("accepted a socket"),
419     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
420     ///     Err(e) => println!("err: {}", e),
421     /// }
422     ///
423     /// // Reset the timeout and try again
424     /// a.set_timeout(Some(100));
425     /// let socket = a.accept();
426     ///
427     /// // Clear the timeout and block indefinitely waiting for a connection
428     /// a.set_timeout(None);
429     /// let socket = a.accept();
430     /// ```
431     #[experimental = "the type of the argument and name of this function are \
432                       subject to change"]
433     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
434 }
435
436 impl Acceptor<TcpStream> for TcpAcceptor {
437     fn accept(&mut self) -> IoResult<TcpStream> {
438         match self.obj.accept(){
439             Ok(s) => Ok(TcpStream::new(s)),
440             Err(e) => Err(IoError::from_rtio_error(e)),
441         }
442     }
443 }
444
445 #[cfg(test)]
446 #[allow(experimental)]
447 mod test {
448     use super::*;
449     use io::net::ip::SocketAddr;
450     use io::*;
451     use prelude::*;
452
453     // FIXME #11530 this fails on android because tests are run as root
454     iotest!(fn bind_error() {
455         match TcpListener::bind("0.0.0.0", 1) {
456             Ok(..) => fail!(),
457             Err(e) => assert_eq!(e.kind, PermissionDenied),
458         }
459     } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
460
461     iotest!(fn connect_error() {
462         match TcpStream::connect("0.0.0.0", 1) {
463             Ok(..) => fail!(),
464             Err(e) => assert_eq!(e.kind, ConnectionRefused),
465         }
466     })
467
468     iotest!(fn listen_ip4_localhost() {
469         let socket_addr = next_test_ip4();
470         let ip_str = socket_addr.ip.to_str();
471         let port = socket_addr.port;
472         let listener = TcpListener::bind(ip_str.as_slice(), port);
473         let mut acceptor = listener.listen();
474
475         spawn(proc() {
476             let mut stream = TcpStream::connect("localhost", port);
477             stream.write([144]).unwrap();
478         });
479
480         let mut stream = acceptor.accept();
481         let mut buf = [0];
482         stream.read(buf).unwrap();
483         assert!(buf[0] == 144);
484     })
485
486     iotest!(fn connect_localhost() {
487         let addr = next_test_ip4();
488         let ip_str = addr.ip.to_str();
489         let port = addr.port;
490         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
491
492         spawn(proc() {
493             let mut stream = TcpStream::connect("localhost", addr.port);
494             stream.write([64]).unwrap();
495         });
496
497         let mut stream = acceptor.accept();
498         let mut buf = [0];
499         stream.read(buf).unwrap();
500         assert!(buf[0] == 64);
501     })
502
503     iotest!(fn connect_ip4_loopback() {
504         let addr = next_test_ip4();
505         let ip_str = addr.ip.to_str();
506         let port = addr.port;
507         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
508
509         spawn(proc() {
510             let mut stream = TcpStream::connect("127.0.0.1", addr.port);
511             stream.write([44]).unwrap();
512         });
513
514         let mut stream = acceptor.accept();
515         let mut buf = [0];
516         stream.read(buf).unwrap();
517         assert!(buf[0] == 44);
518     })
519
520     iotest!(fn connect_ip6_loopback() {
521         let addr = next_test_ip6();
522         let ip_str = addr.ip.to_str();
523         let port = addr.port;
524         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
525
526         spawn(proc() {
527             let mut stream = TcpStream::connect("::1", addr.port);
528             stream.write([66]).unwrap();
529         });
530
531         let mut stream = acceptor.accept();
532         let mut buf = [0];
533         stream.read(buf).unwrap();
534         assert!(buf[0] == 66);
535     })
536
537     iotest!(fn smoke_test_ip4() {
538         let addr = next_test_ip4();
539         let ip_str = addr.ip.to_str();
540         let port = addr.port;
541         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
542
543         spawn(proc() {
544             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
545             stream.write([99]).unwrap();
546         });
547
548         let mut stream = acceptor.accept();
549         let mut buf = [0];
550         stream.read(buf).unwrap();
551         assert!(buf[0] == 99);
552     })
553
554     iotest!(fn smoke_test_ip6() {
555         let addr = next_test_ip6();
556         let ip_str = addr.ip.to_str();
557         let port = addr.port;
558         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
559
560         spawn(proc() {
561             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
562             stream.write([99]).unwrap();
563         });
564
565         let mut stream = acceptor.accept();
566         let mut buf = [0];
567         stream.read(buf).unwrap();
568         assert!(buf[0] == 99);
569     })
570
571     iotest!(fn read_eof_ip4() {
572         let addr = next_test_ip4();
573         let ip_str = addr.ip.to_str();
574         let port = addr.port;
575         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
576
577         spawn(proc() {
578             let _stream = TcpStream::connect(ip_str.as_slice(), port);
579             // Close
580         });
581
582         let mut stream = acceptor.accept();
583         let mut buf = [0];
584         let nread = stream.read(buf);
585         assert!(nread.is_err());
586     })
587
588     iotest!(fn read_eof_ip6() {
589         let addr = next_test_ip6();
590         let ip_str = addr.ip.to_str();
591         let port = addr.port;
592         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
593
594         spawn(proc() {
595             let _stream = TcpStream::connect(ip_str.as_slice(), port);
596             // Close
597         });
598
599         let mut stream = acceptor.accept();
600         let mut buf = [0];
601         let nread = stream.read(buf);
602         assert!(nread.is_err());
603     })
604
605     iotest!(fn read_eof_twice_ip4() {
606         let addr = next_test_ip4();
607         let ip_str = addr.ip.to_str();
608         let port = addr.port;
609         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
610
611         spawn(proc() {
612             let _stream = TcpStream::connect(ip_str.as_slice(), port);
613             // Close
614         });
615
616         let mut stream = acceptor.accept();
617         let mut buf = [0];
618         let nread = stream.read(buf);
619         assert!(nread.is_err());
620
621         match stream.read(buf) {
622             Ok(..) => fail!(),
623             Err(ref e) => {
624                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
625                         "unknown kind: {:?}", e.kind);
626             }
627         }
628     })
629
630     iotest!(fn read_eof_twice_ip6() {
631         let addr = next_test_ip6();
632         let ip_str = addr.ip.to_str();
633         let port = addr.port;
634         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
635
636         spawn(proc() {
637             let _stream = TcpStream::connect(ip_str.as_slice(), port);
638             // Close
639         });
640
641         let mut stream = acceptor.accept();
642         let mut buf = [0];
643         let nread = stream.read(buf);
644         assert!(nread.is_err());
645
646         match stream.read(buf) {
647             Ok(..) => fail!(),
648             Err(ref e) => {
649                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
650                         "unknown kind: {:?}", e.kind);
651             }
652         }
653     })
654
655     iotest!(fn write_close_ip4() {
656         let addr = next_test_ip4();
657         let ip_str = addr.ip.to_str();
658         let port = addr.port;
659         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
660
661         spawn(proc() {
662             let _stream = TcpStream::connect(ip_str.as_slice(), port);
663             // Close
664         });
665
666         let mut stream = acceptor.accept();
667         let buf = [0];
668         loop {
669             match stream.write(buf) {
670                 Ok(..) => {}
671                 Err(e) => {
672                     assert!(e.kind == ConnectionReset ||
673                             e.kind == BrokenPipe ||
674                             e.kind == ConnectionAborted,
675                             "unknown error: {:?}", e);
676                     break;
677                 }
678             }
679         }
680     })
681
682     iotest!(fn write_close_ip6() {
683         let addr = next_test_ip6();
684         let ip_str = addr.ip.to_str();
685         let port = addr.port;
686         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
687
688         spawn(proc() {
689             let _stream = TcpStream::connect(ip_str.as_slice(), port);
690             // Close
691         });
692
693         let mut stream = acceptor.accept();
694         let buf = [0];
695         loop {
696             match stream.write(buf) {
697                 Ok(..) => {}
698                 Err(e) => {
699                     assert!(e.kind == ConnectionReset ||
700                             e.kind == BrokenPipe ||
701                             e.kind == ConnectionAborted,
702                             "unknown error: {:?}", e);
703                     break;
704                 }
705             }
706         }
707     })
708
709     iotest!(fn multiple_connect_serial_ip4() {
710         let addr = next_test_ip4();
711         let ip_str = addr.ip.to_str();
712         let port = addr.port;
713         let max = 10u;
714         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
715
716         spawn(proc() {
717             for _ in range(0, max) {
718                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
719                 stream.write([99]).unwrap();
720             }
721         });
722
723         for ref mut stream in acceptor.incoming().take(max) {
724             let mut buf = [0];
725             stream.read(buf).unwrap();
726             assert_eq!(buf[0], 99);
727         }
728     })
729
730     iotest!(fn multiple_connect_serial_ip6() {
731         let addr = next_test_ip6();
732         let ip_str = addr.ip.to_str();
733         let port = addr.port;
734         let max = 10u;
735         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
736
737         spawn(proc() {
738             for _ in range(0, max) {
739                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
740                 stream.write([99]).unwrap();
741             }
742         });
743
744         for ref mut stream in acceptor.incoming().take(max) {
745             let mut buf = [0];
746             stream.read(buf).unwrap();
747             assert_eq!(buf[0], 99);
748         }
749     })
750
751     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
752         let addr = next_test_ip4();
753         let ip_str = addr.ip.to_str();
754         let port = addr.port;
755         static MAX: int = 10;
756         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
757
758         spawn(proc() {
759             let mut acceptor = acceptor;
760             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
761                 // Start another task to handle the connection
762                 spawn(proc() {
763                     let mut stream = stream;
764                     let mut buf = [0];
765                     stream.read(buf).unwrap();
766                     assert!(buf[0] == i as u8);
767                     debug!("read");
768                 });
769             }
770         });
771
772         connect(0, addr);
773
774         fn connect(i: int, addr: SocketAddr) {
775             let ip_str = addr.ip.to_str();
776             let port = addr.port;
777             if i == MAX { return }
778
779             spawn(proc() {
780                 debug!("connecting");
781                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
782                 // Connect again before writing
783                 connect(i + 1, addr);
784                 debug!("writing");
785                 stream.write([i as u8]).unwrap();
786             });
787         }
788     })
789
790     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
791         let addr = next_test_ip6();
792         let ip_str = addr.ip.to_str();
793         let port = addr.port;
794         static MAX: int = 10;
795         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
796
797         spawn(proc() {
798             let mut acceptor = acceptor;
799             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
800                 // Start another task to handle the connection
801                 spawn(proc() {
802                     let mut stream = stream;
803                     let mut buf = [0];
804                     stream.read(buf).unwrap();
805                     assert!(buf[0] == i as u8);
806                     debug!("read");
807                 });
808             }
809         });
810
811         connect(0, addr);
812
813         fn connect(i: int, addr: SocketAddr) {
814             let ip_str = addr.ip.to_str();
815             let port = addr.port;
816             if i == MAX { return }
817
818             spawn(proc() {
819                 debug!("connecting");
820                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
821                 // Connect again before writing
822                 connect(i + 1, addr);
823                 debug!("writing");
824                 stream.write([i as u8]).unwrap();
825             });
826         }
827     })
828
829     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
830         static MAX: int = 10;
831         let addr = next_test_ip4();
832         let ip_str = addr.ip.to_str();
833         let port = addr.port;
834         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
835
836         spawn(proc() {
837             let mut acceptor = acceptor;
838             for stream in acceptor.incoming().take(MAX as uint) {
839                 // Start another task to handle the connection
840                 spawn(proc() {
841                     let mut stream = stream;
842                     let mut buf = [0];
843                     stream.read(buf).unwrap();
844                     assert!(buf[0] == 99);
845                     debug!("read");
846                 });
847             }
848         });
849
850         connect(0, addr);
851
852         fn connect(i: int, addr: SocketAddr) {
853             let ip_str = addr.ip.to_str();
854             let port = addr.port;
855             if i == MAX { return }
856
857             spawn(proc() {
858                 debug!("connecting");
859                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
860                 // Connect again before writing
861                 connect(i + 1, addr);
862                 debug!("writing");
863                 stream.write([99]).unwrap();
864             });
865         }
866     })
867
868     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
869         static MAX: int = 10;
870         let addr = next_test_ip6();
871         let ip_str = addr.ip.to_str();
872         let port = addr.port;
873         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
874
875         spawn(proc() {
876             let mut acceptor = acceptor;
877             for stream in acceptor.incoming().take(MAX as uint) {
878                 // Start another task to handle the connection
879                 spawn(proc() {
880                     let mut stream = stream;
881                     let mut buf = [0];
882                     stream.read(buf).unwrap();
883                     assert!(buf[0] == 99);
884                     debug!("read");
885                 });
886             }
887         });
888
889         connect(0, addr);
890
891         fn connect(i: int, addr: SocketAddr) {
892             let ip_str = addr.ip.to_str();
893             let port = addr.port;
894             if i == MAX { return }
895
896             spawn(proc() {
897                 debug!("connecting");
898                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
899                 // Connect again before writing
900                 connect(i + 1, addr);
901                 debug!("writing");
902                 stream.write([99]).unwrap();
903             });
904         }
905     })
906
907     pub fn socket_name(addr: SocketAddr) {
908         let ip_str = addr.ip.to_str();
909         let port = addr.port;
910         let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
911
912         // Make sure socket_name gives
913         // us the socket we binded to.
914         let so_name = listener.socket_name();
915         assert!(so_name.is_ok());
916         assert_eq!(addr, so_name.unwrap());
917     }
918
919     pub fn peer_name(addr: SocketAddr) {
920         let ip_str = addr.ip.to_str();
921         let port = addr.port;
922         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
923         spawn(proc() {
924             let mut acceptor = acceptor;
925             acceptor.accept().unwrap();
926         });
927
928         let stream = TcpStream::connect(ip_str.as_slice(), port);
929
930         assert!(stream.is_ok());
931         let mut stream = stream.unwrap();
932
933         // Make sure peer_name gives us the
934         // address/port of the peer we've
935         // connected to.
936         let peer_name = stream.peer_name();
937         assert!(peer_name.is_ok());
938         assert_eq!(addr, peer_name.unwrap());
939     }
940
941     iotest!(fn socket_and_peer_name_ip4() {
942         peer_name(next_test_ip4());
943         socket_name(next_test_ip4());
944     })
945
946     iotest!(fn socket_and_peer_name_ip6() {
947         // FIXME: peer name is not consistent
948         //peer_name(next_test_ip6());
949         socket_name(next_test_ip6());
950     })
951
952     iotest!(fn partial_read() {
953         let addr = next_test_ip4();
954         let port = addr.port;
955         let (tx, rx) = channel();
956         spawn(proc() {
957             let ip_str = addr.ip.to_str();
958             let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
959             tx.send(());
960             let mut cl = srv.accept().unwrap();
961             cl.write([10]).unwrap();
962             let mut b = [0];
963             cl.read(b).unwrap();
964             tx.send(());
965         });
966
967         rx.recv();
968         let ip_str = addr.ip.to_str();
969         let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
970         let mut b = [0, ..10];
971         assert_eq!(c.read(b), Ok(1));
972         c.write([1]).unwrap();
973         rx.recv();
974     })
975
976     iotest!(fn double_bind() {
977         let addr = next_test_ip4();
978         let ip_str = addr.ip.to_str();
979         let port = addr.port;
980         let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
981         assert!(listener.is_ok());
982         match TcpListener::bind(ip_str.as_slice(), port).listen() {
983             Ok(..) => fail!(),
984             Err(e) => {
985                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
986                         "unknown error: {} {}", e, e.kind);
987             }
988         }
989     })
990
991     iotest!(fn fast_rebind() {
992         let addr = next_test_ip4();
993         let port = addr.port;
994         let (tx, rx) = channel();
995
996         spawn(proc() {
997             let ip_str = addr.ip.to_str();
998             rx.recv();
999             let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1000             // Close
1001             rx.recv();
1002         });
1003
1004         {
1005             let ip_str = addr.ip.to_str();
1006             let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1007             tx.send(());
1008             {
1009                 let _stream = acceptor.accept().unwrap();
1010                 // Close client
1011                 tx.send(());
1012             }
1013             // Close listener
1014         }
1015         let _listener = TcpListener::bind(addr.ip.to_str().as_slice(), port);
1016     })
1017
1018     iotest!(fn tcp_clone_smoke() {
1019         let addr = next_test_ip4();
1020         let ip_str = addr.ip.to_str();
1021         let port = addr.port;
1022         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1023
1024         spawn(proc() {
1025             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1026             let mut buf = [0, 0];
1027             assert_eq!(s.read(buf), Ok(1));
1028             assert_eq!(buf[0], 1);
1029             s.write([2]).unwrap();
1030         });
1031
1032         let mut s1 = acceptor.accept().unwrap();
1033         let s2 = s1.clone();
1034
1035         let (tx1, rx1) = channel();
1036         let (tx2, rx2) = channel();
1037         spawn(proc() {
1038             let mut s2 = s2;
1039             rx1.recv();
1040             s2.write([1]).unwrap();
1041             tx2.send(());
1042         });
1043         tx1.send(());
1044         let mut buf = [0, 0];
1045         assert_eq!(s1.read(buf), Ok(1));
1046         rx2.recv();
1047     })
1048
1049     iotest!(fn tcp_clone_two_read() {
1050         let addr = next_test_ip6();
1051         let ip_str = addr.ip.to_str();
1052         let port = addr.port;
1053         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1054         let (tx1, rx) = channel();
1055         let tx2 = tx1.clone();
1056
1057         spawn(proc() {
1058             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1059             s.write([1]).unwrap();
1060             rx.recv();
1061             s.write([2]).unwrap();
1062             rx.recv();
1063         });
1064
1065         let mut s1 = acceptor.accept().unwrap();
1066         let s2 = s1.clone();
1067
1068         let (done, rx) = channel();
1069         spawn(proc() {
1070             let mut s2 = s2;
1071             let mut buf = [0, 0];
1072             s2.read(buf).unwrap();
1073             tx2.send(());
1074             done.send(());
1075         });
1076         let mut buf = [0, 0];
1077         s1.read(buf).unwrap();
1078         tx1.send(());
1079
1080         rx.recv();
1081     })
1082
1083     iotest!(fn tcp_clone_two_write() {
1084         let addr = next_test_ip4();
1085         let ip_str = addr.ip.to_str();
1086         let port = addr.port;
1087         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1088
1089         spawn(proc() {
1090             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1091             let mut buf = [0, 1];
1092             s.read(buf).unwrap();
1093             s.read(buf).unwrap();
1094         });
1095
1096         let mut s1 = acceptor.accept().unwrap();
1097         let s2 = s1.clone();
1098
1099         let (done, rx) = channel();
1100         spawn(proc() {
1101             let mut s2 = s2;
1102             s2.write([1]).unwrap();
1103             done.send(());
1104         });
1105         s1.write([2]).unwrap();
1106
1107         rx.recv();
1108     })
1109
1110     iotest!(fn shutdown_smoke() {
1111         use rt::rtio::RtioTcpStream;
1112
1113         let addr = next_test_ip4();
1114         let ip_str = addr.ip.to_str();
1115         let port = addr.port;
1116         let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1117         spawn(proc() {
1118             let mut a = a;
1119             let mut c = a.accept().unwrap();
1120             assert_eq!(c.read_to_end(), Ok(vec!()));
1121             c.write([1]).unwrap();
1122         });
1123
1124         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1125         assert!(s.obj.close_write().is_ok());
1126         assert!(s.write([1]).is_err());
1127         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1128     })
1129
1130     iotest!(fn accept_timeout() {
1131         let addr = next_test_ip4();
1132         let ip_str = addr.ip.to_str();
1133         let port = addr.port;
1134         let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1135
1136         a.set_timeout(Some(10));
1137
1138         // Make sure we time out once and future invocations also time out
1139         let err = a.accept().err().unwrap();
1140         assert_eq!(err.kind, TimedOut);
1141         let err = a.accept().err().unwrap();
1142         assert_eq!(err.kind, TimedOut);
1143
1144         // Also make sure that even though the timeout is expired that we will
1145         // continue to receive any pending connections.
1146         //
1147         // FIXME: freebsd apparently never sees the pending connection, but
1148         //        testing manually always works. Need to investigate this
1149         //        flakiness.
1150         if !cfg!(target_os = "freebsd") {
1151             let (tx, rx) = channel();
1152             spawn(proc() {
1153                 tx.send(TcpStream::connect(addr.ip.to_str().as_slice(),
1154                                            port).unwrap());
1155             });
1156             let _l = rx.recv();
1157             for i in range(0i, 1001) {
1158                 match a.accept() {
1159                     Ok(..) => break,
1160                     Err(ref e) if e.kind == TimedOut => {}
1161                     Err(e) => fail!("error: {}", e),
1162                 }
1163                 ::task::deschedule();
1164                 if i == 1000 { fail!("should have a pending connection") }
1165             }
1166         }
1167
1168         // Unset the timeout and make sure that this always blocks.
1169         a.set_timeout(None);
1170         spawn(proc() {
1171             drop(TcpStream::connect(addr.ip.to_str().as_slice(),
1172                                     port).unwrap());
1173         });
1174         a.accept().unwrap();
1175     })
1176
1177     iotest!(fn close_readwrite_smoke() {
1178         let addr = next_test_ip4();
1179         let ip_str = addr.ip.to_str();
1180         let port = addr.port;
1181         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1182         let (_tx, rx) = channel::<()>();
1183         spawn(proc() {
1184             let mut a = a;
1185             let _s = a.accept().unwrap();
1186             let _ = rx.recv_opt();
1187         });
1188
1189         let mut b = [0];
1190         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1191         let mut s2 = s.clone();
1192
1193         // closing should prevent reads/writes
1194         s.close_write().unwrap();
1195         assert!(s.write([0]).is_err());
1196         s.close_read().unwrap();
1197         assert!(s.read(b).is_err());
1198
1199         // closing should affect previous handles
1200         assert!(s2.write([0]).is_err());
1201         assert!(s2.read(b).is_err());
1202
1203         // closing should affect new handles
1204         let mut s3 = s.clone();
1205         assert!(s3.write([0]).is_err());
1206         assert!(s3.read(b).is_err());
1207
1208         // make sure these don't die
1209         let _ = s2.close_read();
1210         let _ = s2.close_write();
1211         let _ = s3.close_read();
1212         let _ = s3.close_write();
1213     })
1214
1215     iotest!(fn close_read_wakes_up() {
1216         let addr = next_test_ip4();
1217         let ip_str = addr.ip.to_str();
1218         let port = addr.port;
1219         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1220         let (_tx, rx) = channel::<()>();
1221         spawn(proc() {
1222             let mut a = a;
1223             let _s = a.accept().unwrap();
1224             let _ = rx.recv_opt();
1225         });
1226
1227         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1228         let s2 = s.clone();
1229         let (tx, rx) = channel();
1230         spawn(proc() {
1231             let mut s2 = s2;
1232             assert!(s2.read([0]).is_err());
1233             tx.send(());
1234         });
1235         // this should wake up the child task
1236         s.close_read().unwrap();
1237
1238         // this test will never finish if the child doesn't wake up
1239         rx.recv();
1240     })
1241
1242     iotest!(fn readwrite_timeouts() {
1243         let addr = next_test_ip6();
1244         let ip_str = addr.ip.to_str();
1245         let port = addr.port;
1246         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1247         let (tx, rx) = channel::<()>();
1248         spawn(proc() {
1249             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1250             rx.recv();
1251             assert!(s.write([0]).is_ok());
1252             let _ = rx.recv_opt();
1253         });
1254
1255         let mut s = a.accept().unwrap();
1256         s.set_timeout(Some(20));
1257         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1258         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1259
1260         s.set_timeout(Some(20));
1261         for i in range(0i, 1001) {
1262             match s.write([0, .. 128 * 1024]) {
1263                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1264                 Err(IoError { kind: TimedOut, .. }) => break,
1265                 Err(e) => fail!("{}", e),
1266            }
1267            if i == 1000 { fail!("should have filled up?!"); }
1268         }
1269         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1270
1271         tx.send(());
1272         s.set_timeout(None);
1273         assert_eq!(s.read([0, 0]), Ok(1));
1274     })
1275
1276     iotest!(fn read_timeouts() {
1277         let addr = next_test_ip6();
1278         let ip_str = addr.ip.to_str();
1279         let port = addr.port;
1280         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1281         let (tx, rx) = channel::<()>();
1282         spawn(proc() {
1283             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1284             rx.recv();
1285             let mut amt = 0;
1286             while amt < 100 * 128 * 1024 {
1287                 match s.read([0, ..128 * 1024]) {
1288                     Ok(n) => { amt += n; }
1289                     Err(e) => fail!("{}", e),
1290                 }
1291             }
1292             let _ = rx.recv_opt();
1293         });
1294
1295         let mut s = a.accept().unwrap();
1296         s.set_read_timeout(Some(20));
1297         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1298         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1299
1300         tx.send(());
1301         for _ in range(0i, 100) {
1302             assert!(s.write([0, ..128 * 1024]).is_ok());
1303         }
1304     })
1305
1306     iotest!(fn write_timeouts() {
1307         let addr = next_test_ip6();
1308         let ip_str = addr.ip.to_str();
1309         let port = addr.port;
1310         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1311         let (tx, rx) = channel::<()>();
1312         spawn(proc() {
1313             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1314             rx.recv();
1315             assert!(s.write([0]).is_ok());
1316             let _ = rx.recv_opt();
1317         });
1318
1319         let mut s = a.accept().unwrap();
1320         s.set_write_timeout(Some(20));
1321         for i in range(0i, 1001) {
1322             match s.write([0, .. 128 * 1024]) {
1323                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1324                 Err(IoError { kind: TimedOut, .. }) => break,
1325                 Err(e) => fail!("{}", e),
1326            }
1327            if i == 1000 { fail!("should have filled up?!"); }
1328         }
1329         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1330
1331         tx.send(());
1332         assert!(s.read([0]).is_ok());
1333     })
1334
1335     iotest!(fn timeout_concurrent_read() {
1336         let addr = next_test_ip6();
1337         let ip_str = addr.ip.to_str();
1338         let port = addr.port;
1339         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1340         let (tx, rx) = channel::<()>();
1341         spawn(proc() {
1342             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1343             rx.recv();
1344             assert_eq!(s.write([0]), Ok(()));
1345             let _ = rx.recv_opt();
1346         });
1347
1348         let mut s = a.accept().unwrap();
1349         let s2 = s.clone();
1350         let (tx2, rx2) = channel();
1351         spawn(proc() {
1352             let mut s2 = s2;
1353             assert_eq!(s2.read([0]), Ok(1));
1354             tx2.send(());
1355         });
1356
1357         s.set_read_timeout(Some(20));
1358         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1359         tx.send(());
1360
1361         rx2.recv();
1362     })
1363 }