]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
0b0c22ed88799f3efccb362e12793d42ed6a8d59
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use collections::MutableSeq;
22 use io::IoResult;
23 use iter::Iterator;
24 use slice::ImmutableVector;
25 use result::{Ok,Err};
26 use io::net::addrinfo::get_host_addresses;
27 use io::net::ip::SocketAddr;
28 use io::{IoError, ConnectionFailed, InvalidInput};
29 use io::{Reader, Writer, Listener, Acceptor};
30 use from_str::FromStr;
31 use kinds::Send;
32 use option::{None, Some, Option};
33 use boxed::Box;
34 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
35 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
36 use rt::rtio;
37
38 /// A structure which represents a TCP stream between a local socket and a
39 /// remote socket.
40 ///
41 /// # Example
42 ///
43 /// ```no_run
44 /// # #![allow(unused_must_use)]
45 /// use std::io::TcpStream;
46 ///
47 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
48 ///
49 /// stream.write([1]);
50 /// let mut buf = [0];
51 /// stream.read(buf);
52 /// drop(stream); // close the connection
53 /// ```
54 pub struct TcpStream {
55     obj: Box<RtioTcpStream + Send>,
56 }
57
58 impl TcpStream {
59     fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
60         TcpStream { obj: s }
61     }
62
63     /// Open a TCP connection to a remote host by hostname or IP address.
64     ///
65     /// `host` can be a hostname or IP address string. If no error is
66     /// encountered, then `Ok(stream)` is returned.
67     pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
68         let addresses = match FromStr::from_str(host) {
69             Some(addr) => vec!(addr),
70             None => try!(get_host_addresses(host))
71         };
72         let mut err = IoError {
73             kind: ConnectionFailed,
74             desc: "no addresses found for hostname",
75             detail: None
76         };
77         for addr in addresses.iter() {
78             let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
79             let result = LocalIo::maybe_raise(|io| {
80                 io.tcp_connect(addr, None).map(TcpStream::new)
81             });
82             match result {
83                 Ok(stream) => {
84                     return Ok(stream)
85                 }
86                 Err(connect_err) => {
87                     err = IoError::from_rtio_error(connect_err)
88                 }
89             }
90         }
91         Err(err)
92     }
93
94     /// Creates a TCP connection to a remote socket address, timing out after
95     /// the specified number of milliseconds.
96     ///
97     /// This is the same as the `connect` method, except that if the timeout
98     /// specified (in milliseconds) elapses before a connection is made an error
99     /// will be returned. The error's kind will be `TimedOut`.
100     ///
101     /// Note that the `addr` argument may one day be split into a separate host
102     /// and port, similar to the API seen in `connect`.
103     #[experimental = "the timeout argument may eventually change types"]
104     pub fn connect_timeout(addr: SocketAddr,
105                            timeout_ms: u64) -> IoResult<TcpStream> {
106         let SocketAddr { ip, port } = addr;
107         let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
108         LocalIo::maybe_raise(|io| {
109             io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
110         }).map_err(IoError::from_rtio_error)
111     }
112
113     /// Returns the socket address of the remote peer of this TCP connection.
114     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
115         match self.obj.peer_name() {
116             Ok(rtio::SocketAddr { ip, port }) => {
117                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
118             }
119             Err(e) => Err(IoError::from_rtio_error(e)),
120         }
121     }
122
123     /// Returns the socket address of the local half of this TCP connection.
124     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
125         match self.obj.socket_name() {
126             Ok(rtio::SocketAddr { ip, port }) => {
127                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
128             }
129             Err(e) => Err(IoError::from_rtio_error(e)),
130         }
131     }
132
133     /// Sets the nodelay flag on this connection to the boolean specified
134     #[experimental]
135     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
136         if nodelay {
137             self.obj.nodelay()
138         } else {
139             self.obj.control_congestion()
140         }.map_err(IoError::from_rtio_error)
141     }
142
143     /// Sets the keepalive timeout to the timeout specified.
144     ///
145     /// If the value specified is `None`, then the keepalive flag is cleared on
146     /// this connection. Otherwise, the keepalive timeout will be set to the
147     /// specified time, in seconds.
148     #[experimental]
149     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
150         match delay_in_seconds {
151             Some(i) => self.obj.keepalive(i),
152             None => self.obj.letdie(),
153         }.map_err(IoError::from_rtio_error)
154     }
155
156     /// Closes the reading half of this connection.
157     ///
158     /// This method will close the reading portion of this connection, causing
159     /// all pending and future reads to immediately return with an error.
160     ///
161     /// # Example
162     ///
163     /// ```no_run
164     /// # #![allow(unused_must_use)]
165     /// use std::io::timer;
166     /// use std::io::TcpStream;
167     ///
168     /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
169     /// let stream2 = stream.clone();
170     ///
171     /// spawn(proc() {
172     ///     // close this stream after one second
173     ///     timer::sleep(1000);
174     ///     let mut stream = stream2;
175     ///     stream.close_read();
176     /// });
177     ///
178     /// // wait for some data, will get canceled after one second
179     /// let mut buf = [0];
180     /// stream.read(buf);
181     /// ```
182     ///
183     /// Note that this method affects all cloned handles associated with this
184     /// stream, not just this one handle.
185     pub fn close_read(&mut self) -> IoResult<()> {
186         self.obj.close_read().map_err(IoError::from_rtio_error)
187     }
188
189     /// Closes the writing half of this connection.
190     ///
191     /// This method will close the writing portion of this connection, causing
192     /// all future writes to immediately return with an error.
193     ///
194     /// Note that this method affects all cloned handles associated with this
195     /// stream, not just this one handle.
196     pub fn close_write(&mut self) -> IoResult<()> {
197         self.obj.close_write().map_err(IoError::from_rtio_error)
198     }
199
200     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
201     ///
202     /// This function will set a timeout for all blocking operations (including
203     /// reads and writes) on this stream. The timeout specified is a relative
204     /// time, in milliseconds, into the future after which point operations will
205     /// time out. This means that the timeout must be reset periodically to keep
206     /// it from expiring. Specifying a value of `None` will clear the timeout
207     /// for this stream.
208     ///
209     /// The timeout on this stream is local to this stream only. Setting a
210     /// timeout does not affect any other cloned instances of this stream, nor
211     /// does the timeout propagated to cloned handles of this stream. Setting
212     /// this timeout will override any specific read or write timeouts
213     /// previously set for this stream.
214     ///
215     /// For clarification on the semantics of interrupting a read and a write,
216     /// take a look at `set_read_timeout` and `set_write_timeout`.
217     #[experimental = "the timeout argument may change in type and value"]
218     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
219         self.obj.set_timeout(timeout_ms)
220     }
221
222     /// Sets the timeout for read operations on this stream.
223     ///
224     /// See documentation in `set_timeout` for the semantics of this read time.
225     /// This will overwrite any previous read timeout set through either this
226     /// function or `set_timeout`.
227     ///
228     /// # Errors
229     ///
230     /// When this timeout expires, if there is no pending read operation, no
231     /// action is taken. Otherwise, the read operation will be scheduled to
232     /// promptly return. If a timeout error is returned, then no data was read
233     /// during the timeout period.
234     #[experimental = "the timeout argument may change in type and value"]
235     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
236         self.obj.set_read_timeout(timeout_ms)
237     }
238
239     /// Sets the timeout for write operations on this stream.
240     ///
241     /// See documentation in `set_timeout` for the semantics of this write time.
242     /// This will overwrite any previous write timeout set through either this
243     /// function or `set_timeout`.
244     ///
245     /// # Errors
246     ///
247     /// When this timeout expires, if there is no pending write operation, no
248     /// action is taken. Otherwise, the pending write operation will be
249     /// scheduled to promptly return. The actual state of the underlying stream
250     /// is not specified.
251     ///
252     /// The write operation may return an error of type `ShortWrite` which
253     /// indicates that the object is known to have written an exact number of
254     /// bytes successfully during the timeout period, and the remaining bytes
255     /// were never written.
256     ///
257     /// If the write operation returns `TimedOut`, then it the timeout primitive
258     /// does not know how many bytes were written as part of the timeout
259     /// operation. It may be the case that bytes continue to be written in an
260     /// asynchronous fashion after the call to write returns.
261     #[experimental = "the timeout argument may change in type and value"]
262     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
263         self.obj.set_write_timeout(timeout_ms)
264     }
265 }
266
267 impl Clone for TcpStream {
268     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
269     /// and writes of this connection.
270     ///
271     /// The underlying TCP stream will not be closed until all handles to the
272     /// stream have been deallocated. All handles will also follow the same
273     /// stream, but two concurrent reads will not receive the same data.
274     /// Instead, the first read will receive the first packet received, and the
275     /// second read will receive the second packet.
276     fn clone(&self) -> TcpStream {
277         TcpStream { obj: self.obj.clone() }
278     }
279 }
280
281 impl Reader for TcpStream {
282     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
283         self.obj.read(buf).map_err(IoError::from_rtio_error)
284     }
285 }
286
287 impl Writer for TcpStream {
288     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
289         self.obj.write(buf).map_err(IoError::from_rtio_error)
290     }
291 }
292
293 /// A structure representing a socket server. This listener is used to create a
294 /// `TcpAcceptor` which can be used to accept sockets on a local port.
295 ///
296 /// # Example
297 ///
298 /// ```rust
299 /// # fn main() { }
300 /// # fn foo() {
301 /// # #![allow(dead_code)]
302 /// use std::io::{TcpListener, TcpStream};
303 /// use std::io::{Acceptor, Listener};
304 ///
305 /// let listener = TcpListener::bind("127.0.0.1", 80);
306 ///
307 /// // bind the listener to the specified address
308 /// let mut acceptor = listener.listen();
309 ///
310 /// fn handle_client(mut stream: TcpStream) {
311 ///     // ...
312 /// # &mut stream; // silence unused mutability/variable warning
313 /// }
314 /// // accept connections and process them, spawning a new tasks for each one
315 /// for stream in acceptor.incoming() {
316 ///     match stream {
317 ///         Err(e) => { /* connection failed */ }
318 ///         Ok(stream) => spawn(proc() {
319 ///             // connection succeeded
320 ///             handle_client(stream)
321 ///         })
322 ///     }
323 /// }
324 ///
325 /// // close the socket server
326 /// drop(acceptor);
327 /// # }
328 /// ```
329 pub struct TcpListener {
330     obj: Box<RtioTcpListener + Send>,
331 }
332
333 impl TcpListener {
334     /// Creates a new `TcpListener` which will be bound to the specified IP
335     /// and port. This listener is not ready for accepting connections,
336     /// `listen` must be called on it before that's possible.
337     ///
338     /// Binding with a port number of 0 will request that the OS assigns a port
339     /// to this listener. The port allocated can be queried via the
340     /// `socket_name` function.
341     pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
342         match FromStr::from_str(addr) {
343             Some(ip) => {
344                 let addr = rtio::SocketAddr{
345                     ip: super::to_rtio(ip),
346                     port: port,
347                 };
348                 LocalIo::maybe_raise(|io| {
349                     io.tcp_bind(addr).map(|l| TcpListener { obj: l })
350                 }).map_err(IoError::from_rtio_error)
351             }
352             None => {
353                 Err(IoError{
354                     kind: InvalidInput,
355                     desc: "invalid IP address specified",
356                     detail: None
357                 })
358             }
359         }
360     }
361
362     /// Returns the local socket address of this listener.
363     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
364         match self.obj.socket_name() {
365             Ok(rtio::SocketAddr { ip, port }) => {
366                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
367             }
368             Err(e) => Err(IoError::from_rtio_error(e)),
369         }
370     }
371 }
372
373 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
374     fn listen(self) -> IoResult<TcpAcceptor> {
375         match self.obj.listen() {
376             Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
377             Err(e) => Err(IoError::from_rtio_error(e)),
378         }
379     }
380 }
381
382 /// The accepting half of a TCP socket server. This structure is created through
383 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
384 /// `TcpStream` instances.
385 pub struct TcpAcceptor {
386     obj: Box<RtioTcpAcceptor + Send>,
387 }
388
389 impl TcpAcceptor {
390     /// Prevents blocking on all future accepts after `ms` milliseconds have
391     /// elapsed.
392     ///
393     /// This function is used to set a deadline after which this acceptor will
394     /// time out accepting any connections. The argument is the relative
395     /// distance, in milliseconds, to a point in the future after which all
396     /// accepts will fail.
397     ///
398     /// If the argument specified is `None`, then any previously registered
399     /// timeout is cleared.
400     ///
401     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
402     /// any pending connections. All pending connections will be accepted,
403     /// regardless of whether the timeout has expired or not (the accept will
404     /// not block in this case).
405     ///
406     /// # Example
407     ///
408     /// ```no_run
409     /// # #![allow(experimental)]
410     /// use std::io::TcpListener;
411     /// use std::io::{Listener, Acceptor, TimedOut};
412     ///
413     /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
414     ///
415     /// // After 100ms have passed, all accepts will fail
416     /// a.set_timeout(Some(100));
417     ///
418     /// match a.accept() {
419     ///     Ok(..) => println!("accepted a socket"),
420     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
421     ///     Err(e) => println!("err: {}", e),
422     /// }
423     ///
424     /// // Reset the timeout and try again
425     /// a.set_timeout(Some(100));
426     /// let socket = a.accept();
427     ///
428     /// // Clear the timeout and block indefinitely waiting for a connection
429     /// a.set_timeout(None);
430     /// let socket = a.accept();
431     /// ```
432     #[experimental = "the type of the argument and name of this function are \
433                       subject to change"]
434     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
435 }
436
437 impl Acceptor<TcpStream> for TcpAcceptor {
438     fn accept(&mut self) -> IoResult<TcpStream> {
439         match self.obj.accept(){
440             Ok(s) => Ok(TcpStream::new(s)),
441             Err(e) => Err(IoError::from_rtio_error(e)),
442         }
443     }
444 }
445
446 #[cfg(test)]
447 #[allow(experimental)]
448 mod test {
449     use super::*;
450     use io::net::ip::SocketAddr;
451     use io::*;
452     use prelude::*;
453
454     // FIXME #11530 this fails on android because tests are run as root
455     iotest!(fn bind_error() {
456         match TcpListener::bind("0.0.0.0", 1) {
457             Ok(..) => fail!(),
458             Err(e) => assert_eq!(e.kind, PermissionDenied),
459         }
460     } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
461
462     iotest!(fn connect_error() {
463         match TcpStream::connect("0.0.0.0", 1) {
464             Ok(..) => fail!(),
465             Err(e) => assert_eq!(e.kind, ConnectionRefused),
466         }
467     })
468
469     iotest!(fn listen_ip4_localhost() {
470         let socket_addr = next_test_ip4();
471         let ip_str = socket_addr.ip.to_string();
472         let port = socket_addr.port;
473         let listener = TcpListener::bind(ip_str.as_slice(), port);
474         let mut acceptor = listener.listen();
475
476         spawn(proc() {
477             let mut stream = TcpStream::connect("localhost", port);
478             stream.write([144]).unwrap();
479         });
480
481         let mut stream = acceptor.accept();
482         let mut buf = [0];
483         stream.read(buf).unwrap();
484         assert!(buf[0] == 144);
485     })
486
487     iotest!(fn connect_localhost() {
488         let addr = next_test_ip4();
489         let ip_str = addr.ip.to_string();
490         let port = addr.port;
491         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
492
493         spawn(proc() {
494             let mut stream = TcpStream::connect("localhost", addr.port);
495             stream.write([64]).unwrap();
496         });
497
498         let mut stream = acceptor.accept();
499         let mut buf = [0];
500         stream.read(buf).unwrap();
501         assert!(buf[0] == 64);
502     })
503
504     iotest!(fn connect_ip4_loopback() {
505         let addr = next_test_ip4();
506         let ip_str = addr.ip.to_string();
507         let port = addr.port;
508         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
509
510         spawn(proc() {
511             let mut stream = TcpStream::connect("127.0.0.1", addr.port);
512             stream.write([44]).unwrap();
513         });
514
515         let mut stream = acceptor.accept();
516         let mut buf = [0];
517         stream.read(buf).unwrap();
518         assert!(buf[0] == 44);
519     })
520
521     iotest!(fn connect_ip6_loopback() {
522         let addr = next_test_ip6();
523         let ip_str = addr.ip.to_string();
524         let port = addr.port;
525         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
526
527         spawn(proc() {
528             let mut stream = TcpStream::connect("::1", addr.port);
529             stream.write([66]).unwrap();
530         });
531
532         let mut stream = acceptor.accept();
533         let mut buf = [0];
534         stream.read(buf).unwrap();
535         assert!(buf[0] == 66);
536     })
537
538     iotest!(fn smoke_test_ip4() {
539         let addr = next_test_ip4();
540         let ip_str = addr.ip.to_string();
541         let port = addr.port;
542         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
543
544         spawn(proc() {
545             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
546             stream.write([99]).unwrap();
547         });
548
549         let mut stream = acceptor.accept();
550         let mut buf = [0];
551         stream.read(buf).unwrap();
552         assert!(buf[0] == 99);
553     })
554
555     iotest!(fn smoke_test_ip6() {
556         let addr = next_test_ip6();
557         let ip_str = addr.ip.to_string();
558         let port = addr.port;
559         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
560
561         spawn(proc() {
562             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
563             stream.write([99]).unwrap();
564         });
565
566         let mut stream = acceptor.accept();
567         let mut buf = [0];
568         stream.read(buf).unwrap();
569         assert!(buf[0] == 99);
570     })
571
572     iotest!(fn read_eof_ip4() {
573         let addr = next_test_ip4();
574         let ip_str = addr.ip.to_string();
575         let port = addr.port;
576         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
577
578         spawn(proc() {
579             let _stream = TcpStream::connect(ip_str.as_slice(), port);
580             // Close
581         });
582
583         let mut stream = acceptor.accept();
584         let mut buf = [0];
585         let nread = stream.read(buf);
586         assert!(nread.is_err());
587     })
588
589     iotest!(fn read_eof_ip6() {
590         let addr = next_test_ip6();
591         let ip_str = addr.ip.to_string();
592         let port = addr.port;
593         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
594
595         spawn(proc() {
596             let _stream = TcpStream::connect(ip_str.as_slice(), port);
597             // Close
598         });
599
600         let mut stream = acceptor.accept();
601         let mut buf = [0];
602         let nread = stream.read(buf);
603         assert!(nread.is_err());
604     })
605
606     iotest!(fn read_eof_twice_ip4() {
607         let addr = next_test_ip4();
608         let ip_str = addr.ip.to_string();
609         let port = addr.port;
610         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
611
612         spawn(proc() {
613             let _stream = TcpStream::connect(ip_str.as_slice(), port);
614             // Close
615         });
616
617         let mut stream = acceptor.accept();
618         let mut buf = [0];
619         let nread = stream.read(buf);
620         assert!(nread.is_err());
621
622         match stream.read(buf) {
623             Ok(..) => fail!(),
624             Err(ref e) => {
625                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
626                         "unknown kind: {}", e.kind);
627             }
628         }
629     })
630
631     iotest!(fn read_eof_twice_ip6() {
632         let addr = next_test_ip6();
633         let ip_str = addr.ip.to_string();
634         let port = addr.port;
635         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
636
637         spawn(proc() {
638             let _stream = TcpStream::connect(ip_str.as_slice(), port);
639             // Close
640         });
641
642         let mut stream = acceptor.accept();
643         let mut buf = [0];
644         let nread = stream.read(buf);
645         assert!(nread.is_err());
646
647         match stream.read(buf) {
648             Ok(..) => fail!(),
649             Err(ref e) => {
650                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
651                         "unknown kind: {}", e.kind);
652             }
653         }
654     })
655
656     iotest!(fn write_close_ip4() {
657         let addr = next_test_ip4();
658         let ip_str = addr.ip.to_string();
659         let port = addr.port;
660         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
661
662         spawn(proc() {
663             let _stream = TcpStream::connect(ip_str.as_slice(), port);
664             // Close
665         });
666
667         let mut stream = acceptor.accept();
668         let buf = [0];
669         loop {
670             match stream.write(buf) {
671                 Ok(..) => {}
672                 Err(e) => {
673                     assert!(e.kind == ConnectionReset ||
674                             e.kind == BrokenPipe ||
675                             e.kind == ConnectionAborted,
676                             "unknown error: {}", e);
677                     break;
678                 }
679             }
680         }
681     })
682
683     iotest!(fn write_close_ip6() {
684         let addr = next_test_ip6();
685         let ip_str = addr.ip.to_string();
686         let port = addr.port;
687         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
688
689         spawn(proc() {
690             let _stream = TcpStream::connect(ip_str.as_slice(), port);
691             // Close
692         });
693
694         let mut stream = acceptor.accept();
695         let buf = [0];
696         loop {
697             match stream.write(buf) {
698                 Ok(..) => {}
699                 Err(e) => {
700                     assert!(e.kind == ConnectionReset ||
701                             e.kind == BrokenPipe ||
702                             e.kind == ConnectionAborted,
703                             "unknown error: {}", e);
704                     break;
705                 }
706             }
707         }
708     })
709
710     iotest!(fn multiple_connect_serial_ip4() {
711         let addr = next_test_ip4();
712         let ip_str = addr.ip.to_string();
713         let port = addr.port;
714         let max = 10u;
715         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
716
717         spawn(proc() {
718             for _ in range(0, max) {
719                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
720                 stream.write([99]).unwrap();
721             }
722         });
723
724         for ref mut stream in acceptor.incoming().take(max) {
725             let mut buf = [0];
726             stream.read(buf).unwrap();
727             assert_eq!(buf[0], 99);
728         }
729     })
730
731     iotest!(fn multiple_connect_serial_ip6() {
732         let addr = next_test_ip6();
733         let ip_str = addr.ip.to_string();
734         let port = addr.port;
735         let max = 10u;
736         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
737
738         spawn(proc() {
739             for _ in range(0, max) {
740                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
741                 stream.write([99]).unwrap();
742             }
743         });
744
745         for ref mut stream in acceptor.incoming().take(max) {
746             let mut buf = [0];
747             stream.read(buf).unwrap();
748             assert_eq!(buf[0], 99);
749         }
750     })
751
752     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
753         let addr = next_test_ip4();
754         let ip_str = addr.ip.to_string();
755         let port = addr.port;
756         static MAX: int = 10;
757         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
758
759         spawn(proc() {
760             let mut acceptor = acceptor;
761             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
762                 // Start another task to handle the connection
763                 spawn(proc() {
764                     let mut stream = stream;
765                     let mut buf = [0];
766                     stream.read(buf).unwrap();
767                     assert!(buf[0] == i as u8);
768                     debug!("read");
769                 });
770             }
771         });
772
773         connect(0, addr);
774
775         fn connect(i: int, addr: SocketAddr) {
776             let ip_str = addr.ip.to_string();
777             let port = addr.port;
778             if i == MAX { return }
779
780             spawn(proc() {
781                 debug!("connecting");
782                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
783                 // Connect again before writing
784                 connect(i + 1, addr);
785                 debug!("writing");
786                 stream.write([i as u8]).unwrap();
787             });
788         }
789     })
790
791     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
792         let addr = next_test_ip6();
793         let ip_str = addr.ip.to_string();
794         let port = addr.port;
795         static MAX: int = 10;
796         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
797
798         spawn(proc() {
799             let mut acceptor = acceptor;
800             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
801                 // Start another task to handle the connection
802                 spawn(proc() {
803                     let mut stream = stream;
804                     let mut buf = [0];
805                     stream.read(buf).unwrap();
806                     assert!(buf[0] == i as u8);
807                     debug!("read");
808                 });
809             }
810         });
811
812         connect(0, addr);
813
814         fn connect(i: int, addr: SocketAddr) {
815             let ip_str = addr.ip.to_string();
816             let port = addr.port;
817             if i == MAX { return }
818
819             spawn(proc() {
820                 debug!("connecting");
821                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
822                 // Connect again before writing
823                 connect(i + 1, addr);
824                 debug!("writing");
825                 stream.write([i as u8]).unwrap();
826             });
827         }
828     })
829
830     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
831         static MAX: int = 10;
832         let addr = next_test_ip4();
833         let ip_str = addr.ip.to_string();
834         let port = addr.port;
835         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
836
837         spawn(proc() {
838             let mut acceptor = acceptor;
839             for stream in acceptor.incoming().take(MAX as uint) {
840                 // Start another task to handle the connection
841                 spawn(proc() {
842                     let mut stream = stream;
843                     let mut buf = [0];
844                     stream.read(buf).unwrap();
845                     assert!(buf[0] == 99);
846                     debug!("read");
847                 });
848             }
849         });
850
851         connect(0, addr);
852
853         fn connect(i: int, addr: SocketAddr) {
854             let ip_str = addr.ip.to_string();
855             let port = addr.port;
856             if i == MAX { return }
857
858             spawn(proc() {
859                 debug!("connecting");
860                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
861                 // Connect again before writing
862                 connect(i + 1, addr);
863                 debug!("writing");
864                 stream.write([99]).unwrap();
865             });
866         }
867     })
868
869     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
870         static MAX: int = 10;
871         let addr = next_test_ip6();
872         let ip_str = addr.ip.to_string();
873         let port = addr.port;
874         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
875
876         spawn(proc() {
877             let mut acceptor = acceptor;
878             for stream in acceptor.incoming().take(MAX as uint) {
879                 // Start another task to handle the connection
880                 spawn(proc() {
881                     let mut stream = stream;
882                     let mut buf = [0];
883                     stream.read(buf).unwrap();
884                     assert!(buf[0] == 99);
885                     debug!("read");
886                 });
887             }
888         });
889
890         connect(0, addr);
891
892         fn connect(i: int, addr: SocketAddr) {
893             let ip_str = addr.ip.to_string();
894             let port = addr.port;
895             if i == MAX { return }
896
897             spawn(proc() {
898                 debug!("connecting");
899                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
900                 // Connect again before writing
901                 connect(i + 1, addr);
902                 debug!("writing");
903                 stream.write([99]).unwrap();
904             });
905         }
906     })
907
908     pub fn socket_name(addr: SocketAddr) {
909         let ip_str = addr.ip.to_string();
910         let port = addr.port;
911         let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
912
913         // Make sure socket_name gives
914         // us the socket we binded to.
915         let so_name = listener.socket_name();
916         assert!(so_name.is_ok());
917         assert_eq!(addr, so_name.unwrap());
918     }
919
920     pub fn peer_name(addr: SocketAddr) {
921         let ip_str = addr.ip.to_string();
922         let port = addr.port;
923         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
924         spawn(proc() {
925             let mut acceptor = acceptor;
926             acceptor.accept().unwrap();
927         });
928
929         let stream = TcpStream::connect(ip_str.as_slice(), port);
930
931         assert!(stream.is_ok());
932         let mut stream = stream.unwrap();
933
934         // Make sure peer_name gives us the
935         // address/port of the peer we've
936         // connected to.
937         let peer_name = stream.peer_name();
938         assert!(peer_name.is_ok());
939         assert_eq!(addr, peer_name.unwrap());
940     }
941
942     iotest!(fn socket_and_peer_name_ip4() {
943         peer_name(next_test_ip4());
944         socket_name(next_test_ip4());
945     })
946
947     iotest!(fn socket_and_peer_name_ip6() {
948         // FIXME: peer name is not consistent
949         //peer_name(next_test_ip6());
950         socket_name(next_test_ip6());
951     })
952
953     iotest!(fn partial_read() {
954         let addr = next_test_ip4();
955         let port = addr.port;
956         let (tx, rx) = channel();
957         spawn(proc() {
958             let ip_str = addr.ip.to_string();
959             let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
960             tx.send(());
961             let mut cl = srv.accept().unwrap();
962             cl.write([10]).unwrap();
963             let mut b = [0];
964             cl.read(b).unwrap();
965             tx.send(());
966         });
967
968         rx.recv();
969         let ip_str = addr.ip.to_string();
970         let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
971         let mut b = [0, ..10];
972         assert_eq!(c.read(b), Ok(1));
973         c.write([1]).unwrap();
974         rx.recv();
975     })
976
977     iotest!(fn double_bind() {
978         let addr = next_test_ip4();
979         let ip_str = addr.ip.to_string();
980         let port = addr.port;
981         let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
982         assert!(listener.is_ok());
983         match TcpListener::bind(ip_str.as_slice(), port).listen() {
984             Ok(..) => fail!(),
985             Err(e) => {
986                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
987                         "unknown error: {} {}", e, e.kind);
988             }
989         }
990     })
991
992     iotest!(fn fast_rebind() {
993         let addr = next_test_ip4();
994         let port = addr.port;
995         let (tx, rx) = channel();
996
997         spawn(proc() {
998             let ip_str = addr.ip.to_string();
999             rx.recv();
1000             let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1001             // Close
1002             rx.recv();
1003         });
1004
1005         {
1006             let ip_str = addr.ip.to_string();
1007             let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1008             tx.send(());
1009             {
1010                 let _stream = acceptor.accept().unwrap();
1011                 // Close client
1012                 tx.send(());
1013             }
1014             // Close listener
1015         }
1016         let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1017     })
1018
1019     iotest!(fn tcp_clone_smoke() {
1020         let addr = next_test_ip4();
1021         let ip_str = addr.ip.to_string();
1022         let port = addr.port;
1023         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1024
1025         spawn(proc() {
1026             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1027             let mut buf = [0, 0];
1028             assert_eq!(s.read(buf), Ok(1));
1029             assert_eq!(buf[0], 1);
1030             s.write([2]).unwrap();
1031         });
1032
1033         let mut s1 = acceptor.accept().unwrap();
1034         let s2 = s1.clone();
1035
1036         let (tx1, rx1) = channel();
1037         let (tx2, rx2) = channel();
1038         spawn(proc() {
1039             let mut s2 = s2;
1040             rx1.recv();
1041             s2.write([1]).unwrap();
1042             tx2.send(());
1043         });
1044         tx1.send(());
1045         let mut buf = [0, 0];
1046         assert_eq!(s1.read(buf), Ok(1));
1047         rx2.recv();
1048     })
1049
1050     iotest!(fn tcp_clone_two_read() {
1051         let addr = next_test_ip6();
1052         let ip_str = addr.ip.to_string();
1053         let port = addr.port;
1054         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1055         let (tx1, rx) = channel();
1056         let tx2 = tx1.clone();
1057
1058         spawn(proc() {
1059             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1060             s.write([1]).unwrap();
1061             rx.recv();
1062             s.write([2]).unwrap();
1063             rx.recv();
1064         });
1065
1066         let mut s1 = acceptor.accept().unwrap();
1067         let s2 = s1.clone();
1068
1069         let (done, rx) = channel();
1070         spawn(proc() {
1071             let mut s2 = s2;
1072             let mut buf = [0, 0];
1073             s2.read(buf).unwrap();
1074             tx2.send(());
1075             done.send(());
1076         });
1077         let mut buf = [0, 0];
1078         s1.read(buf).unwrap();
1079         tx1.send(());
1080
1081         rx.recv();
1082     })
1083
1084     iotest!(fn tcp_clone_two_write() {
1085         let addr = next_test_ip4();
1086         let ip_str = addr.ip.to_string();
1087         let port = addr.port;
1088         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1089
1090         spawn(proc() {
1091             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1092             let mut buf = [0, 1];
1093             s.read(buf).unwrap();
1094             s.read(buf).unwrap();
1095         });
1096
1097         let mut s1 = acceptor.accept().unwrap();
1098         let s2 = s1.clone();
1099
1100         let (done, rx) = channel();
1101         spawn(proc() {
1102             let mut s2 = s2;
1103             s2.write([1]).unwrap();
1104             done.send(());
1105         });
1106         s1.write([2]).unwrap();
1107
1108         rx.recv();
1109     })
1110
1111     iotest!(fn shutdown_smoke() {
1112         use rt::rtio::RtioTcpStream;
1113
1114         let addr = next_test_ip4();
1115         let ip_str = addr.ip.to_string();
1116         let port = addr.port;
1117         let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1118         spawn(proc() {
1119             let mut a = a;
1120             let mut c = a.accept().unwrap();
1121             assert_eq!(c.read_to_end(), Ok(vec!()));
1122             c.write([1]).unwrap();
1123         });
1124
1125         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1126         assert!(s.obj.close_write().is_ok());
1127         assert!(s.write([1]).is_err());
1128         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1129     })
1130
1131     iotest!(fn accept_timeout() {
1132         let addr = next_test_ip4();
1133         let ip_str = addr.ip.to_string();
1134         let port = addr.port;
1135         let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1136
1137         a.set_timeout(Some(10));
1138
1139         // Make sure we time out once and future invocations also time out
1140         let err = a.accept().err().unwrap();
1141         assert_eq!(err.kind, TimedOut);
1142         let err = a.accept().err().unwrap();
1143         assert_eq!(err.kind, TimedOut);
1144
1145         // Also make sure that even though the timeout is expired that we will
1146         // continue to receive any pending connections.
1147         //
1148         // FIXME: freebsd apparently never sees the pending connection, but
1149         //        testing manually always works. Need to investigate this
1150         //        flakiness.
1151         if !cfg!(target_os = "freebsd") {
1152             let (tx, rx) = channel();
1153             spawn(proc() {
1154                 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
1155                                            port).unwrap());
1156             });
1157             let _l = rx.recv();
1158             for i in range(0i, 1001) {
1159                 match a.accept() {
1160                     Ok(..) => break,
1161                     Err(ref e) if e.kind == TimedOut => {}
1162                     Err(e) => fail!("error: {}", e),
1163                 }
1164                 ::task::deschedule();
1165                 if i == 1000 { fail!("should have a pending connection") }
1166             }
1167         }
1168
1169         // Unset the timeout and make sure that this always blocks.
1170         a.set_timeout(None);
1171         spawn(proc() {
1172             drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1173                                     port).unwrap());
1174         });
1175         a.accept().unwrap();
1176     })
1177
1178     iotest!(fn close_readwrite_smoke() {
1179         let addr = next_test_ip4();
1180         let ip_str = addr.ip.to_string();
1181         let port = addr.port;
1182         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1183         let (_tx, rx) = channel::<()>();
1184         spawn(proc() {
1185             let mut a = a;
1186             let _s = a.accept().unwrap();
1187             let _ = rx.recv_opt();
1188         });
1189
1190         let mut b = [0];
1191         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1192         let mut s2 = s.clone();
1193
1194         // closing should prevent reads/writes
1195         s.close_write().unwrap();
1196         assert!(s.write([0]).is_err());
1197         s.close_read().unwrap();
1198         assert!(s.read(b).is_err());
1199
1200         // closing should affect previous handles
1201         assert!(s2.write([0]).is_err());
1202         assert!(s2.read(b).is_err());
1203
1204         // closing should affect new handles
1205         let mut s3 = s.clone();
1206         assert!(s3.write([0]).is_err());
1207         assert!(s3.read(b).is_err());
1208
1209         // make sure these don't die
1210         let _ = s2.close_read();
1211         let _ = s2.close_write();
1212         let _ = s3.close_read();
1213         let _ = s3.close_write();
1214     })
1215
1216     iotest!(fn close_read_wakes_up() {
1217         let addr = next_test_ip4();
1218         let ip_str = addr.ip.to_string();
1219         let port = addr.port;
1220         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1221         let (_tx, rx) = channel::<()>();
1222         spawn(proc() {
1223             let mut a = a;
1224             let _s = a.accept().unwrap();
1225             let _ = rx.recv_opt();
1226         });
1227
1228         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1229         let s2 = s.clone();
1230         let (tx, rx) = channel();
1231         spawn(proc() {
1232             let mut s2 = s2;
1233             assert!(s2.read([0]).is_err());
1234             tx.send(());
1235         });
1236         // this should wake up the child task
1237         s.close_read().unwrap();
1238
1239         // this test will never finish if the child doesn't wake up
1240         rx.recv();
1241     })
1242
1243     iotest!(fn readwrite_timeouts() {
1244         let addr = next_test_ip6();
1245         let ip_str = addr.ip.to_string();
1246         let port = addr.port;
1247         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1248         let (tx, rx) = channel::<()>();
1249         spawn(proc() {
1250             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1251             rx.recv();
1252             assert!(s.write([0]).is_ok());
1253             let _ = rx.recv_opt();
1254         });
1255
1256         let mut s = a.accept().unwrap();
1257         s.set_timeout(Some(20));
1258         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1259         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1260
1261         s.set_timeout(Some(20));
1262         for i in range(0i, 1001) {
1263             match s.write([0, .. 128 * 1024]) {
1264                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1265                 Err(IoError { kind: TimedOut, .. }) => break,
1266                 Err(e) => fail!("{}", e),
1267            }
1268            if i == 1000 { fail!("should have filled up?!"); }
1269         }
1270         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1271
1272         tx.send(());
1273         s.set_timeout(None);
1274         assert_eq!(s.read([0, 0]), Ok(1));
1275     })
1276
1277     iotest!(fn read_timeouts() {
1278         let addr = next_test_ip6();
1279         let ip_str = addr.ip.to_string();
1280         let port = addr.port;
1281         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1282         let (tx, rx) = channel::<()>();
1283         spawn(proc() {
1284             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1285             rx.recv();
1286             let mut amt = 0;
1287             while amt < 100 * 128 * 1024 {
1288                 match s.read([0, ..128 * 1024]) {
1289                     Ok(n) => { amt += n; }
1290                     Err(e) => fail!("{}", e),
1291                 }
1292             }
1293             let _ = rx.recv_opt();
1294         });
1295
1296         let mut s = a.accept().unwrap();
1297         s.set_read_timeout(Some(20));
1298         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1299         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1300
1301         tx.send(());
1302         for _ in range(0i, 100) {
1303             assert!(s.write([0, ..128 * 1024]).is_ok());
1304         }
1305     })
1306
1307     iotest!(fn write_timeouts() {
1308         let addr = next_test_ip6();
1309         let ip_str = addr.ip.to_string();
1310         let port = addr.port;
1311         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1312         let (tx, rx) = channel::<()>();
1313         spawn(proc() {
1314             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1315             rx.recv();
1316             assert!(s.write([0]).is_ok());
1317             let _ = rx.recv_opt();
1318         });
1319
1320         let mut s = a.accept().unwrap();
1321         s.set_write_timeout(Some(20));
1322         for i in range(0i, 1001) {
1323             match s.write([0, .. 128 * 1024]) {
1324                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1325                 Err(IoError { kind: TimedOut, .. }) => break,
1326                 Err(e) => fail!("{}", e),
1327            }
1328            if i == 1000 { fail!("should have filled up?!"); }
1329         }
1330         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1331
1332         tx.send(());
1333         assert!(s.read([0]).is_ok());
1334     })
1335
1336     iotest!(fn timeout_concurrent_read() {
1337         let addr = next_test_ip6();
1338         let ip_str = addr.ip.to_string();
1339         let port = addr.port;
1340         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1341         let (tx, rx) = channel::<()>();
1342         spawn(proc() {
1343             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1344             rx.recv();
1345             assert_eq!(s.write([0]), Ok(()));
1346             let _ = rx.recv_opt();
1347         });
1348
1349         let mut s = a.accept().unwrap();
1350         let s2 = s.clone();
1351         let (tx2, rx2) = channel();
1352         spawn(proc() {
1353             let mut s2 = s2;
1354             assert_eq!(s2.read([0]), Ok(1));
1355             tx2.send(());
1356         });
1357
1358         s.set_read_timeout(Some(20));
1359         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1360         tx.send(());
1361
1362         rx2.recv();
1363     })
1364
1365     iotest!(fn clone_while_reading() {
1366         let addr = next_test_ip6();
1367         let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1368         let mut accept = listen.listen().unwrap();
1369
1370         // Enqueue a task to write to a socket
1371         let (tx, rx) = channel();
1372         let (txdone, rxdone) = channel();
1373         let txdone2 = txdone.clone();
1374         spawn(proc() {
1375             let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1376                                              addr.port).unwrap();
1377             rx.recv();
1378             tcp.write_u8(0).unwrap();
1379             txdone2.send(());
1380         });
1381
1382         // Spawn off a reading clone
1383         let tcp = accept.accept().unwrap();
1384         let tcp2 = tcp.clone();
1385         let txdone3 = txdone.clone();
1386         spawn(proc() {
1387             let mut tcp2 = tcp2;
1388             tcp2.read_u8().unwrap();
1389             txdone3.send(());
1390         });
1391
1392         // Try to ensure that the reading clone is indeed reading
1393         for _ in range(0i, 50) {
1394             ::task::deschedule();
1395         }
1396
1397         // clone the handle again while it's reading, then let it finish the
1398         // read.
1399         let _ = tcp.clone();
1400         tx.send(());
1401         rxdone.recv();
1402         rxdone.recv();
1403     })
1404 }