]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
rollup merge of #18407 : thestinger/arena
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use iter::Iterator;
23 use slice::ImmutableSlice;
24 use result::{Ok,Err};
25 use io::net::addrinfo::get_host_addresses;
26 use io::net::ip::SocketAddr;
27 use io::{IoError, ConnectionFailed, InvalidInput};
28 use io::{Reader, Writer, Listener, Acceptor};
29 use io::{standard_error, TimedOut};
30 use from_str::FromStr;
31 use kinds::Send;
32 use option::{None, Some, Option};
33 use boxed::Box;
34 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
35 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
36 use rt::rtio;
37 use time::Duration;
38
39 /// A structure which represents a TCP stream between a local socket and a
40 /// remote socket.
41 ///
42 /// # Example
43 ///
44 /// ```no_run
45 /// # #![allow(unused_must_use)]
46 /// use std::io::TcpStream;
47 ///
48 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
49 ///
50 /// stream.write([1]);
51 /// let mut buf = [0];
52 /// stream.read(buf);
53 /// drop(stream); // close the connection
54 /// ```
55 pub struct TcpStream {
56     obj: Box<RtioTcpStream + Send>,
57 }
58
59 impl TcpStream {
60     fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
61         TcpStream { obj: s }
62     }
63
64     /// Open a TCP connection to a remote host by hostname or IP address.
65     ///
66     /// `host` can be a hostname or IP address string. If no error is
67     /// encountered, then `Ok(stream)` is returned.
68     pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
69         let addresses = match FromStr::from_str(host) {
70             Some(addr) => vec!(addr),
71             None => try!(get_host_addresses(host))
72         };
73         let mut err = IoError {
74             kind: ConnectionFailed,
75             desc: "no addresses found for hostname",
76             detail: None
77         };
78         for addr in addresses.iter() {
79             let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
80             let result = LocalIo::maybe_raise(|io| {
81                 io.tcp_connect(addr, None).map(TcpStream::new)
82             });
83             match result {
84                 Ok(stream) => {
85                     return Ok(stream)
86                 }
87                 Err(connect_err) => {
88                     err = IoError::from_rtio_error(connect_err)
89                 }
90             }
91         }
92         Err(err)
93     }
94
95     /// Creates a TCP connection to a remote socket address, timing out after
96     /// the specified duration.
97     ///
98     /// This is the same as the `connect` method, except that if the timeout
99     /// specified elapses before a connection is made an error will be
100     /// returned. The error's kind will be `TimedOut`.
101     ///
102     /// Note that the `addr` argument may one day be split into a separate host
103     /// and port, similar to the API seen in `connect`.
104     ///
105     /// If a `timeout` with zero or negative duration is specified then
106     /// the function returns `Err`, with the error kind set to `TimedOut`.
107     #[experimental = "the timeout argument may eventually change types"]
108     pub fn connect_timeout(addr: SocketAddr,
109                            timeout: Duration) -> IoResult<TcpStream> {
110         if timeout <= Duration::milliseconds(0) {
111             return Err(standard_error(TimedOut));
112         }
113
114         let SocketAddr { ip, port } = addr;
115         let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
116         LocalIo::maybe_raise(|io| {
117             io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
118         }).map_err(IoError::from_rtio_error)
119     }
120
121     /// Returns the socket address of the remote peer of this TCP connection.
122     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
123         match self.obj.peer_name() {
124             Ok(rtio::SocketAddr { ip, port }) => {
125                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
126             }
127             Err(e) => Err(IoError::from_rtio_error(e)),
128         }
129     }
130
131     /// Returns the socket address of the local half of this TCP connection.
132     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
133         match self.obj.socket_name() {
134             Ok(rtio::SocketAddr { ip, port }) => {
135                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
136             }
137             Err(e) => Err(IoError::from_rtio_error(e)),
138         }
139     }
140
141     /// Sets the nodelay flag on this connection to the boolean specified
142     #[experimental]
143     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
144         if nodelay {
145             self.obj.nodelay()
146         } else {
147             self.obj.control_congestion()
148         }.map_err(IoError::from_rtio_error)
149     }
150
151     /// Sets the keepalive timeout to the timeout specified.
152     ///
153     /// If the value specified is `None`, then the keepalive flag is cleared on
154     /// this connection. Otherwise, the keepalive timeout will be set to the
155     /// specified time, in seconds.
156     #[experimental]
157     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
158         match delay_in_seconds {
159             Some(i) => self.obj.keepalive(i),
160             None => self.obj.letdie(),
161         }.map_err(IoError::from_rtio_error)
162     }
163
164     /// Closes the reading half of this connection.
165     ///
166     /// This method will close the reading portion of this connection, causing
167     /// all pending and future reads to immediately return with an error.
168     ///
169     /// # Example
170     ///
171     /// ```no_run
172     /// # #![allow(unused_must_use)]
173     /// use std::io::timer;
174     /// use std::io::TcpStream;
175     /// use std::time::Duration;
176     ///
177     /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
178     /// let stream2 = stream.clone();
179     ///
180     /// spawn(proc() {
181     ///     // close this stream after one second
182     ///     timer::sleep(Duration::seconds(1));
183     ///     let mut stream = stream2;
184     ///     stream.close_read();
185     /// });
186     ///
187     /// // wait for some data, will get canceled after one second
188     /// let mut buf = [0];
189     /// stream.read(buf);
190     /// ```
191     ///
192     /// Note that this method affects all cloned handles associated with this
193     /// stream, not just this one handle.
194     pub fn close_read(&mut self) -> IoResult<()> {
195         self.obj.close_read().map_err(IoError::from_rtio_error)
196     }
197
198     /// Closes the writing half of this connection.
199     ///
200     /// This method will close the writing portion of this connection, causing
201     /// all future writes to immediately return with an error.
202     ///
203     /// Note that this method affects all cloned handles associated with this
204     /// stream, not just this one handle.
205     pub fn close_write(&mut self) -> IoResult<()> {
206         self.obj.close_write().map_err(IoError::from_rtio_error)
207     }
208
209     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
210     ///
211     /// This function will set a timeout for all blocking operations (including
212     /// reads and writes) on this stream. The timeout specified is a relative
213     /// time, in milliseconds, into the future after which point operations will
214     /// time out. This means that the timeout must be reset periodically to keep
215     /// it from expiring. Specifying a value of `None` will clear the timeout
216     /// for this stream.
217     ///
218     /// The timeout on this stream is local to this stream only. Setting a
219     /// timeout does not affect any other cloned instances of this stream, nor
220     /// does the timeout propagated to cloned handles of this stream. Setting
221     /// this timeout will override any specific read or write timeouts
222     /// previously set for this stream.
223     ///
224     /// For clarification on the semantics of interrupting a read and a write,
225     /// take a look at `set_read_timeout` and `set_write_timeout`.
226     #[experimental = "the timeout argument may change in type and value"]
227     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
228         self.obj.set_timeout(timeout_ms)
229     }
230
231     /// Sets the timeout for read operations on this stream.
232     ///
233     /// See documentation in `set_timeout` for the semantics of this read time.
234     /// This will overwrite any previous read timeout set through either this
235     /// function or `set_timeout`.
236     ///
237     /// # Errors
238     ///
239     /// When this timeout expires, if there is no pending read operation, no
240     /// action is taken. Otherwise, the read operation will be scheduled to
241     /// promptly return. If a timeout error is returned, then no data was read
242     /// during the timeout period.
243     #[experimental = "the timeout argument may change in type and value"]
244     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
245         self.obj.set_read_timeout(timeout_ms)
246     }
247
248     /// Sets the timeout for write operations on this stream.
249     ///
250     /// See documentation in `set_timeout` for the semantics of this write time.
251     /// This will overwrite any previous write timeout set through either this
252     /// function or `set_timeout`.
253     ///
254     /// # Errors
255     ///
256     /// When this timeout expires, if there is no pending write operation, no
257     /// action is taken. Otherwise, the pending write operation will be
258     /// scheduled to promptly return. The actual state of the underlying stream
259     /// is not specified.
260     ///
261     /// The write operation may return an error of type `ShortWrite` which
262     /// indicates that the object is known to have written an exact number of
263     /// bytes successfully during the timeout period, and the remaining bytes
264     /// were never written.
265     ///
266     /// If the write operation returns `TimedOut`, then it the timeout primitive
267     /// does not know how many bytes were written as part of the timeout
268     /// operation. It may be the case that bytes continue to be written in an
269     /// asynchronous fashion after the call to write returns.
270     #[experimental = "the timeout argument may change in type and value"]
271     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
272         self.obj.set_write_timeout(timeout_ms)
273     }
274 }
275
276 impl Clone for TcpStream {
277     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
278     /// and writes of this connection.
279     ///
280     /// The underlying TCP stream will not be closed until all handles to the
281     /// stream have been deallocated. All handles will also follow the same
282     /// stream, but two concurrent reads will not receive the same data.
283     /// Instead, the first read will receive the first packet received, and the
284     /// second read will receive the second packet.
285     fn clone(&self) -> TcpStream {
286         TcpStream { obj: self.obj.clone() }
287     }
288 }
289
290 impl Reader for TcpStream {
291     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
292         self.obj.read(buf).map_err(IoError::from_rtio_error)
293     }
294 }
295
296 impl Writer for TcpStream {
297     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
298         self.obj.write(buf).map_err(IoError::from_rtio_error)
299     }
300 }
301
302 /// A structure representing a socket server. This listener is used to create a
303 /// `TcpAcceptor` which can be used to accept sockets on a local port.
304 ///
305 /// # Example
306 ///
307 /// ```rust
308 /// # fn main() { }
309 /// # fn foo() {
310 /// # #![allow(dead_code)]
311 /// use std::io::{TcpListener, TcpStream};
312 /// use std::io::{Acceptor, Listener};
313 ///
314 /// let listener = TcpListener::bind("127.0.0.1", 80);
315 ///
316 /// // bind the listener to the specified address
317 /// let mut acceptor = listener.listen();
318 ///
319 /// fn handle_client(mut stream: TcpStream) {
320 ///     // ...
321 /// # &mut stream; // silence unused mutability/variable warning
322 /// }
323 /// // accept connections and process them, spawning a new tasks for each one
324 /// for stream in acceptor.incoming() {
325 ///     match stream {
326 ///         Err(e) => { /* connection failed */ }
327 ///         Ok(stream) => spawn(proc() {
328 ///             // connection succeeded
329 ///             handle_client(stream)
330 ///         })
331 ///     }
332 /// }
333 ///
334 /// // close the socket server
335 /// drop(acceptor);
336 /// # }
337 /// ```
338 pub struct TcpListener {
339     obj: Box<RtioTcpListener + Send>,
340 }
341
342 impl TcpListener {
343     /// Creates a new `TcpListener` which will be bound to the specified IP
344     /// and port. This listener is not ready for accepting connections,
345     /// `listen` must be called on it before that's possible.
346     ///
347     /// Binding with a port number of 0 will request that the OS assigns a port
348     /// to this listener. The port allocated can be queried via the
349     /// `socket_name` function.
350     pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
351         match FromStr::from_str(addr) {
352             Some(ip) => {
353                 let addr = rtio::SocketAddr{
354                     ip: super::to_rtio(ip),
355                     port: port,
356                 };
357                 LocalIo::maybe_raise(|io| {
358                     io.tcp_bind(addr).map(|l| TcpListener { obj: l })
359                 }).map_err(IoError::from_rtio_error)
360             }
361             None => {
362                 Err(IoError{
363                     kind: InvalidInput,
364                     desc: "invalid IP address specified",
365                     detail: None
366                 })
367             }
368         }
369     }
370
371     /// Returns the local socket address of this listener.
372     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
373         match self.obj.socket_name() {
374             Ok(rtio::SocketAddr { ip, port }) => {
375                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
376             }
377             Err(e) => Err(IoError::from_rtio_error(e)),
378         }
379     }
380 }
381
382 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
383     fn listen(self) -> IoResult<TcpAcceptor> {
384         match self.obj.listen() {
385             Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
386             Err(e) => Err(IoError::from_rtio_error(e)),
387         }
388     }
389 }
390
391 /// The accepting half of a TCP socket server. This structure is created through
392 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
393 /// `TcpStream` instances.
394 pub struct TcpAcceptor {
395     obj: Box<RtioTcpAcceptor + Send>,
396 }
397
398 impl TcpAcceptor {
399     /// Prevents blocking on all future accepts after `ms` milliseconds have
400     /// elapsed.
401     ///
402     /// This function is used to set a deadline after which this acceptor will
403     /// time out accepting any connections. The argument is the relative
404     /// distance, in milliseconds, to a point in the future after which all
405     /// accepts will fail.
406     ///
407     /// If the argument specified is `None`, then any previously registered
408     /// timeout is cleared.
409     ///
410     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
411     /// any pending connections. All pending connections will be accepted,
412     /// regardless of whether the timeout has expired or not (the accept will
413     /// not block in this case).
414     ///
415     /// # Example
416     ///
417     /// ```no_run
418     /// # #![allow(experimental)]
419     /// use std::io::TcpListener;
420     /// use std::io::{Listener, Acceptor, TimedOut};
421     ///
422     /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
423     ///
424     /// // After 100ms have passed, all accepts will fail
425     /// a.set_timeout(Some(100));
426     ///
427     /// match a.accept() {
428     ///     Ok(..) => println!("accepted a socket"),
429     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
430     ///     Err(e) => println!("err: {}", e),
431     /// }
432     ///
433     /// // Reset the timeout and try again
434     /// a.set_timeout(Some(100));
435     /// let socket = a.accept();
436     ///
437     /// // Clear the timeout and block indefinitely waiting for a connection
438     /// a.set_timeout(None);
439     /// let socket = a.accept();
440     /// ```
441     #[experimental = "the type of the argument and name of this function are \
442                       subject to change"]
443     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
444
445     /// Closes the accepting capabilities of this acceptor.
446     ///
447     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
448     /// in that it will affect *all* cloned handles of this acceptor's original
449     /// handle.
450     ///
451     /// Once this function succeeds, all future calls to `accept` will return
452     /// immediately with an error, preventing all future calls to accept. The
453     /// underlying socket will not be relinquished back to the OS until all
454     /// acceptors have been deallocated.
455     ///
456     /// This is useful for waking up a thread in an accept loop to indicate that
457     /// it should exit.
458     ///
459     /// # Example
460     ///
461     /// ```
462     /// # #![allow(experimental)]
463     /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
464     ///
465     /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
466     /// let a2 = a.clone();
467     ///
468     /// spawn(proc() {
469     ///     let mut a2 = a2;
470     ///     for socket in a2.incoming() {
471     ///         match socket {
472     ///             Ok(s) => { /* handle s */ }
473     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
474     ///             Err(e) => panic!("unexpected error: {}", e),
475     ///         }
476     ///     }
477     /// });
478     ///
479     /// # fn wait_for_sigint() {}
480     /// // Now that our accept loop is running, wait for the program to be
481     /// // requested to exit.
482     /// wait_for_sigint();
483     ///
484     /// // Signal our accept loop to exit
485     /// assert!(a.close_accept().is_ok());
486     /// ```
487     #[experimental]
488     pub fn close_accept(&mut self) -> IoResult<()> {
489         self.obj.close_accept().map_err(IoError::from_rtio_error)
490     }
491 }
492
493 impl Acceptor<TcpStream> for TcpAcceptor {
494     fn accept(&mut self) -> IoResult<TcpStream> {
495         match self.obj.accept(){
496             Ok(s) => Ok(TcpStream::new(s)),
497             Err(e) => Err(IoError::from_rtio_error(e)),
498         }
499     }
500 }
501
502 impl Clone for TcpAcceptor {
503     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
504     /// accepts.
505     ///
506     /// The underlying TCP acceptor will not be closed until all handles to the
507     /// acceptor have been deallocated. Incoming connections will be received on
508     /// at most once acceptor, the same connection will not be accepted twice.
509     ///
510     /// The `close_accept` method will shut down *all* acceptors cloned from the
511     /// same original acceptor, whereas the `set_timeout` method only affects
512     /// the selector that it is called on.
513     ///
514     /// This function is useful for creating a handle to invoke `close_accept`
515     /// on to wake up any other task blocked in `accept`.
516     fn clone(&self) -> TcpAcceptor {
517         TcpAcceptor { obj: self.obj.clone() }
518     }
519 }
520
521 #[cfg(test)]
522 #[allow(experimental)]
523 mod test {
524     use io::net::tcp::*;
525     use io::net::ip::*;
526     use io::*;
527     use io::test::*;
528     use prelude::*;
529
530     // FIXME #11530 this fails on android because tests are run as root
531     #[cfg_attr(any(windows, target_os = "android"), ignore)]
532     #[test]
533     fn bind_error() {
534         match TcpListener::bind("0.0.0.0", 1) {
535             Ok(..) => panic!(),
536             Err(e) => assert_eq!(e.kind, PermissionDenied),
537         }
538     }
539
540     #[test]
541     fn connect_error() {
542         match TcpStream::connect("0.0.0.0", 1) {
543             Ok(..) => panic!(),
544             Err(e) => assert_eq!(e.kind, ConnectionRefused),
545         }
546     }
547
548     #[test]
549     fn listen_ip4_localhost() {
550         let socket_addr = next_test_ip4();
551         let ip_str = socket_addr.ip.to_string();
552         let port = socket_addr.port;
553         let listener = TcpListener::bind(ip_str.as_slice(), port);
554         let mut acceptor = listener.listen();
555
556         spawn(proc() {
557             let mut stream = TcpStream::connect("localhost", port);
558             stream.write([144]).unwrap();
559         });
560
561         let mut stream = acceptor.accept();
562         let mut buf = [0];
563         stream.read(buf).unwrap();
564         assert!(buf[0] == 144);
565     }
566
567     #[test]
568     fn connect_localhost() {
569         let addr = next_test_ip4();
570         let ip_str = addr.ip.to_string();
571         let port = addr.port;
572         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
573
574         spawn(proc() {
575             let mut stream = TcpStream::connect("localhost", addr.port);
576             stream.write([64]).unwrap();
577         });
578
579         let mut stream = acceptor.accept();
580         let mut buf = [0];
581         stream.read(buf).unwrap();
582         assert!(buf[0] == 64);
583     }
584
585     #[test]
586     fn connect_ip4_loopback() {
587         let addr = next_test_ip4();
588         let ip_str = addr.ip.to_string();
589         let port = addr.port;
590         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
591
592         spawn(proc() {
593             let mut stream = TcpStream::connect("127.0.0.1", addr.port);
594             stream.write([44]).unwrap();
595         });
596
597         let mut stream = acceptor.accept();
598         let mut buf = [0];
599         stream.read(buf).unwrap();
600         assert!(buf[0] == 44);
601     }
602
603     #[test]
604     fn connect_ip6_loopback() {
605         let addr = next_test_ip6();
606         let ip_str = addr.ip.to_string();
607         let port = addr.port;
608         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
609
610         spawn(proc() {
611             let mut stream = TcpStream::connect("::1", addr.port);
612             stream.write([66]).unwrap();
613         });
614
615         let mut stream = acceptor.accept();
616         let mut buf = [0];
617         stream.read(buf).unwrap();
618         assert!(buf[0] == 66);
619     }
620
621     #[test]
622     fn smoke_test_ip4() {
623         let addr = next_test_ip4();
624         let ip_str = addr.ip.to_string();
625         let port = addr.port;
626         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
627
628         spawn(proc() {
629             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
630             stream.write([99]).unwrap();
631         });
632
633         let mut stream = acceptor.accept();
634         let mut buf = [0];
635         stream.read(buf).unwrap();
636         assert!(buf[0] == 99);
637     }
638
639     #[test]
640     fn smoke_test_ip6() {
641         let addr = next_test_ip6();
642         let ip_str = addr.ip.to_string();
643         let port = addr.port;
644         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
645
646         spawn(proc() {
647             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
648             stream.write([99]).unwrap();
649         });
650
651         let mut stream = acceptor.accept();
652         let mut buf = [0];
653         stream.read(buf).unwrap();
654         assert!(buf[0] == 99);
655     }
656
657     #[test]
658     fn read_eof_ip4() {
659         let addr = next_test_ip4();
660         let ip_str = addr.ip.to_string();
661         let port = addr.port;
662         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
663
664         spawn(proc() {
665             let _stream = TcpStream::connect(ip_str.as_slice(), port);
666             // Close
667         });
668
669         let mut stream = acceptor.accept();
670         let mut buf = [0];
671         let nread = stream.read(buf);
672         assert!(nread.is_err());
673     }
674
675     #[test]
676     fn read_eof_ip6() {
677         let addr = next_test_ip6();
678         let ip_str = addr.ip.to_string();
679         let port = addr.port;
680         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
681
682         spawn(proc() {
683             let _stream = TcpStream::connect(ip_str.as_slice(), port);
684             // Close
685         });
686
687         let mut stream = acceptor.accept();
688         let mut buf = [0];
689         let nread = stream.read(buf);
690         assert!(nread.is_err());
691     }
692
693     #[test]
694     fn read_eof_twice_ip4() {
695         let addr = next_test_ip4();
696         let ip_str = addr.ip.to_string();
697         let port = addr.port;
698         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
699
700         spawn(proc() {
701             let _stream = TcpStream::connect(ip_str.as_slice(), port);
702             // Close
703         });
704
705         let mut stream = acceptor.accept();
706         let mut buf = [0];
707         let nread = stream.read(buf);
708         assert!(nread.is_err());
709
710         match stream.read(buf) {
711             Ok(..) => panic!(),
712             Err(ref e) => {
713                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
714                         "unknown kind: {}", e.kind);
715             }
716         }
717     }
718
719     #[test]
720     fn read_eof_twice_ip6() {
721         let addr = next_test_ip6();
722         let ip_str = addr.ip.to_string();
723         let port = addr.port;
724         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
725
726         spawn(proc() {
727             let _stream = TcpStream::connect(ip_str.as_slice(), port);
728             // Close
729         });
730
731         let mut stream = acceptor.accept();
732         let mut buf = [0];
733         let nread = stream.read(buf);
734         assert!(nread.is_err());
735
736         match stream.read(buf) {
737             Ok(..) => panic!(),
738             Err(ref e) => {
739                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
740                         "unknown kind: {}", e.kind);
741             }
742         }
743     }
744
745     #[test]
746     fn write_close_ip4() {
747         let addr = next_test_ip4();
748         let ip_str = addr.ip.to_string();
749         let port = addr.port;
750         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
751
752         spawn(proc() {
753             let _stream = TcpStream::connect(ip_str.as_slice(), port);
754             // Close
755         });
756
757         let mut stream = acceptor.accept();
758         let buf = [0];
759         loop {
760             match stream.write(buf) {
761                 Ok(..) => {}
762                 Err(e) => {
763                     assert!(e.kind == ConnectionReset ||
764                             e.kind == BrokenPipe ||
765                             e.kind == ConnectionAborted,
766                             "unknown error: {}", e);
767                     break;
768                 }
769             }
770         }
771     }
772
773     #[test]
774     fn write_close_ip6() {
775         let addr = next_test_ip6();
776         let ip_str = addr.ip.to_string();
777         let port = addr.port;
778         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
779
780         spawn(proc() {
781             let _stream = TcpStream::connect(ip_str.as_slice(), port);
782             // Close
783         });
784
785         let mut stream = acceptor.accept();
786         let buf = [0];
787         loop {
788             match stream.write(buf) {
789                 Ok(..) => {}
790                 Err(e) => {
791                     assert!(e.kind == ConnectionReset ||
792                             e.kind == BrokenPipe ||
793                             e.kind == ConnectionAborted,
794                             "unknown error: {}", e);
795                     break;
796                 }
797             }
798         }
799     }
800
801     #[test]
802     fn multiple_connect_serial_ip4() {
803         let addr = next_test_ip4();
804         let ip_str = addr.ip.to_string();
805         let port = addr.port;
806         let max = 10u;
807         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
808
809         spawn(proc() {
810             for _ in range(0, max) {
811                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
812                 stream.write([99]).unwrap();
813             }
814         });
815
816         for ref mut stream in acceptor.incoming().take(max) {
817             let mut buf = [0];
818             stream.read(buf).unwrap();
819             assert_eq!(buf[0], 99);
820         }
821     }
822
823     #[test]
824     fn multiple_connect_serial_ip6() {
825         let addr = next_test_ip6();
826         let ip_str = addr.ip.to_string();
827         let port = addr.port;
828         let max = 10u;
829         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
830
831         spawn(proc() {
832             for _ in range(0, max) {
833                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
834                 stream.write([99]).unwrap();
835             }
836         });
837
838         for ref mut stream in acceptor.incoming().take(max) {
839             let mut buf = [0];
840             stream.read(buf).unwrap();
841             assert_eq!(buf[0], 99);
842         }
843     }
844
845     #[test]
846     fn multiple_connect_interleaved_greedy_schedule_ip4() {
847         let addr = next_test_ip4();
848         let ip_str = addr.ip.to_string();
849         let port = addr.port;
850         static MAX: int = 10;
851         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
852
853         spawn(proc() {
854             let mut acceptor = acceptor;
855             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
856                 // Start another task to handle the connection
857                 spawn(proc() {
858                     let mut stream = stream;
859                     let mut buf = [0];
860                     stream.read(buf).unwrap();
861                     assert!(buf[0] == i as u8);
862                     debug!("read");
863                 });
864             }
865         });
866
867         connect(0, addr);
868
869         fn connect(i: int, addr: SocketAddr) {
870             let ip_str = addr.ip.to_string();
871             let port = addr.port;
872             if i == MAX { return }
873
874             spawn(proc() {
875                 debug!("connecting");
876                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
877                 // Connect again before writing
878                 connect(i + 1, addr);
879                 debug!("writing");
880                 stream.write([i as u8]).unwrap();
881             });
882         }
883     }
884
885     #[test]
886     fn multiple_connect_interleaved_greedy_schedule_ip6() {
887         let addr = next_test_ip6();
888         let ip_str = addr.ip.to_string();
889         let port = addr.port;
890         static MAX: int = 10;
891         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
892
893         spawn(proc() {
894             let mut acceptor = acceptor;
895             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
896                 // Start another task to handle the connection
897                 spawn(proc() {
898                     let mut stream = stream;
899                     let mut buf = [0];
900                     stream.read(buf).unwrap();
901                     assert!(buf[0] == i as u8);
902                     debug!("read");
903                 });
904             }
905         });
906
907         connect(0, addr);
908
909         fn connect(i: int, addr: SocketAddr) {
910             let ip_str = addr.ip.to_string();
911             let port = addr.port;
912             if i == MAX { return }
913
914             spawn(proc() {
915                 debug!("connecting");
916                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
917                 // Connect again before writing
918                 connect(i + 1, addr);
919                 debug!("writing");
920                 stream.write([i as u8]).unwrap();
921             });
922         }
923     }
924
925     #[test]
926     fn multiple_connect_interleaved_lazy_schedule_ip4() {
927         static MAX: int = 10;
928         let addr = next_test_ip4();
929         let ip_str = addr.ip.to_string();
930         let port = addr.port;
931         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
932
933         spawn(proc() {
934             let mut acceptor = acceptor;
935             for stream in acceptor.incoming().take(MAX as uint) {
936                 // Start another task to handle the connection
937                 spawn(proc() {
938                     let mut stream = stream;
939                     let mut buf = [0];
940                     stream.read(buf).unwrap();
941                     assert!(buf[0] == 99);
942                     debug!("read");
943                 });
944             }
945         });
946
947         connect(0, addr);
948
949         fn connect(i: int, addr: SocketAddr) {
950             let ip_str = addr.ip.to_string();
951             let port = addr.port;
952             if i == MAX { return }
953
954             spawn(proc() {
955                 debug!("connecting");
956                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
957                 // Connect again before writing
958                 connect(i + 1, addr);
959                 debug!("writing");
960                 stream.write([99]).unwrap();
961             });
962         }
963     }
964
965     #[test]
966     fn multiple_connect_interleaved_lazy_schedule_ip6() {
967         static MAX: int = 10;
968         let addr = next_test_ip6();
969         let ip_str = addr.ip.to_string();
970         let port = addr.port;
971         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
972
973         spawn(proc() {
974             let mut acceptor = acceptor;
975             for stream in acceptor.incoming().take(MAX as uint) {
976                 // Start another task to handle the connection
977                 spawn(proc() {
978                     let mut stream = stream;
979                     let mut buf = [0];
980                     stream.read(buf).unwrap();
981                     assert!(buf[0] == 99);
982                     debug!("read");
983                 });
984             }
985         });
986
987         connect(0, addr);
988
989         fn connect(i: int, addr: SocketAddr) {
990             let ip_str = addr.ip.to_string();
991             let port = addr.port;
992             if i == MAX { return }
993
994             spawn(proc() {
995                 debug!("connecting");
996                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
997                 // Connect again before writing
998                 connect(i + 1, addr);
999                 debug!("writing");
1000                 stream.write([99]).unwrap();
1001             });
1002         }
1003     }
1004
1005     pub fn socket_name(addr: SocketAddr) {
1006         let ip_str = addr.ip.to_string();
1007         let port = addr.port;
1008         let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
1009
1010         // Make sure socket_name gives
1011         // us the socket we binded to.
1012         let so_name = listener.socket_name();
1013         assert!(so_name.is_ok());
1014         assert_eq!(addr, so_name.unwrap());
1015     }
1016
1017     pub fn peer_name(addr: SocketAddr) {
1018         let ip_str = addr.ip.to_string();
1019         let port = addr.port;
1020         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1021         spawn(proc() {
1022             let mut acceptor = acceptor;
1023             acceptor.accept().unwrap();
1024         });
1025
1026         let stream = TcpStream::connect(ip_str.as_slice(), port);
1027
1028         assert!(stream.is_ok());
1029         let mut stream = stream.unwrap();
1030
1031         // Make sure peer_name gives us the
1032         // address/port of the peer we've
1033         // connected to.
1034         let peer_name = stream.peer_name();
1035         assert!(peer_name.is_ok());
1036         assert_eq!(addr, peer_name.unwrap());
1037     }
1038
1039     #[test]
1040     fn socket_and_peer_name_ip4() {
1041         peer_name(next_test_ip4());
1042         socket_name(next_test_ip4());
1043     }
1044
1045     #[test]
1046     fn socket_and_peer_name_ip6() {
1047         // FIXME: peer name is not consistent
1048         //peer_name(next_test_ip6());
1049         socket_name(next_test_ip6());
1050     }
1051
1052     #[test]
1053     fn partial_read() {
1054         let addr = next_test_ip4();
1055         let port = addr.port;
1056         let (tx, rx) = channel();
1057         spawn(proc() {
1058             let ip_str = addr.ip.to_string();
1059             let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1060             tx.send(());
1061             let mut cl = srv.accept().unwrap();
1062             cl.write([10]).unwrap();
1063             let mut b = [0];
1064             cl.read(b).unwrap();
1065             tx.send(());
1066         });
1067
1068         rx.recv();
1069         let ip_str = addr.ip.to_string();
1070         let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1071         let mut b = [0, ..10];
1072         assert_eq!(c.read(b), Ok(1));
1073         c.write([1]).unwrap();
1074         rx.recv();
1075     }
1076
1077     #[test]
1078     fn double_bind() {
1079         let addr = next_test_ip4();
1080         let ip_str = addr.ip.to_string();
1081         let port = addr.port;
1082         let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1083         assert!(listener.is_ok());
1084         match TcpListener::bind(ip_str.as_slice(), port).listen() {
1085             Ok(..) => panic!(),
1086             Err(e) => {
1087                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1088                         "unknown error: {} {}", e, e.kind);
1089             }
1090         }
1091     }
1092
1093     #[test]
1094     fn fast_rebind() {
1095         let addr = next_test_ip4();
1096         let port = addr.port;
1097         let (tx, rx) = channel();
1098
1099         spawn(proc() {
1100             let ip_str = addr.ip.to_string();
1101             rx.recv();
1102             let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1103             // Close
1104             rx.recv();
1105         });
1106
1107         {
1108             let ip_str = addr.ip.to_string();
1109             let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1110             tx.send(());
1111             {
1112                 let _stream = acceptor.accept().unwrap();
1113                 // Close client
1114                 tx.send(());
1115             }
1116             // Close listener
1117         }
1118         let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1119     }
1120
1121     #[test]
1122     fn tcp_clone_smoke() {
1123         let addr = next_test_ip4();
1124         let ip_str = addr.ip.to_string();
1125         let port = addr.port;
1126         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1127
1128         spawn(proc() {
1129             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1130             let mut buf = [0, 0];
1131             assert_eq!(s.read(buf), Ok(1));
1132             assert_eq!(buf[0], 1);
1133             s.write([2]).unwrap();
1134         });
1135
1136         let mut s1 = acceptor.accept().unwrap();
1137         let s2 = s1.clone();
1138
1139         let (tx1, rx1) = channel();
1140         let (tx2, rx2) = channel();
1141         spawn(proc() {
1142             let mut s2 = s2;
1143             rx1.recv();
1144             s2.write([1]).unwrap();
1145             tx2.send(());
1146         });
1147         tx1.send(());
1148         let mut buf = [0, 0];
1149         assert_eq!(s1.read(buf), Ok(1));
1150         rx2.recv();
1151     }
1152
1153     #[test]
1154     fn tcp_clone_two_read() {
1155         let addr = next_test_ip6();
1156         let ip_str = addr.ip.to_string();
1157         let port = addr.port;
1158         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1159         let (tx1, rx) = channel();
1160         let tx2 = tx1.clone();
1161
1162         spawn(proc() {
1163             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1164             s.write([1]).unwrap();
1165             rx.recv();
1166             s.write([2]).unwrap();
1167             rx.recv();
1168         });
1169
1170         let mut s1 = acceptor.accept().unwrap();
1171         let s2 = s1.clone();
1172
1173         let (done, rx) = channel();
1174         spawn(proc() {
1175             let mut s2 = s2;
1176             let mut buf = [0, 0];
1177             s2.read(buf).unwrap();
1178             tx2.send(());
1179             done.send(());
1180         });
1181         let mut buf = [0, 0];
1182         s1.read(buf).unwrap();
1183         tx1.send(());
1184
1185         rx.recv();
1186     }
1187
1188     #[test]
1189     fn tcp_clone_two_write() {
1190         let addr = next_test_ip4();
1191         let ip_str = addr.ip.to_string();
1192         let port = addr.port;
1193         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1194
1195         spawn(proc() {
1196             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1197             let mut buf = [0, 1];
1198             s.read(buf).unwrap();
1199             s.read(buf).unwrap();
1200         });
1201
1202         let mut s1 = acceptor.accept().unwrap();
1203         let s2 = s1.clone();
1204
1205         let (done, rx) = channel();
1206         spawn(proc() {
1207             let mut s2 = s2;
1208             s2.write([1]).unwrap();
1209             done.send(());
1210         });
1211         s1.write([2]).unwrap();
1212
1213         rx.recv();
1214     }
1215
1216     #[test]
1217     fn shutdown_smoke() {
1218         use rt::rtio::RtioTcpStream;
1219
1220         let addr = next_test_ip4();
1221         let ip_str = addr.ip.to_string();
1222         let port = addr.port;
1223         let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1224         spawn(proc() {
1225             let mut a = a;
1226             let mut c = a.accept().unwrap();
1227             assert_eq!(c.read_to_end(), Ok(vec!()));
1228             c.write([1]).unwrap();
1229         });
1230
1231         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1232         assert!(s.obj.close_write().is_ok());
1233         assert!(s.write([1]).is_err());
1234         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1235     }
1236
1237     #[test]
1238     fn accept_timeout() {
1239         let addr = next_test_ip4();
1240         let ip_str = addr.ip.to_string();
1241         let port = addr.port;
1242         let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1243
1244         a.set_timeout(Some(10));
1245
1246         // Make sure we time out once and future invocations also time out
1247         let err = a.accept().err().unwrap();
1248         assert_eq!(err.kind, TimedOut);
1249         let err = a.accept().err().unwrap();
1250         assert_eq!(err.kind, TimedOut);
1251
1252         // Also make sure that even though the timeout is expired that we will
1253         // continue to receive any pending connections.
1254         //
1255         // FIXME: freebsd apparently never sees the pending connection, but
1256         //        testing manually always works. Need to investigate this
1257         //        flakiness.
1258         if !cfg!(target_os = "freebsd") {
1259             let (tx, rx) = channel();
1260             spawn(proc() {
1261                 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
1262                                            port).unwrap());
1263             });
1264             let _l = rx.recv();
1265             for i in range(0i, 1001) {
1266                 match a.accept() {
1267                     Ok(..) => break,
1268                     Err(ref e) if e.kind == TimedOut => {}
1269                     Err(e) => panic!("error: {}", e),
1270                 }
1271                 ::task::deschedule();
1272                 if i == 1000 { panic!("should have a pending connection") }
1273             }
1274         }
1275
1276         // Unset the timeout and make sure that this always blocks.
1277         a.set_timeout(None);
1278         spawn(proc() {
1279             drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1280                                     port).unwrap());
1281         });
1282         a.accept().unwrap();
1283     }
1284
1285     #[test]
1286     fn close_readwrite_smoke() {
1287         let addr = next_test_ip4();
1288         let ip_str = addr.ip.to_string();
1289         let port = addr.port;
1290         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1291         let (_tx, rx) = channel::<()>();
1292         spawn(proc() {
1293             let mut a = a;
1294             let _s = a.accept().unwrap();
1295             let _ = rx.recv_opt();
1296         });
1297
1298         let mut b = [0];
1299         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1300         let mut s2 = s.clone();
1301
1302         // closing should prevent reads/writes
1303         s.close_write().unwrap();
1304         assert!(s.write([0]).is_err());
1305         s.close_read().unwrap();
1306         assert!(s.read(b).is_err());
1307
1308         // closing should affect previous handles
1309         assert!(s2.write([0]).is_err());
1310         assert!(s2.read(b).is_err());
1311
1312         // closing should affect new handles
1313         let mut s3 = s.clone();
1314         assert!(s3.write([0]).is_err());
1315         assert!(s3.read(b).is_err());
1316
1317         // make sure these don't die
1318         let _ = s2.close_read();
1319         let _ = s2.close_write();
1320         let _ = s3.close_read();
1321         let _ = s3.close_write();
1322     }
1323
1324     #[test]
1325     fn close_read_wakes_up() {
1326         let addr = next_test_ip4();
1327         let ip_str = addr.ip.to_string();
1328         let port = addr.port;
1329         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1330         let (_tx, rx) = channel::<()>();
1331         spawn(proc() {
1332             let mut a = a;
1333             let _s = a.accept().unwrap();
1334             let _ = rx.recv_opt();
1335         });
1336
1337         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1338         let s2 = s.clone();
1339         let (tx, rx) = channel();
1340         spawn(proc() {
1341             let mut s2 = s2;
1342             assert!(s2.read([0]).is_err());
1343             tx.send(());
1344         });
1345         // this should wake up the child task
1346         s.close_read().unwrap();
1347
1348         // this test will never finish if the child doesn't wake up
1349         rx.recv();
1350     }
1351
1352     #[test]
1353     fn readwrite_timeouts() {
1354         let addr = next_test_ip6();
1355         let ip_str = addr.ip.to_string();
1356         let port = addr.port;
1357         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1358         let (tx, rx) = channel::<()>();
1359         spawn(proc() {
1360             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1361             rx.recv();
1362             assert!(s.write([0]).is_ok());
1363             let _ = rx.recv_opt();
1364         });
1365
1366         let mut s = a.accept().unwrap();
1367         s.set_timeout(Some(20));
1368         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1369         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1370
1371         s.set_timeout(Some(20));
1372         for i in range(0i, 1001) {
1373             match s.write([0, .. 128 * 1024]) {
1374                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1375                 Err(IoError { kind: TimedOut, .. }) => break,
1376                 Err(e) => panic!("{}", e),
1377            }
1378            if i == 1000 { panic!("should have filled up?!"); }
1379         }
1380         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1381
1382         tx.send(());
1383         s.set_timeout(None);
1384         assert_eq!(s.read([0, 0]), Ok(1));
1385     }
1386
1387     #[test]
1388     fn read_timeouts() {
1389         let addr = next_test_ip6();
1390         let ip_str = addr.ip.to_string();
1391         let port = addr.port;
1392         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1393         let (tx, rx) = channel::<()>();
1394         spawn(proc() {
1395             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1396             rx.recv();
1397             let mut amt = 0;
1398             while amt < 100 * 128 * 1024 {
1399                 match s.read([0, ..128 * 1024]) {
1400                     Ok(n) => { amt += n; }
1401                     Err(e) => panic!("{}", e),
1402                 }
1403             }
1404             let _ = rx.recv_opt();
1405         });
1406
1407         let mut s = a.accept().unwrap();
1408         s.set_read_timeout(Some(20));
1409         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1410         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1411
1412         tx.send(());
1413         for _ in range(0i, 100) {
1414             assert!(s.write([0, ..128 * 1024]).is_ok());
1415         }
1416     }
1417
1418     #[test]
1419     fn write_timeouts() {
1420         let addr = next_test_ip6();
1421         let ip_str = addr.ip.to_string();
1422         let port = addr.port;
1423         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1424         let (tx, rx) = channel::<()>();
1425         spawn(proc() {
1426             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1427             rx.recv();
1428             assert!(s.write([0]).is_ok());
1429             let _ = rx.recv_opt();
1430         });
1431
1432         let mut s = a.accept().unwrap();
1433         s.set_write_timeout(Some(20));
1434         for i in range(0i, 1001) {
1435             match s.write([0, .. 128 * 1024]) {
1436                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1437                 Err(IoError { kind: TimedOut, .. }) => break,
1438                 Err(e) => panic!("{}", e),
1439            }
1440            if i == 1000 { panic!("should have filled up?!"); }
1441         }
1442         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1443
1444         tx.send(());
1445         assert!(s.read([0]).is_ok());
1446     }
1447
1448     #[test]
1449     fn timeout_concurrent_read() {
1450         let addr = next_test_ip6();
1451         let ip_str = addr.ip.to_string();
1452         let port = addr.port;
1453         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1454         let (tx, rx) = channel::<()>();
1455         spawn(proc() {
1456             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1457             rx.recv();
1458             assert_eq!(s.write([0]), Ok(()));
1459             let _ = rx.recv_opt();
1460         });
1461
1462         let mut s = a.accept().unwrap();
1463         let s2 = s.clone();
1464         let (tx2, rx2) = channel();
1465         spawn(proc() {
1466             let mut s2 = s2;
1467             assert_eq!(s2.read([0]), Ok(1));
1468             tx2.send(());
1469         });
1470
1471         s.set_read_timeout(Some(20));
1472         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1473         tx.send(());
1474
1475         rx2.recv();
1476     }
1477
1478     #[test]
1479     fn clone_while_reading() {
1480         let addr = next_test_ip6();
1481         let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1482         let mut accept = listen.listen().unwrap();
1483
1484         // Enqueue a task to write to a socket
1485         let (tx, rx) = channel();
1486         let (txdone, rxdone) = channel();
1487         let txdone2 = txdone.clone();
1488         spawn(proc() {
1489             let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1490                                              addr.port).unwrap();
1491             rx.recv();
1492             tcp.write_u8(0).unwrap();
1493             txdone2.send(());
1494         });
1495
1496         // Spawn off a reading clone
1497         let tcp = accept.accept().unwrap();
1498         let tcp2 = tcp.clone();
1499         let txdone3 = txdone.clone();
1500         spawn(proc() {
1501             let mut tcp2 = tcp2;
1502             tcp2.read_u8().unwrap();
1503             txdone3.send(());
1504         });
1505
1506         // Try to ensure that the reading clone is indeed reading
1507         for _ in range(0i, 50) {
1508             ::task::deschedule();
1509         }
1510
1511         // clone the handle again while it's reading, then let it finish the
1512         // read.
1513         let _ = tcp.clone();
1514         tx.send(());
1515         rxdone.recv();
1516         rxdone.recv();
1517     }
1518
1519     #[test]
1520     fn clone_accept_smoke() {
1521         let addr = next_test_ip4();
1522         let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1523         let mut a = l.listen().unwrap();
1524         let mut a2 = a.clone();
1525
1526         spawn(proc() {
1527             let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1528         });
1529         spawn(proc() {
1530             let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1531         });
1532
1533         assert!(a.accept().is_ok());
1534         assert!(a2.accept().is_ok());
1535     }
1536
1537     #[test]
1538     fn clone_accept_concurrent() {
1539         let addr = next_test_ip4();
1540         let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1541         let a = l.listen().unwrap();
1542         let a2 = a.clone();
1543
1544         let (tx, rx) = channel();
1545         let tx2 = tx.clone();
1546
1547         spawn(proc() { let mut a = a; tx.send(a.accept()) });
1548         spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1549
1550         spawn(proc() {
1551             let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1552         });
1553         spawn(proc() {
1554             let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1555         });
1556
1557         assert!(rx.recv().is_ok());
1558         assert!(rx.recv().is_ok());
1559     }
1560
1561     #[test]
1562     fn close_accept_smoke() {
1563         let addr = next_test_ip4();
1564         let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1565         let mut a = l.listen().unwrap();
1566
1567         a.close_accept().unwrap();
1568         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1569     }
1570
1571     #[test]
1572     fn close_accept_concurrent() {
1573         let addr = next_test_ip4();
1574         let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1575         let a = l.listen().unwrap();
1576         let mut a2 = a.clone();
1577
1578         let (tx, rx) = channel();
1579         spawn(proc() {
1580             let mut a = a;
1581             tx.send(a.accept());
1582         });
1583         a2.close_accept().unwrap();
1584
1585         assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
1586     }
1587 }