]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
auto merge of #16332 : brson/rust/slicestab, r=aturon
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use collections::MutableSeq;
22 use io::IoResult;
23 use iter::Iterator;
24 use slice::ImmutableSlice;
25 use result::{Ok,Err};
26 use io::net::addrinfo::get_host_addresses;
27 use io::net::ip::SocketAddr;
28 use io::{IoError, ConnectionFailed, InvalidInput};
29 use io::{Reader, Writer, Listener, Acceptor};
30 use io::{standard_error, TimedOut};
31 use from_str::FromStr;
32 use kinds::Send;
33 use option::{None, Some, Option};
34 use boxed::Box;
35 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
36 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
37 use rt::rtio;
38 use time::Duration;
39
40 /// A structure which represents a TCP stream between a local socket and a
41 /// remote socket.
42 ///
43 /// # Example
44 ///
45 /// ```no_run
46 /// # #![allow(unused_must_use)]
47 /// use std::io::TcpStream;
48 ///
49 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
50 ///
51 /// stream.write([1]);
52 /// let mut buf = [0];
53 /// stream.read(buf);
54 /// drop(stream); // close the connection
55 /// ```
56 pub struct TcpStream {
57     obj: Box<RtioTcpStream + Send>,
58 }
59
60 impl TcpStream {
61     fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
62         TcpStream { obj: s }
63     }
64
65     /// Open a TCP connection to a remote host by hostname or IP address.
66     ///
67     /// `host` can be a hostname or IP address string. If no error is
68     /// encountered, then `Ok(stream)` is returned.
69     pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
70         let addresses = match FromStr::from_str(host) {
71             Some(addr) => vec!(addr),
72             None => try!(get_host_addresses(host))
73         };
74         let mut err = IoError {
75             kind: ConnectionFailed,
76             desc: "no addresses found for hostname",
77             detail: None
78         };
79         for addr in addresses.iter() {
80             let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
81             let result = LocalIo::maybe_raise(|io| {
82                 io.tcp_connect(addr, None).map(TcpStream::new)
83             });
84             match result {
85                 Ok(stream) => {
86                     return Ok(stream)
87                 }
88                 Err(connect_err) => {
89                     err = IoError::from_rtio_error(connect_err)
90                 }
91             }
92         }
93         Err(err)
94     }
95
96     /// Creates a TCP connection to a remote socket address, timing out after
97     /// the specified duration.
98     ///
99     /// This is the same as the `connect` method, except that if the timeout
100     /// specified (in milliseconds) elapses before a connection is made an error
101     /// will be returned. The error's kind will be `TimedOut`.
102     ///
103     /// Note that the `addr` argument may one day be split into a separate host
104     /// and port, similar to the API seen in `connect`.
105     ///
106     /// If a `timeout` with zero or negative duration is specified then
107     /// the function returns `Err`, with the error kind set to `TimedOut`.
108     #[experimental = "the timeout argument may eventually change types"]
109     pub fn connect_timeout(addr: SocketAddr,
110                            timeout: Duration) -> IoResult<TcpStream> {
111         if timeout <= Duration::milliseconds(0) {
112             return Err(standard_error(TimedOut));
113         }
114
115         let SocketAddr { ip, port } = addr;
116         let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
117         LocalIo::maybe_raise(|io| {
118             io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
119         }).map_err(IoError::from_rtio_error)
120     }
121
122     /// Returns the socket address of the remote peer of this TCP connection.
123     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
124         match self.obj.peer_name() {
125             Ok(rtio::SocketAddr { ip, port }) => {
126                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
127             }
128             Err(e) => Err(IoError::from_rtio_error(e)),
129         }
130     }
131
132     /// Returns the socket address of the local half of this TCP connection.
133     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
134         match self.obj.socket_name() {
135             Ok(rtio::SocketAddr { ip, port }) => {
136                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
137             }
138             Err(e) => Err(IoError::from_rtio_error(e)),
139         }
140     }
141
142     /// Sets the nodelay flag on this connection to the boolean specified
143     #[experimental]
144     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
145         if nodelay {
146             self.obj.nodelay()
147         } else {
148             self.obj.control_congestion()
149         }.map_err(IoError::from_rtio_error)
150     }
151
152     /// Sets the keepalive timeout to the timeout specified.
153     ///
154     /// If the value specified is `None`, then the keepalive flag is cleared on
155     /// this connection. Otherwise, the keepalive timeout will be set to the
156     /// specified time, in seconds.
157     #[experimental]
158     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
159         match delay_in_seconds {
160             Some(i) => self.obj.keepalive(i),
161             None => self.obj.letdie(),
162         }.map_err(IoError::from_rtio_error)
163     }
164
165     /// Closes the reading half of this connection.
166     ///
167     /// This method will close the reading portion of this connection, causing
168     /// all pending and future reads to immediately return with an error.
169     ///
170     /// # Example
171     ///
172     /// ```no_run
173     /// # #![allow(unused_must_use)]
174     /// use std::io::timer;
175     /// use std::io::TcpStream;
176     /// use std::time::Duration;
177     ///
178     /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
179     /// let stream2 = stream.clone();
180     ///
181     /// spawn(proc() {
182     ///     // close this stream after one second
183     ///     timer::sleep(Duration::seconds(1));
184     ///     let mut stream = stream2;
185     ///     stream.close_read();
186     /// });
187     ///
188     /// // wait for some data, will get canceled after one second
189     /// let mut buf = [0];
190     /// stream.read(buf);
191     /// ```
192     ///
193     /// Note that this method affects all cloned handles associated with this
194     /// stream, not just this one handle.
195     pub fn close_read(&mut self) -> IoResult<()> {
196         self.obj.close_read().map_err(IoError::from_rtio_error)
197     }
198
199     /// Closes the writing half of this connection.
200     ///
201     /// This method will close the writing portion of this connection, causing
202     /// all future writes to immediately return with an error.
203     ///
204     /// Note that this method affects all cloned handles associated with this
205     /// stream, not just this one handle.
206     pub fn close_write(&mut self) -> IoResult<()> {
207         self.obj.close_write().map_err(IoError::from_rtio_error)
208     }
209
210     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
211     ///
212     /// This function will set a timeout for all blocking operations (including
213     /// reads and writes) on this stream. The timeout specified is a relative
214     /// time, in milliseconds, into the future after which point operations will
215     /// time out. This means that the timeout must be reset periodically to keep
216     /// it from expiring. Specifying a value of `None` will clear the timeout
217     /// for this stream.
218     ///
219     /// The timeout on this stream is local to this stream only. Setting a
220     /// timeout does not affect any other cloned instances of this stream, nor
221     /// does the timeout propagated to cloned handles of this stream. Setting
222     /// this timeout will override any specific read or write timeouts
223     /// previously set for this stream.
224     ///
225     /// For clarification on the semantics of interrupting a read and a write,
226     /// take a look at `set_read_timeout` and `set_write_timeout`.
227     #[experimental = "the timeout argument may change in type and value"]
228     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
229         self.obj.set_timeout(timeout_ms)
230     }
231
232     /// Sets the timeout for read operations on this stream.
233     ///
234     /// See documentation in `set_timeout` for the semantics of this read time.
235     /// This will overwrite any previous read timeout set through either this
236     /// function or `set_timeout`.
237     ///
238     /// # Errors
239     ///
240     /// When this timeout expires, if there is no pending read operation, no
241     /// action is taken. Otherwise, the read operation will be scheduled to
242     /// promptly return. If a timeout error is returned, then no data was read
243     /// during the timeout period.
244     #[experimental = "the timeout argument may change in type and value"]
245     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
246         self.obj.set_read_timeout(timeout_ms)
247     }
248
249     /// Sets the timeout for write operations on this stream.
250     ///
251     /// See documentation in `set_timeout` for the semantics of this write time.
252     /// This will overwrite any previous write timeout set through either this
253     /// function or `set_timeout`.
254     ///
255     /// # Errors
256     ///
257     /// When this timeout expires, if there is no pending write operation, no
258     /// action is taken. Otherwise, the pending write operation will be
259     /// scheduled to promptly return. The actual state of the underlying stream
260     /// is not specified.
261     ///
262     /// The write operation may return an error of type `ShortWrite` which
263     /// indicates that the object is known to have written an exact number of
264     /// bytes successfully during the timeout period, and the remaining bytes
265     /// were never written.
266     ///
267     /// If the write operation returns `TimedOut`, then it the timeout primitive
268     /// does not know how many bytes were written as part of the timeout
269     /// operation. It may be the case that bytes continue to be written in an
270     /// asynchronous fashion after the call to write returns.
271     #[experimental = "the timeout argument may change in type and value"]
272     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
273         self.obj.set_write_timeout(timeout_ms)
274     }
275 }
276
277 impl Clone for TcpStream {
278     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
279     /// and writes of this connection.
280     ///
281     /// The underlying TCP stream will not be closed until all handles to the
282     /// stream have been deallocated. All handles will also follow the same
283     /// stream, but two concurrent reads will not receive the same data.
284     /// Instead, the first read will receive the first packet received, and the
285     /// second read will receive the second packet.
286     fn clone(&self) -> TcpStream {
287         TcpStream { obj: self.obj.clone() }
288     }
289 }
290
291 impl Reader for TcpStream {
292     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
293         self.obj.read(buf).map_err(IoError::from_rtio_error)
294     }
295 }
296
297 impl Writer for TcpStream {
298     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
299         self.obj.write(buf).map_err(IoError::from_rtio_error)
300     }
301 }
302
303 /// A structure representing a socket server. This listener is used to create a
304 /// `TcpAcceptor` which can be used to accept sockets on a local port.
305 ///
306 /// # Example
307 ///
308 /// ```rust
309 /// # fn main() { }
310 /// # fn foo() {
311 /// # #![allow(dead_code)]
312 /// use std::io::{TcpListener, TcpStream};
313 /// use std::io::{Acceptor, Listener};
314 ///
315 /// let listener = TcpListener::bind("127.0.0.1", 80);
316 ///
317 /// // bind the listener to the specified address
318 /// let mut acceptor = listener.listen();
319 ///
320 /// fn handle_client(mut stream: TcpStream) {
321 ///     // ...
322 /// # &mut stream; // silence unused mutability/variable warning
323 /// }
324 /// // accept connections and process them, spawning a new tasks for each one
325 /// for stream in acceptor.incoming() {
326 ///     match stream {
327 ///         Err(e) => { /* connection failed */ }
328 ///         Ok(stream) => spawn(proc() {
329 ///             // connection succeeded
330 ///             handle_client(stream)
331 ///         })
332 ///     }
333 /// }
334 ///
335 /// // close the socket server
336 /// drop(acceptor);
337 /// # }
338 /// ```
339 pub struct TcpListener {
340     obj: Box<RtioTcpListener + Send>,
341 }
342
343 impl TcpListener {
344     /// Creates a new `TcpListener` which will be bound to the specified IP
345     /// and port. This listener is not ready for accepting connections,
346     /// `listen` must be called on it before that's possible.
347     ///
348     /// Binding with a port number of 0 will request that the OS assigns a port
349     /// to this listener. The port allocated can be queried via the
350     /// `socket_name` function.
351     pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
352         match FromStr::from_str(addr) {
353             Some(ip) => {
354                 let addr = rtio::SocketAddr{
355                     ip: super::to_rtio(ip),
356                     port: port,
357                 };
358                 LocalIo::maybe_raise(|io| {
359                     io.tcp_bind(addr).map(|l| TcpListener { obj: l })
360                 }).map_err(IoError::from_rtio_error)
361             }
362             None => {
363                 Err(IoError{
364                     kind: InvalidInput,
365                     desc: "invalid IP address specified",
366                     detail: None
367                 })
368             }
369         }
370     }
371
372     /// Returns the local socket address of this listener.
373     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
374         match self.obj.socket_name() {
375             Ok(rtio::SocketAddr { ip, port }) => {
376                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
377             }
378             Err(e) => Err(IoError::from_rtio_error(e)),
379         }
380     }
381 }
382
383 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
384     fn listen(self) -> IoResult<TcpAcceptor> {
385         match self.obj.listen() {
386             Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
387             Err(e) => Err(IoError::from_rtio_error(e)),
388         }
389     }
390 }
391
392 /// The accepting half of a TCP socket server. This structure is created through
393 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
394 /// `TcpStream` instances.
395 pub struct TcpAcceptor {
396     obj: Box<RtioTcpAcceptor + Send>,
397 }
398
399 impl TcpAcceptor {
400     /// Prevents blocking on all future accepts after `ms` milliseconds have
401     /// elapsed.
402     ///
403     /// This function is used to set a deadline after which this acceptor will
404     /// time out accepting any connections. The argument is the relative
405     /// distance, in milliseconds, to a point in the future after which all
406     /// accepts will fail.
407     ///
408     /// If the argument specified is `None`, then any previously registered
409     /// timeout is cleared.
410     ///
411     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
412     /// any pending connections. All pending connections will be accepted,
413     /// regardless of whether the timeout has expired or not (the accept will
414     /// not block in this case).
415     ///
416     /// # Example
417     ///
418     /// ```no_run
419     /// # #![allow(experimental)]
420     /// use std::io::TcpListener;
421     /// use std::io::{Listener, Acceptor, TimedOut};
422     ///
423     /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
424     ///
425     /// // After 100ms have passed, all accepts will fail
426     /// a.set_timeout(Some(100));
427     ///
428     /// match a.accept() {
429     ///     Ok(..) => println!("accepted a socket"),
430     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
431     ///     Err(e) => println!("err: {}", e),
432     /// }
433     ///
434     /// // Reset the timeout and try again
435     /// a.set_timeout(Some(100));
436     /// let socket = a.accept();
437     ///
438     /// // Clear the timeout and block indefinitely waiting for a connection
439     /// a.set_timeout(None);
440     /// let socket = a.accept();
441     /// ```
442     #[experimental = "the type of the argument and name of this function are \
443                       subject to change"]
444     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
445 }
446
447 impl Acceptor<TcpStream> for TcpAcceptor {
448     fn accept(&mut self) -> IoResult<TcpStream> {
449         match self.obj.accept(){
450             Ok(s) => Ok(TcpStream::new(s)),
451             Err(e) => Err(IoError::from_rtio_error(e)),
452         }
453     }
454 }
455
456 #[cfg(test)]
457 #[allow(experimental)]
458 mod test {
459     use super::*;
460     use io::net::ip::SocketAddr;
461     use io::*;
462     use prelude::*;
463
464     // FIXME #11530 this fails on android because tests are run as root
465     iotest!(fn bind_error() {
466         match TcpListener::bind("0.0.0.0", 1) {
467             Ok(..) => fail!(),
468             Err(e) => assert_eq!(e.kind, PermissionDenied),
469         }
470     } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
471
472     iotest!(fn connect_error() {
473         match TcpStream::connect("0.0.0.0", 1) {
474             Ok(..) => fail!(),
475             Err(e) => assert_eq!(e.kind, ConnectionRefused),
476         }
477     })
478
479     iotest!(fn listen_ip4_localhost() {
480         let socket_addr = next_test_ip4();
481         let ip_str = socket_addr.ip.to_string();
482         let port = socket_addr.port;
483         let listener = TcpListener::bind(ip_str.as_slice(), port);
484         let mut acceptor = listener.listen();
485
486         spawn(proc() {
487             let mut stream = TcpStream::connect("localhost", port);
488             stream.write([144]).unwrap();
489         });
490
491         let mut stream = acceptor.accept();
492         let mut buf = [0];
493         stream.read(buf).unwrap();
494         assert!(buf[0] == 144);
495     })
496
497     iotest!(fn connect_localhost() {
498         let addr = next_test_ip4();
499         let ip_str = addr.ip.to_string();
500         let port = addr.port;
501         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
502
503         spawn(proc() {
504             let mut stream = TcpStream::connect("localhost", addr.port);
505             stream.write([64]).unwrap();
506         });
507
508         let mut stream = acceptor.accept();
509         let mut buf = [0];
510         stream.read(buf).unwrap();
511         assert!(buf[0] == 64);
512     })
513
514     iotest!(fn connect_ip4_loopback() {
515         let addr = next_test_ip4();
516         let ip_str = addr.ip.to_string();
517         let port = addr.port;
518         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
519
520         spawn(proc() {
521             let mut stream = TcpStream::connect("127.0.0.1", addr.port);
522             stream.write([44]).unwrap();
523         });
524
525         let mut stream = acceptor.accept();
526         let mut buf = [0];
527         stream.read(buf).unwrap();
528         assert!(buf[0] == 44);
529     })
530
531     iotest!(fn connect_ip6_loopback() {
532         let addr = next_test_ip6();
533         let ip_str = addr.ip.to_string();
534         let port = addr.port;
535         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
536
537         spawn(proc() {
538             let mut stream = TcpStream::connect("::1", addr.port);
539             stream.write([66]).unwrap();
540         });
541
542         let mut stream = acceptor.accept();
543         let mut buf = [0];
544         stream.read(buf).unwrap();
545         assert!(buf[0] == 66);
546     })
547
548     iotest!(fn smoke_test_ip4() {
549         let addr = next_test_ip4();
550         let ip_str = addr.ip.to_string();
551         let port = addr.port;
552         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
553
554         spawn(proc() {
555             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
556             stream.write([99]).unwrap();
557         });
558
559         let mut stream = acceptor.accept();
560         let mut buf = [0];
561         stream.read(buf).unwrap();
562         assert!(buf[0] == 99);
563     })
564
565     iotest!(fn smoke_test_ip6() {
566         let addr = next_test_ip6();
567         let ip_str = addr.ip.to_string();
568         let port = addr.port;
569         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
570
571         spawn(proc() {
572             let mut stream = TcpStream::connect(ip_str.as_slice(), port);
573             stream.write([99]).unwrap();
574         });
575
576         let mut stream = acceptor.accept();
577         let mut buf = [0];
578         stream.read(buf).unwrap();
579         assert!(buf[0] == 99);
580     })
581
582     iotest!(fn read_eof_ip4() {
583         let addr = next_test_ip4();
584         let ip_str = addr.ip.to_string();
585         let port = addr.port;
586         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
587
588         spawn(proc() {
589             let _stream = TcpStream::connect(ip_str.as_slice(), port);
590             // Close
591         });
592
593         let mut stream = acceptor.accept();
594         let mut buf = [0];
595         let nread = stream.read(buf);
596         assert!(nread.is_err());
597     })
598
599     iotest!(fn read_eof_ip6() {
600         let addr = next_test_ip6();
601         let ip_str = addr.ip.to_string();
602         let port = addr.port;
603         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
604
605         spawn(proc() {
606             let _stream = TcpStream::connect(ip_str.as_slice(), port);
607             // Close
608         });
609
610         let mut stream = acceptor.accept();
611         let mut buf = [0];
612         let nread = stream.read(buf);
613         assert!(nread.is_err());
614     })
615
616     iotest!(fn read_eof_twice_ip4() {
617         let addr = next_test_ip4();
618         let ip_str = addr.ip.to_string();
619         let port = addr.port;
620         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
621
622         spawn(proc() {
623             let _stream = TcpStream::connect(ip_str.as_slice(), port);
624             // Close
625         });
626
627         let mut stream = acceptor.accept();
628         let mut buf = [0];
629         let nread = stream.read(buf);
630         assert!(nread.is_err());
631
632         match stream.read(buf) {
633             Ok(..) => fail!(),
634             Err(ref e) => {
635                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
636                         "unknown kind: {}", e.kind);
637             }
638         }
639     })
640
641     iotest!(fn read_eof_twice_ip6() {
642         let addr = next_test_ip6();
643         let ip_str = addr.ip.to_string();
644         let port = addr.port;
645         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
646
647         spawn(proc() {
648             let _stream = TcpStream::connect(ip_str.as_slice(), port);
649             // Close
650         });
651
652         let mut stream = acceptor.accept();
653         let mut buf = [0];
654         let nread = stream.read(buf);
655         assert!(nread.is_err());
656
657         match stream.read(buf) {
658             Ok(..) => fail!(),
659             Err(ref e) => {
660                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
661                         "unknown kind: {}", e.kind);
662             }
663         }
664     })
665
666     iotest!(fn write_close_ip4() {
667         let addr = next_test_ip4();
668         let ip_str = addr.ip.to_string();
669         let port = addr.port;
670         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
671
672         spawn(proc() {
673             let _stream = TcpStream::connect(ip_str.as_slice(), port);
674             // Close
675         });
676
677         let mut stream = acceptor.accept();
678         let buf = [0];
679         loop {
680             match stream.write(buf) {
681                 Ok(..) => {}
682                 Err(e) => {
683                     assert!(e.kind == ConnectionReset ||
684                             e.kind == BrokenPipe ||
685                             e.kind == ConnectionAborted,
686                             "unknown error: {}", e);
687                     break;
688                 }
689             }
690         }
691     })
692
693     iotest!(fn write_close_ip6() {
694         let addr = next_test_ip6();
695         let ip_str = addr.ip.to_string();
696         let port = addr.port;
697         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
698
699         spawn(proc() {
700             let _stream = TcpStream::connect(ip_str.as_slice(), port);
701             // Close
702         });
703
704         let mut stream = acceptor.accept();
705         let buf = [0];
706         loop {
707             match stream.write(buf) {
708                 Ok(..) => {}
709                 Err(e) => {
710                     assert!(e.kind == ConnectionReset ||
711                             e.kind == BrokenPipe ||
712                             e.kind == ConnectionAborted,
713                             "unknown error: {}", e);
714                     break;
715                 }
716             }
717         }
718     })
719
720     iotest!(fn multiple_connect_serial_ip4() {
721         let addr = next_test_ip4();
722         let ip_str = addr.ip.to_string();
723         let port = addr.port;
724         let max = 10u;
725         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
726
727         spawn(proc() {
728             for _ in range(0, max) {
729                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
730                 stream.write([99]).unwrap();
731             }
732         });
733
734         for ref mut stream in acceptor.incoming().take(max) {
735             let mut buf = [0];
736             stream.read(buf).unwrap();
737             assert_eq!(buf[0], 99);
738         }
739     })
740
741     iotest!(fn multiple_connect_serial_ip6() {
742         let addr = next_test_ip6();
743         let ip_str = addr.ip.to_string();
744         let port = addr.port;
745         let max = 10u;
746         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
747
748         spawn(proc() {
749             for _ in range(0, max) {
750                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
751                 stream.write([99]).unwrap();
752             }
753         });
754
755         for ref mut stream in acceptor.incoming().take(max) {
756             let mut buf = [0];
757             stream.read(buf).unwrap();
758             assert_eq!(buf[0], 99);
759         }
760     })
761
762     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
763         let addr = next_test_ip4();
764         let ip_str = addr.ip.to_string();
765         let port = addr.port;
766         static MAX: int = 10;
767         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
768
769         spawn(proc() {
770             let mut acceptor = acceptor;
771             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
772                 // Start another task to handle the connection
773                 spawn(proc() {
774                     let mut stream = stream;
775                     let mut buf = [0];
776                     stream.read(buf).unwrap();
777                     assert!(buf[0] == i as u8);
778                     debug!("read");
779                 });
780             }
781         });
782
783         connect(0, addr);
784
785         fn connect(i: int, addr: SocketAddr) {
786             let ip_str = addr.ip.to_string();
787             let port = addr.port;
788             if i == MAX { return }
789
790             spawn(proc() {
791                 debug!("connecting");
792                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
793                 // Connect again before writing
794                 connect(i + 1, addr);
795                 debug!("writing");
796                 stream.write([i as u8]).unwrap();
797             });
798         }
799     })
800
801     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
802         let addr = next_test_ip6();
803         let ip_str = addr.ip.to_string();
804         let port = addr.port;
805         static MAX: int = 10;
806         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
807
808         spawn(proc() {
809             let mut acceptor = acceptor;
810             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
811                 // Start another task to handle the connection
812                 spawn(proc() {
813                     let mut stream = stream;
814                     let mut buf = [0];
815                     stream.read(buf).unwrap();
816                     assert!(buf[0] == i as u8);
817                     debug!("read");
818                 });
819             }
820         });
821
822         connect(0, addr);
823
824         fn connect(i: int, addr: SocketAddr) {
825             let ip_str = addr.ip.to_string();
826             let port = addr.port;
827             if i == MAX { return }
828
829             spawn(proc() {
830                 debug!("connecting");
831                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
832                 // Connect again before writing
833                 connect(i + 1, addr);
834                 debug!("writing");
835                 stream.write([i as u8]).unwrap();
836             });
837         }
838     })
839
840     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
841         static MAX: int = 10;
842         let addr = next_test_ip4();
843         let ip_str = addr.ip.to_string();
844         let port = addr.port;
845         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
846
847         spawn(proc() {
848             let mut acceptor = acceptor;
849             for stream in acceptor.incoming().take(MAX as uint) {
850                 // Start another task to handle the connection
851                 spawn(proc() {
852                     let mut stream = stream;
853                     let mut buf = [0];
854                     stream.read(buf).unwrap();
855                     assert!(buf[0] == 99);
856                     debug!("read");
857                 });
858             }
859         });
860
861         connect(0, addr);
862
863         fn connect(i: int, addr: SocketAddr) {
864             let ip_str = addr.ip.to_string();
865             let port = addr.port;
866             if i == MAX { return }
867
868             spawn(proc() {
869                 debug!("connecting");
870                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
871                 // Connect again before writing
872                 connect(i + 1, addr);
873                 debug!("writing");
874                 stream.write([99]).unwrap();
875             });
876         }
877     })
878
879     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
880         static MAX: int = 10;
881         let addr = next_test_ip6();
882         let ip_str = addr.ip.to_string();
883         let port = addr.port;
884         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
885
886         spawn(proc() {
887             let mut acceptor = acceptor;
888             for stream in acceptor.incoming().take(MAX as uint) {
889                 // Start another task to handle the connection
890                 spawn(proc() {
891                     let mut stream = stream;
892                     let mut buf = [0];
893                     stream.read(buf).unwrap();
894                     assert!(buf[0] == 99);
895                     debug!("read");
896                 });
897             }
898         });
899
900         connect(0, addr);
901
902         fn connect(i: int, addr: SocketAddr) {
903             let ip_str = addr.ip.to_string();
904             let port = addr.port;
905             if i == MAX { return }
906
907             spawn(proc() {
908                 debug!("connecting");
909                 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
910                 // Connect again before writing
911                 connect(i + 1, addr);
912                 debug!("writing");
913                 stream.write([99]).unwrap();
914             });
915         }
916     })
917
918     pub fn socket_name(addr: SocketAddr) {
919         let ip_str = addr.ip.to_string();
920         let port = addr.port;
921         let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
922
923         // Make sure socket_name gives
924         // us the socket we binded to.
925         let so_name = listener.socket_name();
926         assert!(so_name.is_ok());
927         assert_eq!(addr, so_name.unwrap());
928     }
929
930     pub fn peer_name(addr: SocketAddr) {
931         let ip_str = addr.ip.to_string();
932         let port = addr.port;
933         let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
934         spawn(proc() {
935             let mut acceptor = acceptor;
936             acceptor.accept().unwrap();
937         });
938
939         let stream = TcpStream::connect(ip_str.as_slice(), port);
940
941         assert!(stream.is_ok());
942         let mut stream = stream.unwrap();
943
944         // Make sure peer_name gives us the
945         // address/port of the peer we've
946         // connected to.
947         let peer_name = stream.peer_name();
948         assert!(peer_name.is_ok());
949         assert_eq!(addr, peer_name.unwrap());
950     }
951
952     iotest!(fn socket_and_peer_name_ip4() {
953         peer_name(next_test_ip4());
954         socket_name(next_test_ip4());
955     })
956
957     iotest!(fn socket_and_peer_name_ip6() {
958         // FIXME: peer name is not consistent
959         //peer_name(next_test_ip6());
960         socket_name(next_test_ip6());
961     })
962
963     iotest!(fn partial_read() {
964         let addr = next_test_ip4();
965         let port = addr.port;
966         let (tx, rx) = channel();
967         spawn(proc() {
968             let ip_str = addr.ip.to_string();
969             let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
970             tx.send(());
971             let mut cl = srv.accept().unwrap();
972             cl.write([10]).unwrap();
973             let mut b = [0];
974             cl.read(b).unwrap();
975             tx.send(());
976         });
977
978         rx.recv();
979         let ip_str = addr.ip.to_string();
980         let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
981         let mut b = [0, ..10];
982         assert_eq!(c.read(b), Ok(1));
983         c.write([1]).unwrap();
984         rx.recv();
985     })
986
987     iotest!(fn double_bind() {
988         let addr = next_test_ip4();
989         let ip_str = addr.ip.to_string();
990         let port = addr.port;
991         let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
992         assert!(listener.is_ok());
993         match TcpListener::bind(ip_str.as_slice(), port).listen() {
994             Ok(..) => fail!(),
995             Err(e) => {
996                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
997                         "unknown error: {} {}", e, e.kind);
998             }
999         }
1000     })
1001
1002     iotest!(fn fast_rebind() {
1003         let addr = next_test_ip4();
1004         let port = addr.port;
1005         let (tx, rx) = channel();
1006
1007         spawn(proc() {
1008             let ip_str = addr.ip.to_string();
1009             rx.recv();
1010             let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1011             // Close
1012             rx.recv();
1013         });
1014
1015         {
1016             let ip_str = addr.ip.to_string();
1017             let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1018             tx.send(());
1019             {
1020                 let _stream = acceptor.accept().unwrap();
1021                 // Close client
1022                 tx.send(());
1023             }
1024             // Close listener
1025         }
1026         let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1027     })
1028
1029     iotest!(fn tcp_clone_smoke() {
1030         let addr = next_test_ip4();
1031         let ip_str = addr.ip.to_string();
1032         let port = addr.port;
1033         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1034
1035         spawn(proc() {
1036             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1037             let mut buf = [0, 0];
1038             assert_eq!(s.read(buf), Ok(1));
1039             assert_eq!(buf[0], 1);
1040             s.write([2]).unwrap();
1041         });
1042
1043         let mut s1 = acceptor.accept().unwrap();
1044         let s2 = s1.clone();
1045
1046         let (tx1, rx1) = channel();
1047         let (tx2, rx2) = channel();
1048         spawn(proc() {
1049             let mut s2 = s2;
1050             rx1.recv();
1051             s2.write([1]).unwrap();
1052             tx2.send(());
1053         });
1054         tx1.send(());
1055         let mut buf = [0, 0];
1056         assert_eq!(s1.read(buf), Ok(1));
1057         rx2.recv();
1058     })
1059
1060     iotest!(fn tcp_clone_two_read() {
1061         let addr = next_test_ip6();
1062         let ip_str = addr.ip.to_string();
1063         let port = addr.port;
1064         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1065         let (tx1, rx) = channel();
1066         let tx2 = tx1.clone();
1067
1068         spawn(proc() {
1069             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1070             s.write([1]).unwrap();
1071             rx.recv();
1072             s.write([2]).unwrap();
1073             rx.recv();
1074         });
1075
1076         let mut s1 = acceptor.accept().unwrap();
1077         let s2 = s1.clone();
1078
1079         let (done, rx) = channel();
1080         spawn(proc() {
1081             let mut s2 = s2;
1082             let mut buf = [0, 0];
1083             s2.read(buf).unwrap();
1084             tx2.send(());
1085             done.send(());
1086         });
1087         let mut buf = [0, 0];
1088         s1.read(buf).unwrap();
1089         tx1.send(());
1090
1091         rx.recv();
1092     })
1093
1094     iotest!(fn tcp_clone_two_write() {
1095         let addr = next_test_ip4();
1096         let ip_str = addr.ip.to_string();
1097         let port = addr.port;
1098         let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1099
1100         spawn(proc() {
1101             let mut s = TcpStream::connect(ip_str.as_slice(), port);
1102             let mut buf = [0, 1];
1103             s.read(buf).unwrap();
1104             s.read(buf).unwrap();
1105         });
1106
1107         let mut s1 = acceptor.accept().unwrap();
1108         let s2 = s1.clone();
1109
1110         let (done, rx) = channel();
1111         spawn(proc() {
1112             let mut s2 = s2;
1113             s2.write([1]).unwrap();
1114             done.send(());
1115         });
1116         s1.write([2]).unwrap();
1117
1118         rx.recv();
1119     })
1120
1121     iotest!(fn shutdown_smoke() {
1122         use rt::rtio::RtioTcpStream;
1123
1124         let addr = next_test_ip4();
1125         let ip_str = addr.ip.to_string();
1126         let port = addr.port;
1127         let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1128         spawn(proc() {
1129             let mut a = a;
1130             let mut c = a.accept().unwrap();
1131             assert_eq!(c.read_to_end(), Ok(vec!()));
1132             c.write([1]).unwrap();
1133         });
1134
1135         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1136         assert!(s.obj.close_write().is_ok());
1137         assert!(s.write([1]).is_err());
1138         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1139     })
1140
1141     iotest!(fn accept_timeout() {
1142         let addr = next_test_ip4();
1143         let ip_str = addr.ip.to_string();
1144         let port = addr.port;
1145         let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1146
1147         a.set_timeout(Some(10));
1148
1149         // Make sure we time out once and future invocations also time out
1150         let err = a.accept().err().unwrap();
1151         assert_eq!(err.kind, TimedOut);
1152         let err = a.accept().err().unwrap();
1153         assert_eq!(err.kind, TimedOut);
1154
1155         // Also make sure that even though the timeout is expired that we will
1156         // continue to receive any pending connections.
1157         //
1158         // FIXME: freebsd apparently never sees the pending connection, but
1159         //        testing manually always works. Need to investigate this
1160         //        flakiness.
1161         if !cfg!(target_os = "freebsd") {
1162             let (tx, rx) = channel();
1163             spawn(proc() {
1164                 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
1165                                            port).unwrap());
1166             });
1167             let _l = rx.recv();
1168             for i in range(0i, 1001) {
1169                 match a.accept() {
1170                     Ok(..) => break,
1171                     Err(ref e) if e.kind == TimedOut => {}
1172                     Err(e) => fail!("error: {}", e),
1173                 }
1174                 ::task::deschedule();
1175                 if i == 1000 { fail!("should have a pending connection") }
1176             }
1177         }
1178
1179         // Unset the timeout and make sure that this always blocks.
1180         a.set_timeout(None);
1181         spawn(proc() {
1182             drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1183                                     port).unwrap());
1184         });
1185         a.accept().unwrap();
1186     })
1187
1188     iotest!(fn close_readwrite_smoke() {
1189         let addr = next_test_ip4();
1190         let ip_str = addr.ip.to_string();
1191         let port = addr.port;
1192         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1193         let (_tx, rx) = channel::<()>();
1194         spawn(proc() {
1195             let mut a = a;
1196             let _s = a.accept().unwrap();
1197             let _ = rx.recv_opt();
1198         });
1199
1200         let mut b = [0];
1201         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1202         let mut s2 = s.clone();
1203
1204         // closing should prevent reads/writes
1205         s.close_write().unwrap();
1206         assert!(s.write([0]).is_err());
1207         s.close_read().unwrap();
1208         assert!(s.read(b).is_err());
1209
1210         // closing should affect previous handles
1211         assert!(s2.write([0]).is_err());
1212         assert!(s2.read(b).is_err());
1213
1214         // closing should affect new handles
1215         let mut s3 = s.clone();
1216         assert!(s3.write([0]).is_err());
1217         assert!(s3.read(b).is_err());
1218
1219         // make sure these don't die
1220         let _ = s2.close_read();
1221         let _ = s2.close_write();
1222         let _ = s3.close_read();
1223         let _ = s3.close_write();
1224     })
1225
1226     iotest!(fn close_read_wakes_up() {
1227         let addr = next_test_ip4();
1228         let ip_str = addr.ip.to_string();
1229         let port = addr.port;
1230         let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1231         let (_tx, rx) = channel::<()>();
1232         spawn(proc() {
1233             let mut a = a;
1234             let _s = a.accept().unwrap();
1235             let _ = rx.recv_opt();
1236         });
1237
1238         let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1239         let s2 = s.clone();
1240         let (tx, rx) = channel();
1241         spawn(proc() {
1242             let mut s2 = s2;
1243             assert!(s2.read([0]).is_err());
1244             tx.send(());
1245         });
1246         // this should wake up the child task
1247         s.close_read().unwrap();
1248
1249         // this test will never finish if the child doesn't wake up
1250         rx.recv();
1251     })
1252
1253     iotest!(fn readwrite_timeouts() {
1254         let addr = next_test_ip6();
1255         let ip_str = addr.ip.to_string();
1256         let port = addr.port;
1257         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1258         let (tx, rx) = channel::<()>();
1259         spawn(proc() {
1260             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1261             rx.recv();
1262             assert!(s.write([0]).is_ok());
1263             let _ = rx.recv_opt();
1264         });
1265
1266         let mut s = a.accept().unwrap();
1267         s.set_timeout(Some(20));
1268         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1269         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1270
1271         s.set_timeout(Some(20));
1272         for i in range(0i, 1001) {
1273             match s.write([0, .. 128 * 1024]) {
1274                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1275                 Err(IoError { kind: TimedOut, .. }) => break,
1276                 Err(e) => fail!("{}", e),
1277            }
1278            if i == 1000 { fail!("should have filled up?!"); }
1279         }
1280         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1281
1282         tx.send(());
1283         s.set_timeout(None);
1284         assert_eq!(s.read([0, 0]), Ok(1));
1285     })
1286
1287     iotest!(fn read_timeouts() {
1288         let addr = next_test_ip6();
1289         let ip_str = addr.ip.to_string();
1290         let port = addr.port;
1291         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1292         let (tx, rx) = channel::<()>();
1293         spawn(proc() {
1294             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1295             rx.recv();
1296             let mut amt = 0;
1297             while amt < 100 * 128 * 1024 {
1298                 match s.read([0, ..128 * 1024]) {
1299                     Ok(n) => { amt += n; }
1300                     Err(e) => fail!("{}", e),
1301                 }
1302             }
1303             let _ = rx.recv_opt();
1304         });
1305
1306         let mut s = a.accept().unwrap();
1307         s.set_read_timeout(Some(20));
1308         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1309         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1310
1311         tx.send(());
1312         for _ in range(0i, 100) {
1313             assert!(s.write([0, ..128 * 1024]).is_ok());
1314         }
1315     })
1316
1317     iotest!(fn write_timeouts() {
1318         let addr = next_test_ip6();
1319         let ip_str = addr.ip.to_string();
1320         let port = addr.port;
1321         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1322         let (tx, rx) = channel::<()>();
1323         spawn(proc() {
1324             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1325             rx.recv();
1326             assert!(s.write([0]).is_ok());
1327             let _ = rx.recv_opt();
1328         });
1329
1330         let mut s = a.accept().unwrap();
1331         s.set_write_timeout(Some(20));
1332         for i in range(0i, 1001) {
1333             match s.write([0, .. 128 * 1024]) {
1334                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1335                 Err(IoError { kind: TimedOut, .. }) => break,
1336                 Err(e) => fail!("{}", e),
1337            }
1338            if i == 1000 { fail!("should have filled up?!"); }
1339         }
1340         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1341
1342         tx.send(());
1343         assert!(s.read([0]).is_ok());
1344     })
1345
1346     iotest!(fn timeout_concurrent_read() {
1347         let addr = next_test_ip6();
1348         let ip_str = addr.ip.to_string();
1349         let port = addr.port;
1350         let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1351         let (tx, rx) = channel::<()>();
1352         spawn(proc() {
1353             let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1354             rx.recv();
1355             assert_eq!(s.write([0]), Ok(()));
1356             let _ = rx.recv_opt();
1357         });
1358
1359         let mut s = a.accept().unwrap();
1360         let s2 = s.clone();
1361         let (tx2, rx2) = channel();
1362         spawn(proc() {
1363             let mut s2 = s2;
1364             assert_eq!(s2.read([0]), Ok(1));
1365             tx2.send(());
1366         });
1367
1368         s.set_read_timeout(Some(20));
1369         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1370         tx.send(());
1371
1372         rx2.recv();
1373     })
1374
1375     iotest!(fn clone_while_reading() {
1376         let addr = next_test_ip6();
1377         let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1378         let mut accept = listen.listen().unwrap();
1379
1380         // Enqueue a task to write to a socket
1381         let (tx, rx) = channel();
1382         let (txdone, rxdone) = channel();
1383         let txdone2 = txdone.clone();
1384         spawn(proc() {
1385             let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1386                                              addr.port).unwrap();
1387             rx.recv();
1388             tcp.write_u8(0).unwrap();
1389             txdone2.send(());
1390         });
1391
1392         // Spawn off a reading clone
1393         let tcp = accept.accept().unwrap();
1394         let tcp2 = tcp.clone();
1395         let txdone3 = txdone.clone();
1396         spawn(proc() {
1397             let mut tcp2 = tcp2;
1398             tcp2.read_u8().unwrap();
1399             txdone3.send(());
1400         });
1401
1402         // Try to ensure that the reading clone is indeed reading
1403         for _ in range(0i, 50) {
1404             ::task::deschedule();
1405         }
1406
1407         // clone the handle again while it's reading, then let it finish the
1408         // read.
1409         let _ = tcp.clone();
1410         tx.send(());
1411         rxdone.recv();
1412         rxdone.recv();
1413     })
1414 }