]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
Fixed tidy errors
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use iter::Iterator;
23 use result::{Ok,Err};
24 use io::net::ip::{SocketAddr, ToSocketAddr};
25 use io::IoError;
26 use io::{Reader, Writer, Listener, Acceptor};
27 use io::{standard_error, TimedOut};
28 use kinds::Send;
29 use option::{None, Some, Option};
30 use boxed::Box;
31 use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener};
32 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
33 use rt::rtio;
34 use time::Duration;
35
36 /// A structure which represents a TCP stream between a local socket and a
37 /// remote socket.
38 ///
39 /// # Example
40 ///
41 /// ```no_run
42 /// # #![allow(unused_must_use)]
43 /// use std::io::TcpStream;
44 ///
45 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
46 ///
47 /// stream.write([1]);
48 /// let mut buf = [0];
49 /// stream.read(buf);
50 /// drop(stream); // close the connection
51 /// ```
52 pub struct TcpStream {
53     obj: Box<RtioTcpStream + Send>,
54 }
55
56 impl TcpStream {
57     fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
58         TcpStream { obj: s }
59     }
60
61     /// Open a TCP connection to a remote host.
62     ///
63     /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
64     /// trait can be supplied for the address; see this trait documentation for
65     /// concrete examples.
66     pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
67         super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new))
68     }
69
70     /// Creates a TCP connection to a remote socket address, timing out after
71     /// the specified duration.
72     ///
73     /// This is the same as the `connect` method, except that if the timeout
74     /// specified elapses before a connection is made an error will be
75     /// returned. The error's kind will be `TimedOut`.
76     ///
77     /// Same as the `connect` method, `addr` argument type can vary as defined
78     /// by `ToSocketAddr` trait.
79     ///
80     /// If a `timeout` with zero or negative duration is specified then
81     /// the function returns `Err`, with the error kind set to `TimedOut`.
82     #[experimental = "the timeout argument may eventually change types"]
83     pub fn connect_timeout<A: ToSocketAddr>(addr: A,
84                                             timeout: Duration) -> IoResult<TcpStream> {
85         if timeout <= Duration::milliseconds(0) {
86             return Err(standard_error(TimedOut));
87         }
88
89         super::with_addresses_io(addr, |io, addr|
90             io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
91         )
92     }
93
94     /// Returns the socket address of the remote peer of this TCP connection.
95     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
96         match self.obj.peer_name() {
97             Ok(rtio::SocketAddr { ip, port }) => {
98                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
99             }
100             Err(e) => Err(IoError::from_rtio_error(e)),
101         }
102     }
103
104     /// Returns the socket address of the local half of this TCP connection.
105     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
106         match self.obj.socket_name() {
107             Ok(rtio::SocketAddr { ip, port }) => {
108                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
109             }
110             Err(e) => Err(IoError::from_rtio_error(e)),
111         }
112     }
113
114     /// Sets the nodelay flag on this connection to the boolean specified
115     #[experimental]
116     pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
117         if nodelay {
118             self.obj.nodelay()
119         } else {
120             self.obj.control_congestion()
121         }.map_err(IoError::from_rtio_error)
122     }
123
124     /// Sets the keepalive timeout to the timeout specified.
125     ///
126     /// If the value specified is `None`, then the keepalive flag is cleared on
127     /// this connection. Otherwise, the keepalive timeout will be set to the
128     /// specified time, in seconds.
129     #[experimental]
130     pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
131         match delay_in_seconds {
132             Some(i) => self.obj.keepalive(i),
133             None => self.obj.letdie(),
134         }.map_err(IoError::from_rtio_error)
135     }
136
137     /// Closes the reading half of this connection.
138     ///
139     /// This method will close the reading portion of this connection, causing
140     /// all pending and future reads to immediately return with an error.
141     ///
142     /// # Example
143     ///
144     /// ```no_run
145     /// # #![allow(unused_must_use)]
146     /// use std::io::timer;
147     /// use std::io::TcpStream;
148     /// use std::time::Duration;
149     ///
150     /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
151     /// let stream2 = stream.clone();
152     ///
153     /// spawn(proc() {
154     ///     // close this stream after one second
155     ///     timer::sleep(Duration::seconds(1));
156     ///     let mut stream = stream2;
157     ///     stream.close_read();
158     /// });
159     ///
160     /// // wait for some data, will get canceled after one second
161     /// let mut buf = [0];
162     /// stream.read(buf);
163     /// ```
164     ///
165     /// Note that this method affects all cloned handles associated with this
166     /// stream, not just this one handle.
167     pub fn close_read(&mut self) -> IoResult<()> {
168         self.obj.close_read().map_err(IoError::from_rtio_error)
169     }
170
171     /// Closes the writing half of this connection.
172     ///
173     /// This method will close the writing portion of this connection, causing
174     /// all future writes to immediately return with an error.
175     ///
176     /// Note that this method affects all cloned handles associated with this
177     /// stream, not just this one handle.
178     pub fn close_write(&mut self) -> IoResult<()> {
179         self.obj.close_write().map_err(IoError::from_rtio_error)
180     }
181
182     /// Sets a timeout, in milliseconds, for blocking operations on this stream.
183     ///
184     /// This function will set a timeout for all blocking operations (including
185     /// reads and writes) on this stream. The timeout specified is a relative
186     /// time, in milliseconds, into the future after which point operations will
187     /// time out. This means that the timeout must be reset periodically to keep
188     /// it from expiring. Specifying a value of `None` will clear the timeout
189     /// for this stream.
190     ///
191     /// The timeout on this stream is local to this stream only. Setting a
192     /// timeout does not affect any other cloned instances of this stream, nor
193     /// does the timeout propagated to cloned handles of this stream. Setting
194     /// this timeout will override any specific read or write timeouts
195     /// previously set for this stream.
196     ///
197     /// For clarification on the semantics of interrupting a read and a write,
198     /// take a look at `set_read_timeout` and `set_write_timeout`.
199     #[experimental = "the timeout argument may change in type and value"]
200     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
201         self.obj.set_timeout(timeout_ms)
202     }
203
204     /// Sets the timeout for read operations on this stream.
205     ///
206     /// See documentation in `set_timeout` for the semantics of this read time.
207     /// This will overwrite any previous read timeout set through either this
208     /// function or `set_timeout`.
209     ///
210     /// # Errors
211     ///
212     /// When this timeout expires, if there is no pending read operation, no
213     /// action is taken. Otherwise, the read operation will be scheduled to
214     /// promptly return. If a timeout error is returned, then no data was read
215     /// during the timeout period.
216     #[experimental = "the timeout argument may change in type and value"]
217     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
218         self.obj.set_read_timeout(timeout_ms)
219     }
220
221     /// Sets the timeout for write operations on this stream.
222     ///
223     /// See documentation in `set_timeout` for the semantics of this write time.
224     /// This will overwrite any previous write timeout set through either this
225     /// function or `set_timeout`.
226     ///
227     /// # Errors
228     ///
229     /// When this timeout expires, if there is no pending write operation, no
230     /// action is taken. Otherwise, the pending write operation will be
231     /// scheduled to promptly return. The actual state of the underlying stream
232     /// is not specified.
233     ///
234     /// The write operation may return an error of type `ShortWrite` which
235     /// indicates that the object is known to have written an exact number of
236     /// bytes successfully during the timeout period, and the remaining bytes
237     /// were never written.
238     ///
239     /// If the write operation returns `TimedOut`, then it the timeout primitive
240     /// does not know how many bytes were written as part of the timeout
241     /// operation. It may be the case that bytes continue to be written in an
242     /// asynchronous fashion after the call to write returns.
243     #[experimental = "the timeout argument may change in type and value"]
244     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
245         self.obj.set_write_timeout(timeout_ms)
246     }
247 }
248
249 impl Clone for TcpStream {
250     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
251     /// and writes of this connection.
252     ///
253     /// The underlying TCP stream will not be closed until all handles to the
254     /// stream have been deallocated. All handles will also follow the same
255     /// stream, but two concurrent reads will not receive the same data.
256     /// Instead, the first read will receive the first packet received, and the
257     /// second read will receive the second packet.
258     fn clone(&self) -> TcpStream {
259         TcpStream { obj: self.obj.clone() }
260     }
261 }
262
263 impl Reader for TcpStream {
264     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
265         self.obj.read(buf).map_err(IoError::from_rtio_error)
266     }
267 }
268
269 impl Writer for TcpStream {
270     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
271         self.obj.write(buf).map_err(IoError::from_rtio_error)
272     }
273 }
274
275 /// A structure representing a socket server. This listener is used to create a
276 /// `TcpAcceptor` which can be used to accept sockets on a local port.
277 ///
278 /// # Example
279 ///
280 /// ```rust
281 /// # fn main() { }
282 /// # fn foo() {
283 /// # #![allow(dead_code)]
284 /// use std::io::{TcpListener, TcpStream};
285 /// use std::io::{Acceptor, Listener};
286 ///
287 /// let listener = TcpListener::bind("127.0.0.1:80");
288 ///
289 /// // bind the listener to the specified address
290 /// let mut acceptor = listener.listen();
291 ///
292 /// fn handle_client(mut stream: TcpStream) {
293 ///     // ...
294 /// # &mut stream; // silence unused mutability/variable warning
295 /// }
296 /// // accept connections and process them, spawning a new tasks for each one
297 /// for stream in acceptor.incoming() {
298 ///     match stream {
299 ///         Err(e) => { /* connection failed */ }
300 ///         Ok(stream) => spawn(proc() {
301 ///             // connection succeeded
302 ///             handle_client(stream)
303 ///         })
304 ///     }
305 /// }
306 ///
307 /// // close the socket server
308 /// drop(acceptor);
309 /// # }
310 /// ```
311 pub struct TcpListener {
312     obj: Box<RtioTcpListener + Send>,
313 }
314
315 impl TcpListener {
316     /// Creates a new `TcpListener` which will be bound to the specified address.
317     /// This listener is not ready for accepting connections, `listen` must be called
318     /// on it before that's possible.
319     ///
320     /// Binding with a port number of 0 will request that the OS assigns a port
321     /// to this listener. The port allocated can be queried via the
322     /// `socket_name` function.
323     pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
324         super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l }))
325     }
326
327     /// Returns the local socket address of this listener.
328     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
329         match self.obj.socket_name() {
330             Ok(rtio::SocketAddr { ip, port }) => {
331                 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
332             }
333             Err(e) => Err(IoError::from_rtio_error(e)),
334         }
335     }
336 }
337
338 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
339     fn listen(self) -> IoResult<TcpAcceptor> {
340         match self.obj.listen() {
341             Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
342             Err(e) => Err(IoError::from_rtio_error(e)),
343         }
344     }
345 }
346
347 /// The accepting half of a TCP socket server. This structure is created through
348 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
349 /// `TcpStream` instances.
350 pub struct TcpAcceptor {
351     obj: Box<RtioTcpAcceptor + Send>,
352 }
353
354 impl TcpAcceptor {
355     /// Prevents blocking on all future accepts after `ms` milliseconds have
356     /// elapsed.
357     ///
358     /// This function is used to set a deadline after which this acceptor will
359     /// time out accepting any connections. The argument is the relative
360     /// distance, in milliseconds, to a point in the future after which all
361     /// accepts will fail.
362     ///
363     /// If the argument specified is `None`, then any previously registered
364     /// timeout is cleared.
365     ///
366     /// A timeout of `0` can be used to "poll" this acceptor to see if it has
367     /// any pending connections. All pending connections will be accepted,
368     /// regardless of whether the timeout has expired or not (the accept will
369     /// not block in this case).
370     ///
371     /// # Example
372     ///
373     /// ```no_run
374     /// # #![allow(experimental)]
375     /// use std::io::TcpListener;
376     /// use std::io::{Listener, Acceptor, TimedOut};
377     ///
378     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
379     ///
380     /// // After 100ms have passed, all accepts will fail
381     /// a.set_timeout(Some(100));
382     ///
383     /// match a.accept() {
384     ///     Ok(..) => println!("accepted a socket"),
385     ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
386     ///     Err(e) => println!("err: {}", e),
387     /// }
388     ///
389     /// // Reset the timeout and try again
390     /// a.set_timeout(Some(100));
391     /// let socket = a.accept();
392     ///
393     /// // Clear the timeout and block indefinitely waiting for a connection
394     /// a.set_timeout(None);
395     /// let socket = a.accept();
396     /// ```
397     #[experimental = "the type of the argument and name of this function are \
398                       subject to change"]
399     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
400
401     /// Closes the accepting capabilities of this acceptor.
402     ///
403     /// This function is similar to `TcpStream`'s `close_{read,write}` methods
404     /// in that it will affect *all* cloned handles of this acceptor's original
405     /// handle.
406     ///
407     /// Once this function succeeds, all future calls to `accept` will return
408     /// immediately with an error, preventing all future calls to accept. The
409     /// underlying socket will not be relinquished back to the OS until all
410     /// acceptors have been deallocated.
411     ///
412     /// This is useful for waking up a thread in an accept loop to indicate that
413     /// it should exit.
414     ///
415     /// # Example
416     ///
417     /// ```
418     /// # #![allow(experimental)]
419     /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
420     ///
421     /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
422     /// let a2 = a.clone();
423     ///
424     /// spawn(proc() {
425     ///     let mut a2 = a2;
426     ///     for socket in a2.incoming() {
427     ///         match socket {
428     ///             Ok(s) => { /* handle s */ }
429     ///             Err(ref e) if e.kind == EndOfFile => break, // closed
430     ///             Err(e) => panic!("unexpected error: {}", e),
431     ///         }
432     ///     }
433     /// });
434     ///
435     /// # fn wait_for_sigint() {}
436     /// // Now that our accept loop is running, wait for the program to be
437     /// // requested to exit.
438     /// wait_for_sigint();
439     ///
440     /// // Signal our accept loop to exit
441     /// assert!(a.close_accept().is_ok());
442     /// ```
443     #[experimental]
444     pub fn close_accept(&mut self) -> IoResult<()> {
445         self.obj.close_accept().map_err(IoError::from_rtio_error)
446     }
447 }
448
449 impl Acceptor<TcpStream> for TcpAcceptor {
450     fn accept(&mut self) -> IoResult<TcpStream> {
451         match self.obj.accept(){
452             Ok(s) => Ok(TcpStream::new(s)),
453             Err(e) => Err(IoError::from_rtio_error(e)),
454         }
455     }
456 }
457
458 impl Clone for TcpAcceptor {
459     /// Creates a new handle to this TCP acceptor, allowing for simultaneous
460     /// accepts.
461     ///
462     /// The underlying TCP acceptor will not be closed until all handles to the
463     /// acceptor have been deallocated. Incoming connections will be received on
464     /// at most once acceptor, the same connection will not be accepted twice.
465     ///
466     /// The `close_accept` method will shut down *all* acceptors cloned from the
467     /// same original acceptor, whereas the `set_timeout` method only affects
468     /// the selector that it is called on.
469     ///
470     /// This function is useful for creating a handle to invoke `close_accept`
471     /// on to wake up any other task blocked in `accept`.
472     fn clone(&self) -> TcpAcceptor {
473         TcpAcceptor { obj: self.obj.clone() }
474     }
475 }
476
477 #[cfg(test)]
478 #[allow(experimental)]
479 mod test {
480     use io::net::tcp::*;
481     use io::net::ip::*;
482     use io::*;
483     use io::test::*;
484     use prelude::*;
485
486     // FIXME #11530 this fails on android because tests are run as root
487     #[cfg_attr(any(windows, target_os = "android"), ignore)]
488     #[test]
489     fn bind_error() {
490         match TcpListener::bind("0.0.0.0:1") {
491             Ok(..) => panic!(),
492             Err(e) => assert_eq!(e.kind, PermissionDenied),
493         }
494     }
495
496     #[test]
497     fn connect_error() {
498         match TcpStream::connect("0.0.0.0:1") {
499             Ok(..) => panic!(),
500             Err(e) => assert_eq!(e.kind, ConnectionRefused),
501         }
502     }
503
504     #[test]
505     fn listen_ip4_localhost() {
506         let socket_addr = next_test_ip4();
507         let listener = TcpListener::bind(socket_addr);
508         let mut acceptor = listener.listen();
509
510         spawn(proc() {
511             let mut stream = TcpStream::connect(("localhost", socket_addr.port));
512             stream.write([144]).unwrap();
513         });
514
515         let mut stream = acceptor.accept();
516         let mut buf = [0];
517         stream.read(buf).unwrap();
518         assert!(buf[0] == 144);
519     }
520
521     #[test]
522     fn connect_localhost() {
523         let addr = next_test_ip4();
524         let mut acceptor = TcpListener::bind(addr).listen();
525
526         spawn(proc() {
527             let mut stream = TcpStream::connect(("localhost", addr.port));
528             stream.write([64]).unwrap();
529         });
530
531         let mut stream = acceptor.accept();
532         let mut buf = [0];
533         stream.read(buf).unwrap();
534         assert!(buf[0] == 64);
535     }
536
537     #[test]
538     fn connect_ip4_loopback() {
539         let addr = next_test_ip4();
540         let mut acceptor = TcpListener::bind(addr).listen();
541
542         spawn(proc() {
543             let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
544             stream.write([44]).unwrap();
545         });
546
547         let mut stream = acceptor.accept();
548         let mut buf = [0];
549         stream.read(buf).unwrap();
550         assert!(buf[0] == 44);
551     }
552
553     #[test]
554     fn connect_ip6_loopback() {
555         let addr = next_test_ip6();
556         let mut acceptor = TcpListener::bind(addr).listen();
557
558         spawn(proc() {
559             let mut stream = TcpStream::connect(("::1", addr.port));
560             stream.write([66]).unwrap();
561         });
562
563         let mut stream = acceptor.accept();
564         let mut buf = [0];
565         stream.read(buf).unwrap();
566         assert!(buf[0] == 66);
567     }
568
569     #[test]
570     fn smoke_test_ip4() {
571         let addr = next_test_ip4();
572         let mut acceptor = TcpListener::bind(addr).listen();
573
574         spawn(proc() {
575             let mut stream = TcpStream::connect(addr);
576             stream.write([99]).unwrap();
577         });
578
579         let mut stream = acceptor.accept();
580         let mut buf = [0];
581         stream.read(buf).unwrap();
582         assert!(buf[0] == 99);
583     }
584
585     #[test]
586     fn smoke_test_ip6() {
587         let addr = next_test_ip6();
588         let mut acceptor = TcpListener::bind(addr).listen();
589
590         spawn(proc() {
591             let mut stream = TcpStream::connect(addr);
592             stream.write([99]).unwrap();
593         });
594
595         let mut stream = acceptor.accept();
596         let mut buf = [0];
597         stream.read(buf).unwrap();
598         assert!(buf[0] == 99);
599     }
600
601     #[test]
602     fn read_eof_ip4() {
603         let addr = next_test_ip4();
604         let mut acceptor = TcpListener::bind(addr).listen();
605
606         spawn(proc() {
607             let _stream = TcpStream::connect(addr);
608             // Close
609         });
610
611         let mut stream = acceptor.accept();
612         let mut buf = [0];
613         let nread = stream.read(buf);
614         assert!(nread.is_err());
615     }
616
617     #[test]
618     fn read_eof_ip6() {
619         let addr = next_test_ip6();
620         let mut acceptor = TcpListener::bind(addr).listen();
621
622         spawn(proc() {
623             let _stream = TcpStream::connect(addr);
624             // Close
625         });
626
627         let mut stream = acceptor.accept();
628         let mut buf = [0];
629         let nread = stream.read(buf);
630         assert!(nread.is_err());
631     }
632
633     #[test]
634     fn read_eof_twice_ip4() {
635         let addr = next_test_ip4();
636         let mut acceptor = TcpListener::bind(addr).listen();
637
638         spawn(proc() {
639             let _stream = TcpStream::connect(addr);
640             // Close
641         });
642
643         let mut stream = acceptor.accept();
644         let mut buf = [0];
645         let nread = stream.read(buf);
646         assert!(nread.is_err());
647
648         match stream.read(buf) {
649             Ok(..) => panic!(),
650             Err(ref e) => {
651                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
652                         "unknown kind: {}", e.kind);
653             }
654         }
655     }
656
657     #[test]
658     fn read_eof_twice_ip6() {
659         let addr = next_test_ip6();
660         let mut acceptor = TcpListener::bind(addr).listen();
661
662         spawn(proc() {
663             let _stream = TcpStream::connect(addr);
664             // Close
665         });
666
667         let mut stream = acceptor.accept();
668         let mut buf = [0];
669         let nread = stream.read(buf);
670         assert!(nread.is_err());
671
672         match stream.read(buf) {
673             Ok(..) => panic!(),
674             Err(ref e) => {
675                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
676                         "unknown kind: {}", e.kind);
677             }
678         }
679     }
680
681     #[test]
682     fn write_close_ip4() {
683         let addr = next_test_ip4();
684         let mut acceptor = TcpListener::bind(addr).listen();
685
686         spawn(proc() {
687             let _stream = TcpStream::connect(addr);
688             // Close
689         });
690
691         let mut stream = acceptor.accept();
692         let buf = [0];
693         loop {
694             match stream.write(buf) {
695                 Ok(..) => {}
696                 Err(e) => {
697                     assert!(e.kind == ConnectionReset ||
698                             e.kind == BrokenPipe ||
699                             e.kind == ConnectionAborted,
700                             "unknown error: {}", e);
701                     break;
702                 }
703             }
704         }
705     }
706
707     #[test]
708     fn write_close_ip6() {
709         let addr = next_test_ip6();
710         let mut acceptor = TcpListener::bind(addr).listen();
711
712         spawn(proc() {
713             let _stream = TcpStream::connect(addr);
714             // Close
715         });
716
717         let mut stream = acceptor.accept();
718         let buf = [0];
719         loop {
720             match stream.write(buf) {
721                 Ok(..) => {}
722                 Err(e) => {
723                     assert!(e.kind == ConnectionReset ||
724                             e.kind == BrokenPipe ||
725                             e.kind == ConnectionAborted,
726                             "unknown error: {}", e);
727                     break;
728                 }
729             }
730         }
731     }
732
733     #[test]
734     fn multiple_connect_serial_ip4() {
735         let addr = next_test_ip4();
736         let max = 10u;
737         let mut acceptor = TcpListener::bind(addr).listen();
738
739         spawn(proc() {
740             for _ in range(0, max) {
741                 let mut stream = TcpStream::connect(addr);
742                 stream.write([99]).unwrap();
743             }
744         });
745
746         for ref mut stream in acceptor.incoming().take(max) {
747             let mut buf = [0];
748             stream.read(buf).unwrap();
749             assert_eq!(buf[0], 99);
750         }
751     }
752
753     #[test]
754     fn multiple_connect_serial_ip6() {
755         let addr = next_test_ip6();
756         let max = 10u;
757         let mut acceptor = TcpListener::bind(addr).listen();
758
759         spawn(proc() {
760             for _ in range(0, max) {
761                 let mut stream = TcpStream::connect(addr);
762                 stream.write([99]).unwrap();
763             }
764         });
765
766         for ref mut stream in acceptor.incoming().take(max) {
767             let mut buf = [0];
768             stream.read(buf).unwrap();
769             assert_eq!(buf[0], 99);
770         }
771     }
772
773     #[test]
774     fn multiple_connect_interleaved_greedy_schedule_ip4() {
775         let addr = next_test_ip4();
776         static MAX: int = 10;
777         let acceptor = TcpListener::bind(addr).listen();
778
779         spawn(proc() {
780             let mut acceptor = acceptor;
781             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
782                 // Start another task to handle the connection
783                 spawn(proc() {
784                     let mut stream = stream;
785                     let mut buf = [0];
786                     stream.read(buf).unwrap();
787                     assert!(buf[0] == i as u8);
788                     debug!("read");
789                 });
790             }
791         });
792
793         connect(0, addr);
794
795         fn connect(i: int, addr: SocketAddr) {
796             if i == MAX { return }
797
798             spawn(proc() {
799                 debug!("connecting");
800                 let mut stream = TcpStream::connect(addr);
801                 // Connect again before writing
802                 connect(i + 1, addr);
803                 debug!("writing");
804                 stream.write([i as u8]).unwrap();
805             });
806         }
807     }
808
809     #[test]
810     fn multiple_connect_interleaved_greedy_schedule_ip6() {
811         let addr = next_test_ip6();
812         static MAX: int = 10;
813         let acceptor = TcpListener::bind(addr).listen();
814
815         spawn(proc() {
816             let mut acceptor = acceptor;
817             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
818                 // Start another task to handle the connection
819                 spawn(proc() {
820                     let mut stream = stream;
821                     let mut buf = [0];
822                     stream.read(buf).unwrap();
823                     assert!(buf[0] == i as u8);
824                     debug!("read");
825                 });
826             }
827         });
828
829         connect(0, addr);
830
831         fn connect(i: int, addr: SocketAddr) {
832             if i == MAX { return }
833
834             spawn(proc() {
835                 debug!("connecting");
836                 let mut stream = TcpStream::connect(addr);
837                 // Connect again before writing
838                 connect(i + 1, addr);
839                 debug!("writing");
840                 stream.write([i as u8]).unwrap();
841             });
842         }
843     }
844
845     #[test]
846     fn multiple_connect_interleaved_lazy_schedule_ip4() {
847         static MAX: int = 10;
848         let addr = next_test_ip4();
849         let acceptor = TcpListener::bind(addr).listen();
850
851         spawn(proc() {
852             let mut acceptor = acceptor;
853             for stream in acceptor.incoming().take(MAX as uint) {
854                 // Start another task to handle the connection
855                 spawn(proc() {
856                     let mut stream = stream;
857                     let mut buf = [0];
858                     stream.read(buf).unwrap();
859                     assert!(buf[0] == 99);
860                     debug!("read");
861                 });
862             }
863         });
864
865         connect(0, addr);
866
867         fn connect(i: int, addr: SocketAddr) {
868             if i == MAX { return }
869
870             spawn(proc() {
871                 debug!("connecting");
872                 let mut stream = TcpStream::connect(addr);
873                 // Connect again before writing
874                 connect(i + 1, addr);
875                 debug!("writing");
876                 stream.write([99]).unwrap();
877             });
878         }
879     }
880
881     #[test]
882     fn multiple_connect_interleaved_lazy_schedule_ip6() {
883         static MAX: int = 10;
884         let addr = next_test_ip6();
885         let acceptor = TcpListener::bind(addr).listen();
886
887         spawn(proc() {
888             let mut acceptor = acceptor;
889             for stream in acceptor.incoming().take(MAX as uint) {
890                 // Start another task to handle the connection
891                 spawn(proc() {
892                     let mut stream = stream;
893                     let mut buf = [0];
894                     stream.read(buf).unwrap();
895                     assert!(buf[0] == 99);
896                     debug!("read");
897                 });
898             }
899         });
900
901         connect(0, addr);
902
903         fn connect(i: int, addr: SocketAddr) {
904             if i == MAX { return }
905
906             spawn(proc() {
907                 debug!("connecting");
908                 let mut stream = TcpStream::connect(addr);
909                 // Connect again before writing
910                 connect(i + 1, addr);
911                 debug!("writing");
912                 stream.write([99]).unwrap();
913             });
914         }
915     }
916
917     pub fn socket_name(addr: SocketAddr) {
918         let mut listener = TcpListener::bind(addr).unwrap();
919
920         // Make sure socket_name gives
921         // us the socket we binded to.
922         let so_name = listener.socket_name();
923         assert!(so_name.is_ok());
924         assert_eq!(addr, so_name.unwrap());
925     }
926
927     pub fn peer_name(addr: SocketAddr) {
928         let acceptor = TcpListener::bind(addr).listen();
929         spawn(proc() {
930             let mut acceptor = acceptor;
931             acceptor.accept().unwrap();
932         });
933
934         let stream = TcpStream::connect(addr);
935
936         assert!(stream.is_ok());
937         let mut stream = stream.unwrap();
938
939         // Make sure peer_name gives us the
940         // address/port of the peer we've
941         // connected to.
942         let peer_name = stream.peer_name();
943         assert!(peer_name.is_ok());
944         assert_eq!(addr, peer_name.unwrap());
945     }
946
947     #[test]
948     fn socket_and_peer_name_ip4() {
949         peer_name(next_test_ip4());
950         socket_name(next_test_ip4());
951     }
952
953     #[test]
954     fn socket_and_peer_name_ip6() {
955         // FIXME: peer name is not consistent
956         //peer_name(next_test_ip6());
957         socket_name(next_test_ip6());
958     }
959
960     #[test]
961     fn partial_read() {
962         let addr = next_test_ip4();
963         let (tx, rx) = channel();
964         spawn(proc() {
965             let mut srv = TcpListener::bind(addr).listen().unwrap();
966             tx.send(());
967             let mut cl = srv.accept().unwrap();
968             cl.write([10]).unwrap();
969             let mut b = [0];
970             cl.read(b).unwrap();
971             tx.send(());
972         });
973
974         rx.recv();
975         let mut c = TcpStream::connect(addr).unwrap();
976         let mut b = [0, ..10];
977         assert_eq!(c.read(b), Ok(1));
978         c.write([1]).unwrap();
979         rx.recv();
980     }
981
982     #[test]
983     fn double_bind() {
984         let addr = next_test_ip4();
985         let listener = TcpListener::bind(addr).unwrap().listen();
986         assert!(listener.is_ok());
987         match TcpListener::bind(addr).listen() {
988             Ok(..) => panic!(),
989             Err(e) => {
990                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
991                         "unknown error: {} {}", e, e.kind);
992             }
993         }
994     }
995
996     #[test]
997     fn fast_rebind() {
998         let addr = next_test_ip4();
999         let (tx, rx) = channel();
1000
1001         spawn(proc() {
1002             rx.recv();
1003             let _stream = TcpStream::connect(addr).unwrap();
1004             // Close
1005             rx.recv();
1006         });
1007
1008         {
1009             let mut acceptor = TcpListener::bind(addr).listen();
1010             tx.send(());
1011             {
1012                 let _stream = acceptor.accept().unwrap();
1013                 // Close client
1014                 tx.send(());
1015             }
1016             // Close listener
1017         }
1018         let _listener = TcpListener::bind(addr);
1019     }
1020
1021     #[test]
1022     fn tcp_clone_smoke() {
1023         let addr = next_test_ip4();
1024         let mut acceptor = TcpListener::bind(addr).listen();
1025
1026         spawn(proc() {
1027             let mut s = TcpStream::connect(addr);
1028             let mut buf = [0, 0];
1029             assert_eq!(s.read(buf), Ok(1));
1030             assert_eq!(buf[0], 1);
1031             s.write([2]).unwrap();
1032         });
1033
1034         let mut s1 = acceptor.accept().unwrap();
1035         let s2 = s1.clone();
1036
1037         let (tx1, rx1) = channel();
1038         let (tx2, rx2) = channel();
1039         spawn(proc() {
1040             let mut s2 = s2;
1041             rx1.recv();
1042             s2.write([1]).unwrap();
1043             tx2.send(());
1044         });
1045         tx1.send(());
1046         let mut buf = [0, 0];
1047         assert_eq!(s1.read(buf), Ok(1));
1048         rx2.recv();
1049     }
1050
1051     #[test]
1052     fn tcp_clone_two_read() {
1053         let addr = next_test_ip6();
1054         let mut acceptor = TcpListener::bind(addr).listen();
1055         let (tx1, rx) = channel();
1056         let tx2 = tx1.clone();
1057
1058         spawn(proc() {
1059             let mut s = TcpStream::connect(addr);
1060             s.write([1]).unwrap();
1061             rx.recv();
1062             s.write([2]).unwrap();
1063             rx.recv();
1064         });
1065
1066         let mut s1 = acceptor.accept().unwrap();
1067         let s2 = s1.clone();
1068
1069         let (done, rx) = channel();
1070         spawn(proc() {
1071             let mut s2 = s2;
1072             let mut buf = [0, 0];
1073             s2.read(buf).unwrap();
1074             tx2.send(());
1075             done.send(());
1076         });
1077         let mut buf = [0, 0];
1078         s1.read(buf).unwrap();
1079         tx1.send(());
1080
1081         rx.recv();
1082     }
1083
1084     #[test]
1085     fn tcp_clone_two_write() {
1086         let addr = next_test_ip4();
1087         let mut acceptor = TcpListener::bind(addr).listen();
1088
1089         spawn(proc() {
1090             let mut s = TcpStream::connect(addr);
1091             let mut buf = [0, 1];
1092             s.read(buf).unwrap();
1093             s.read(buf).unwrap();
1094         });
1095
1096         let mut s1 = acceptor.accept().unwrap();
1097         let s2 = s1.clone();
1098
1099         let (done, rx) = channel();
1100         spawn(proc() {
1101             let mut s2 = s2;
1102             s2.write([1]).unwrap();
1103             done.send(());
1104         });
1105         s1.write([2]).unwrap();
1106
1107         rx.recv();
1108     }
1109
1110     #[test]
1111     fn shutdown_smoke() {
1112         use rt::rtio::RtioTcpStream;
1113
1114         let addr = next_test_ip4();
1115         let a = TcpListener::bind(addr).unwrap().listen();
1116         spawn(proc() {
1117             let mut a = a;
1118             let mut c = a.accept().unwrap();
1119             assert_eq!(c.read_to_end(), Ok(vec!()));
1120             c.write([1]).unwrap();
1121         });
1122
1123         let mut s = TcpStream::connect(addr).unwrap();
1124         assert!(s.obj.close_write().is_ok());
1125         assert!(s.write([1]).is_err());
1126         assert_eq!(s.read_to_end(), Ok(vec!(1)));
1127     }
1128
1129     #[test]
1130     fn accept_timeout() {
1131         let addr = next_test_ip4();
1132         let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1133
1134         a.set_timeout(Some(10));
1135
1136         // Make sure we time out once and future invocations also time out
1137         let err = a.accept().err().unwrap();
1138         assert_eq!(err.kind, TimedOut);
1139         let err = a.accept().err().unwrap();
1140         assert_eq!(err.kind, TimedOut);
1141
1142         // Also make sure that even though the timeout is expired that we will
1143         // continue to receive any pending connections.
1144         //
1145         // FIXME: freebsd apparently never sees the pending connection, but
1146         //        testing manually always works. Need to investigate this
1147         //        flakiness.
1148         if !cfg!(target_os = "freebsd") {
1149             let (tx, rx) = channel();
1150             spawn(proc() {
1151                 tx.send(TcpStream::connect(addr).unwrap());
1152             });
1153             let _l = rx.recv();
1154             for i in range(0i, 1001) {
1155                 match a.accept() {
1156                     Ok(..) => break,
1157                     Err(ref e) if e.kind == TimedOut => {}
1158                     Err(e) => panic!("error: {}", e),
1159                 }
1160                 ::task::deschedule();
1161                 if i == 1000 { panic!("should have a pending connection") }
1162             }
1163         }
1164
1165         // Unset the timeout and make sure that this always blocks.
1166         a.set_timeout(None);
1167         spawn(proc() {
1168             drop(TcpStream::connect(addr).unwrap());
1169         });
1170         a.accept().unwrap();
1171     }
1172
1173     #[test]
1174     fn close_readwrite_smoke() {
1175         let addr = next_test_ip4();
1176         let a = TcpListener::bind(addr).listen().unwrap();
1177         let (_tx, rx) = channel::<()>();
1178         spawn(proc() {
1179             let mut a = a;
1180             let _s = a.accept().unwrap();
1181             let _ = rx.recv_opt();
1182         });
1183
1184         let mut b = [0];
1185         let mut s = TcpStream::connect(addr).unwrap();
1186         let mut s2 = s.clone();
1187
1188         // closing should prevent reads/writes
1189         s.close_write().unwrap();
1190         assert!(s.write([0]).is_err());
1191         s.close_read().unwrap();
1192         assert!(s.read(b).is_err());
1193
1194         // closing should affect previous handles
1195         assert!(s2.write([0]).is_err());
1196         assert!(s2.read(b).is_err());
1197
1198         // closing should affect new handles
1199         let mut s3 = s.clone();
1200         assert!(s3.write([0]).is_err());
1201         assert!(s3.read(b).is_err());
1202
1203         // make sure these don't die
1204         let _ = s2.close_read();
1205         let _ = s2.close_write();
1206         let _ = s3.close_read();
1207         let _ = s3.close_write();
1208     }
1209
1210     #[test]
1211     fn close_read_wakes_up() {
1212         let addr = next_test_ip4();
1213         let a = TcpListener::bind(addr).listen().unwrap();
1214         let (_tx, rx) = channel::<()>();
1215         spawn(proc() {
1216             let mut a = a;
1217             let _s = a.accept().unwrap();
1218             let _ = rx.recv_opt();
1219         });
1220
1221         let mut s = TcpStream::connect(addr).unwrap();
1222         let s2 = s.clone();
1223         let (tx, rx) = channel();
1224         spawn(proc() {
1225             let mut s2 = s2;
1226             assert!(s2.read([0]).is_err());
1227             tx.send(());
1228         });
1229         // this should wake up the child task
1230         s.close_read().unwrap();
1231
1232         // this test will never finish if the child doesn't wake up
1233         rx.recv();
1234     }
1235
1236     #[test]
1237     fn readwrite_timeouts() {
1238         let addr = next_test_ip6();
1239         let mut a = TcpListener::bind(addr).listen().unwrap();
1240         let (tx, rx) = channel::<()>();
1241         spawn(proc() {
1242             let mut s = TcpStream::connect(addr).unwrap();
1243             rx.recv();
1244             assert!(s.write([0]).is_ok());
1245             let _ = rx.recv_opt();
1246         });
1247
1248         let mut s = a.accept().unwrap();
1249         s.set_timeout(Some(20));
1250         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1251         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1252
1253         s.set_timeout(Some(20));
1254         for i in range(0i, 1001) {
1255             match s.write([0, .. 128 * 1024]) {
1256                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1257                 Err(IoError { kind: TimedOut, .. }) => break,
1258                 Err(e) => panic!("{}", e),
1259            }
1260            if i == 1000 { panic!("should have filled up?!"); }
1261         }
1262         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1263
1264         tx.send(());
1265         s.set_timeout(None);
1266         assert_eq!(s.read([0, 0]), Ok(1));
1267     }
1268
1269     #[test]
1270     fn read_timeouts() {
1271         let addr = next_test_ip6();
1272         let mut a = TcpListener::bind(addr).listen().unwrap();
1273         let (tx, rx) = channel::<()>();
1274         spawn(proc() {
1275             let mut s = TcpStream::connect(addr).unwrap();
1276             rx.recv();
1277             let mut amt = 0;
1278             while amt < 100 * 128 * 1024 {
1279                 match s.read([0, ..128 * 1024]) {
1280                     Ok(n) => { amt += n; }
1281                     Err(e) => panic!("{}", e),
1282                 }
1283             }
1284             let _ = rx.recv_opt();
1285         });
1286
1287         let mut s = a.accept().unwrap();
1288         s.set_read_timeout(Some(20));
1289         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1290         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1291
1292         tx.send(());
1293         for _ in range(0i, 100) {
1294             assert!(s.write([0, ..128 * 1024]).is_ok());
1295         }
1296     }
1297
1298     #[test]
1299     fn write_timeouts() {
1300         let addr = next_test_ip6();
1301         let mut a = TcpListener::bind(addr).listen().unwrap();
1302         let (tx, rx) = channel::<()>();
1303         spawn(proc() {
1304             let mut s = TcpStream::connect(addr).unwrap();
1305             rx.recv();
1306             assert!(s.write([0]).is_ok());
1307             let _ = rx.recv_opt();
1308         });
1309
1310         let mut s = a.accept().unwrap();
1311         s.set_write_timeout(Some(20));
1312         for i in range(0i, 1001) {
1313             match s.write([0, .. 128 * 1024]) {
1314                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1315                 Err(IoError { kind: TimedOut, .. }) => break,
1316                 Err(e) => panic!("{}", e),
1317            }
1318            if i == 1000 { panic!("should have filled up?!"); }
1319         }
1320         assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1321
1322         tx.send(());
1323         assert!(s.read([0]).is_ok());
1324     }
1325
1326     #[test]
1327     fn timeout_concurrent_read() {
1328         let addr = next_test_ip6();
1329         let mut a = TcpListener::bind(addr).listen().unwrap();
1330         let (tx, rx) = channel::<()>();
1331         spawn(proc() {
1332             let mut s = TcpStream::connect(addr).unwrap();
1333             rx.recv();
1334             assert_eq!(s.write([0]), Ok(()));
1335             let _ = rx.recv_opt();
1336         });
1337
1338         let mut s = a.accept().unwrap();
1339         let s2 = s.clone();
1340         let (tx2, rx2) = channel();
1341         spawn(proc() {
1342             let mut s2 = s2;
1343             assert_eq!(s2.read([0]), Ok(1));
1344             tx2.send(());
1345         });
1346
1347         s.set_read_timeout(Some(20));
1348         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1349         tx.send(());
1350
1351         rx2.recv();
1352     }
1353
1354     #[test]
1355     fn clone_while_reading() {
1356         let addr = next_test_ip6();
1357         let listen = TcpListener::bind(addr);
1358         let mut accept = listen.listen().unwrap();
1359
1360         // Enqueue a task to write to a socket
1361         let (tx, rx) = channel();
1362         let (txdone, rxdone) = channel();
1363         let txdone2 = txdone.clone();
1364         spawn(proc() {
1365             let mut tcp = TcpStream::connect(addr).unwrap();
1366             rx.recv();
1367             tcp.write_u8(0).unwrap();
1368             txdone2.send(());
1369         });
1370
1371         // Spawn off a reading clone
1372         let tcp = accept.accept().unwrap();
1373         let tcp2 = tcp.clone();
1374         let txdone3 = txdone.clone();
1375         spawn(proc() {
1376             let mut tcp2 = tcp2;
1377             tcp2.read_u8().unwrap();
1378             txdone3.send(());
1379         });
1380
1381         // Try to ensure that the reading clone is indeed reading
1382         for _ in range(0i, 50) {
1383             ::task::deschedule();
1384         }
1385
1386         // clone the handle again while it's reading, then let it finish the
1387         // read.
1388         let _ = tcp.clone();
1389         tx.send(());
1390         rxdone.recv();
1391         rxdone.recv();
1392     }
1393
1394     #[test]
1395     fn clone_accept_smoke() {
1396         let addr = next_test_ip4();
1397         let l = TcpListener::bind(addr);
1398         let mut a = l.listen().unwrap();
1399         let mut a2 = a.clone();
1400
1401         spawn(proc() {
1402             let _ = TcpStream::connect(addr);
1403         });
1404         spawn(proc() {
1405             let _ = TcpStream::connect(addr);
1406         });
1407
1408         assert!(a.accept().is_ok());
1409         assert!(a2.accept().is_ok());
1410     }
1411
1412     #[test]
1413     fn clone_accept_concurrent() {
1414         let addr = next_test_ip4();
1415         let l = TcpListener::bind(addr);
1416         let a = l.listen().unwrap();
1417         let a2 = a.clone();
1418
1419         let (tx, rx) = channel();
1420         let tx2 = tx.clone();
1421
1422         spawn(proc() { let mut a = a; tx.send(a.accept()) });
1423         spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1424
1425         spawn(proc() {
1426             let _ = TcpStream::connect(addr);
1427         });
1428         spawn(proc() {
1429             let _ = TcpStream::connect(addr);
1430         });
1431
1432         assert!(rx.recv().is_ok());
1433         assert!(rx.recv().is_ok());
1434     }
1435
1436     #[test]
1437     fn close_accept_smoke() {
1438         let addr = next_test_ip4();
1439         let l = TcpListener::bind(addr);
1440         let mut a = l.listen().unwrap();
1441
1442         a.close_accept().unwrap();
1443         assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1444     }
1445
1446     #[test]
1447     fn close_accept_concurrent() {
1448         let addr = next_test_ip4();
1449         let l = TcpListener::bind(addr);
1450         let a = l.listen().unwrap();
1451         let mut a2 = a.clone();
1452
1453         let (tx, rx) = channel();
1454         spawn(proc() {
1455             let mut a = a;
1456             tx.send(a.accept());
1457         });
1458         a2.close_accept().unwrap();
1459
1460         assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
1461     }
1462 }