]> git.lizzy.rs Git - rust.git/blob - src/libstd/net/tcp.rs
Add UDP functionality from net2
[rust.git] / src / libstd / net / tcp.rs
1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12 use io::prelude::*;
13
14 use fmt;
15 use io;
16 use net::{ToSocketAddrs, SocketAddr, Shutdown};
17 use sys_common::io::read_to_end_uninitialized;
18 use sys_common::net as net_imp;
19 use sys_common::{AsInner, FromInner, IntoInner};
20 use time::Duration;
21
22 /// A structure which represents a TCP stream between a local socket and a
23 /// remote socket.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::io::prelude::*;
31 /// use std::net::TcpStream;
32 ///
33 /// {
34 ///     let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
35 ///
36 ///     // ignore the Result
37 ///     let _ = stream.write(&[1]);
38 ///     let _ = stream.read(&mut [0; 128]); // ignore here too
39 /// } // the stream is closed here
40 /// ```
41 #[stable(feature = "rust1", since = "1.0.0")]
42 pub struct TcpStream(net_imp::TcpStream);
43
44 /// A structure representing a socket server.
45 ///
46 /// # Examples
47 ///
48 /// ```no_run
49 /// use std::net::{TcpListener, TcpStream};
50 /// use std::thread;
51 ///
52 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
53 ///
54 /// fn handle_client(stream: TcpStream) {
55 ///     // ...
56 /// }
57 ///
58 /// // accept connections and process them, spawning a new thread for each one
59 /// for stream in listener.incoming() {
60 ///     match stream {
61 ///         Ok(stream) => {
62 ///             thread::spawn(move|| {
63 ///                 // connection succeeded
64 ///                 handle_client(stream)
65 ///             });
66 ///         }
67 ///         Err(e) => { /* connection failed */ }
68 ///     }
69 /// }
70 ///
71 /// // close the socket server
72 /// drop(listener);
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct TcpListener(net_imp::TcpListener);
76
77 /// An infinite iterator over the connections from a `TcpListener`.
78 ///
79 /// This iterator will infinitely yield `Some` of the accepted connections. It
80 /// is equivalent to calling `accept` in a loop.
81 #[stable(feature = "rust1", since = "1.0.0")]
82 pub struct Incoming<'a> { listener: &'a TcpListener }
83
84 impl TcpStream {
85     /// Opens a TCP connection to a remote host.
86     ///
87     /// `addr` is an address of the remote host. Anything which implements
88     /// `ToSocketAddrs` trait can be supplied for the address; see this trait
89     /// documentation for concrete examples.
90     #[stable(feature = "rust1", since = "1.0.0")]
91     pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
92         super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
93     }
94
95     /// Returns the socket address of the remote peer of this TCP connection.
96     #[stable(feature = "rust1", since = "1.0.0")]
97     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
98         self.0.peer_addr()
99     }
100
101     /// Returns the socket address of the local half of this TCP connection.
102     #[stable(feature = "rust1", since = "1.0.0")]
103     pub fn local_addr(&self) -> io::Result<SocketAddr> {
104         self.0.socket_addr()
105     }
106
107     /// Shuts down the read, write, or both halves of this connection.
108     ///
109     /// This function will cause all pending and future I/O on the specified
110     /// portions to return immediately with an appropriate value (see the
111     /// documentation of `Shutdown`).
112     #[stable(feature = "rust1", since = "1.0.0")]
113     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
114         self.0.shutdown(how)
115     }
116
117     /// Creates a new independently owned handle to the underlying socket.
118     ///
119     /// The returned `TcpStream` is a reference to the same stream that this
120     /// object references. Both handles will read and write the same stream of
121     /// data, and options set on one stream will be propagated to the other
122     /// stream.
123     #[stable(feature = "rust1", since = "1.0.0")]
124     pub fn try_clone(&self) -> io::Result<TcpStream> {
125         self.0.duplicate().map(TcpStream)
126     }
127
128     /// Sets the read timeout to the timeout specified.
129     ///
130     /// If the value specified is `None`, then `read` calls will block
131     /// indefinitely. It is an error to pass the zero `Duration` to this
132     /// method.
133     ///
134     /// # Note
135     ///
136     /// Platforms may return a different error code whenever a read times out as
137     /// a result of setting this option. For example Unix typically returns an
138     /// error of the kind `WouldBlock`, but Windows may return `TimedOut`.
139     #[stable(feature = "socket_timeout", since = "1.4.0")]
140     pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
141         self.0.set_read_timeout(dur)
142     }
143
144     /// Sets the write timeout to the timeout specified.
145     ///
146     /// If the value specified is `None`, then `write` calls will block
147     /// indefinitely. It is an error to pass the zero `Duration` to this
148     /// method.
149     ///
150     /// # Note
151     ///
152     /// Platforms may return a different error code whenever a write times out
153     /// as a result of setting this option. For example Unix typically returns
154     /// an error of the kind `WouldBlock`, but Windows may return `TimedOut`.
155     #[stable(feature = "socket_timeout", since = "1.4.0")]
156     pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
157         self.0.set_write_timeout(dur)
158     }
159
160     /// Returns the read timeout of this socket.
161     ///
162     /// If the timeout is `None`, then `read` calls will block indefinitely.
163     ///
164     /// # Note
165     ///
166     /// Some platforms do not provide access to the current timeout.
167     #[stable(feature = "socket_timeout", since = "1.4.0")]
168     pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
169         self.0.read_timeout()
170     }
171
172     /// Returns the write timeout of this socket.
173     ///
174     /// If the timeout is `None`, then `write` calls will block indefinitely.
175     ///
176     /// # Note
177     ///
178     /// Some platforms do not provide access to the current timeout.
179     #[stable(feature = "socket_timeout", since = "1.4.0")]
180     pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
181         self.0.write_timeout()
182     }
183
184     /// Sets the value of the `TCP_NODELAY` option on this socket.
185     ///
186     /// If set, this option disables the Nagle algorithm. This means that
187     /// segments are always sent as soon as possible, even if there is only a
188     /// small amount of data. When not set, data is buffered until there is a
189     /// sufficient amount to send out, thereby avoiding the frequent sending of
190     /// small packets.
191     #[stable(feature = "net2_mutators", since = "1.9.0")]
192     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
193         self.0.set_nodelay(nodelay)
194     }
195
196     /// Gets the value of the `TCP_NODELAY` option on this socket.
197     ///
198     /// For more information about this option, see [`set_nodelay`][link].
199     ///
200     /// [link]: #tymethod.set_nodelay
201     #[stable(feature = "net2_mutators", since = "1.9.0")]
202     pub fn nodelay(&self) -> io::Result<bool> {
203         self.0.nodelay()
204     }
205
206     /// Sets whether keepalive messages are enabled to be sent on this socket.
207     ///
208     /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
209     /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
210     /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
211     ///
212     /// If `None` is specified then keepalive messages are disabled, otherwise
213     /// the duration specified will be the time to remain idle before sending a
214     /// TCP keepalive probe.
215     ///
216     /// Some platforms specify this value in seconds, so sub-second
217     /// specifications may be omitted.
218     #[stable(feature = "net2_mutators", since = "1.9.0")]
219     pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
220         self.0.set_keepalive(keepalive)
221     }
222
223     /// Returns whether keepalive messages are enabled on this socket, and if so
224     /// the duration of time between them.
225     ///
226     /// For more information about this option, see [`set_keepalive`][link].
227     ///
228     /// [link]: #tymethod.set_keepalive
229     #[stable(feature = "net2_mutators", since = "1.9.0")]
230     pub fn keepalive(&self) -> io::Result<Option<Duration>> {
231         self.0.keepalive()
232     }
233
234     /// Sets the value for the `IP_TTL` option on this socket.
235     ///
236     /// This value sets the time-to-live field that is used in every packet sent
237     /// from this socket.
238     #[stable(feature = "net2_mutators", since = "1.9.0")]
239     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
240         self.0.set_ttl(ttl)
241     }
242
243     /// Gets the value of the `IP_TTL` option for this socket.
244     ///
245     /// For more information about this option, see [`set_ttl`][link].
246     ///
247     /// [link]: #tymethod.set_ttl
248     #[stable(feature = "net2_mutators", since = "1.9.0")]
249     pub fn ttl(&self) -> io::Result<u32> {
250         self.0.ttl()
251     }
252
253     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
254     ///
255     /// If this is set to `true` then the socket is restricted to sending and
256     /// receiving IPv6 packets only. If this is the case, an IPv4 and an IPv6
257     /// application can each bind the same port at the same time.
258     ///
259     /// If this is set to `false` then the socket can be used to send and
260     /// receive packets from an IPv4-mapped IPv6 address.
261     #[stable(feature = "net2_mutators", since = "1.9.0")]
262     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
263         self.0.set_only_v6(only_v6)
264     }
265
266     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
267     ///
268     /// For more information about this option, see [`set_only_v6`][link].
269     ///
270     /// [link]: #tymethod.set_only_v6
271     #[stable(feature = "net2_mutators", since = "1.9.0")]
272     pub fn only_v6(&self) -> io::Result<bool> {
273         self.0.only_v6()
274     }
275
276     /// Get the value of the `SO_ERROR` option on this socket.
277     ///
278     /// This will retrieve the stored error in the underlying socket, clearing
279     /// the field in the process. This can be useful for checking errors between
280     /// calls.
281     #[stable(feature = "net2_mutators", since = "1.9.0")]
282     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
283         self.0.take_error()
284     }
285
286     /// Moves this TCP stream into or out of nonblocking mode.
287     ///
288     /// On Unix this corresponds to calling fcntl, and on Windows this
289     /// corresponds to calling ioctlsocket.
290     #[stable(feature = "net2_mutators", since = "1.9.0")]
291     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
292         self.0.set_nonblocking(nonblocking)
293     }
294 }
295
296 #[stable(feature = "rust1", since = "1.0.0")]
297 impl Read for TcpStream {
298     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
299     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
300         unsafe { read_to_end_uninitialized(self, buf) }
301     }
302 }
303 #[stable(feature = "rust1", since = "1.0.0")]
304 impl Write for TcpStream {
305     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
306     fn flush(&mut self) -> io::Result<()> { Ok(()) }
307 }
308 #[stable(feature = "rust1", since = "1.0.0")]
309 impl<'a> Read for &'a TcpStream {
310     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
311     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
312         unsafe { read_to_end_uninitialized(self, buf) }
313     }
314 }
315 #[stable(feature = "rust1", since = "1.0.0")]
316 impl<'a> Write for &'a TcpStream {
317     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
318     fn flush(&mut self) -> io::Result<()> { Ok(()) }
319 }
320
321 impl AsInner<net_imp::TcpStream> for TcpStream {
322     fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
323 }
324
325 impl FromInner<net_imp::TcpStream> for TcpStream {
326     fn from_inner(inner: net_imp::TcpStream) -> TcpStream { TcpStream(inner) }
327 }
328
329 impl IntoInner<net_imp::TcpStream> for TcpStream {
330     fn into_inner(self) -> net_imp::TcpStream { self.0 }
331 }
332
333 #[stable(feature = "rust1", since = "1.0.0")]
334 impl fmt::Debug for TcpStream {
335     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
336         self.0.fmt(f)
337     }
338 }
339
340 impl TcpListener {
341     /// Creates a new `TcpListener` which will be bound to the specified
342     /// address.
343     ///
344     /// The returned listener is ready for accepting connections.
345     ///
346     /// Binding with a port number of 0 will request that the OS assigns a port
347     /// to this listener. The port allocated can be queried via the
348     /// `local_addr` method.
349     ///
350     /// The address type can be any implementor of `ToSocketAddrs` trait. See
351     /// its documentation for concrete examples.
352     #[stable(feature = "rust1", since = "1.0.0")]
353     pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
354         super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
355     }
356
357     /// Returns the local socket address of this listener.
358     #[stable(feature = "rust1", since = "1.0.0")]
359     pub fn local_addr(&self) -> io::Result<SocketAddr> {
360         self.0.socket_addr()
361     }
362
363     /// Creates a new independently owned handle to the underlying socket.
364     ///
365     /// The returned `TcpListener` is a reference to the same socket that this
366     /// object references. Both handles can be used to accept incoming
367     /// connections and options set on one listener will affect the other.
368     #[stable(feature = "rust1", since = "1.0.0")]
369     pub fn try_clone(&self) -> io::Result<TcpListener> {
370         self.0.duplicate().map(TcpListener)
371     }
372
373     /// Accept a new incoming connection from this listener.
374     ///
375     /// This function will block the calling thread until a new TCP connection
376     /// is established. When established, the corresponding `TcpStream` and the
377     /// remote peer's address will be returned.
378     #[stable(feature = "rust1", since = "1.0.0")]
379     pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
380         self.0.accept().map(|(a, b)| (TcpStream(a), b))
381     }
382
383     /// Returns an iterator over the connections being received on this
384     /// listener.
385     ///
386     /// The returned iterator will never return `None` and will also not yield
387     /// the peer's `SocketAddr` structure.
388     #[stable(feature = "rust1", since = "1.0.0")]
389     pub fn incoming(&self) -> Incoming {
390         Incoming { listener: self }
391     }
392
393     /// Sets the value for the `IP_TTL` option on this socket.
394     ///
395     /// This value sets the time-to-live field that is used in every packet sent
396     /// from this socket.
397     #[stable(feature = "net2_mutators", since = "1.9.0")]
398     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
399         self.0.set_ttl(ttl)
400     }
401
402     /// Gets the value of the `IP_TTL` option for this socket.
403     ///
404     /// For more information about this option, see [`set_ttl`][link].
405     ///
406     /// [link]: #tymethod.set_ttl
407     #[stable(feature = "net2_mutators", since = "1.9.0")]
408     pub fn ttl(&self) -> io::Result<u32> {
409         self.0.ttl()
410     }
411
412     /// Sets the value for the `IPV6_V6ONLY` option on this socket.
413     ///
414     /// If this is set to `true` then the socket is restricted to sending and
415     /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
416     /// can bind the same port at the same time.
417     ///
418     /// If this is set to `false` then the socket can be used to send and
419     /// receive packets from an IPv4-mapped IPv6 address.
420     #[stable(feature = "net2_mutators", since = "1.9.0")]
421     pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
422         self.0.set_only_v6(only_v6)
423     }
424
425     /// Gets the value of the `IPV6_V6ONLY` option for this socket.
426     ///
427     /// For more information about this option, see [`set_only_v6`][link].
428     ///
429     /// [link]: #tymethod.set_only_v6
430     #[stable(feature = "net2_mutators", since = "1.9.0")]
431     pub fn only_v6(&self) -> io::Result<bool> {
432         self.0.only_v6()
433     }
434
435     /// Get the value of the `SO_ERROR` option on this socket.
436     ///
437     /// This will retrieve the stored error in the underlying socket, clearing
438     /// the field in the process. This can be useful for checking errors between
439     /// calls.
440     #[stable(feature = "net2_mutators", since = "1.9.0")]
441     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
442         self.0.take_error()
443     }
444
445     /// Moves this TCP stream into or out of nonblocking mode.
446     ///
447     /// On Unix this corresponds to calling fcntl, and on Windows this
448     /// corresponds to calling ioctlsocket.
449     #[stable(feature = "net2_mutators", since = "1.9.0")]
450     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
451         self.0.set_nonblocking(nonblocking)
452     }
453 }
454
455 #[stable(feature = "rust1", since = "1.0.0")]
456 impl<'a> Iterator for Incoming<'a> {
457     type Item = io::Result<TcpStream>;
458     fn next(&mut self) -> Option<io::Result<TcpStream>> {
459         Some(self.listener.accept().map(|p| p.0))
460     }
461 }
462
463 impl AsInner<net_imp::TcpListener> for TcpListener {
464     fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
465 }
466
467 impl FromInner<net_imp::TcpListener> for TcpListener {
468     fn from_inner(inner: net_imp::TcpListener) -> TcpListener {
469         TcpListener(inner)
470     }
471 }
472
473 impl IntoInner<net_imp::TcpListener> for TcpListener {
474     fn into_inner(self) -> net_imp::TcpListener { self.0 }
475 }
476
477 #[stable(feature = "rust1", since = "1.0.0")]
478 impl fmt::Debug for TcpListener {
479     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
480         self.0.fmt(f)
481     }
482 }
483
484 #[cfg(test)]
485 mod tests {
486     use prelude::v1::*;
487
488     use io::ErrorKind;
489     use io::prelude::*;
490     use net::*;
491     use net::test::{next_test_ip4, next_test_ip6};
492     use sync::mpsc::channel;
493     use sys_common::AsInner;
494     use time::{Instant, Duration};
495     use thread;
496
497     fn each_ip(f: &mut FnMut(SocketAddr)) {
498         f(next_test_ip4());
499         f(next_test_ip6());
500     }
501
502     macro_rules! t {
503         ($e:expr) => {
504             match $e {
505                 Ok(t) => t,
506                 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
507             }
508         }
509     }
510
511     #[test]
512     fn bind_error() {
513         match TcpListener::bind("1.1.1.1:9999") {
514             Ok(..) => panic!(),
515             Err(e) =>
516                 assert_eq!(e.kind(), ErrorKind::AddrNotAvailable),
517         }
518     }
519
520     #[test]
521     fn connect_error() {
522         match TcpStream::connect("0.0.0.0:1") {
523             Ok(..) => panic!(),
524             Err(e) => assert!(e.kind() == ErrorKind::ConnectionRefused ||
525                               e.kind() == ErrorKind::InvalidInput ||
526                               e.kind() == ErrorKind::AddrInUse ||
527                               e.kind() == ErrorKind::AddrNotAvailable,
528                               "bad error: {} {:?}", e, e.kind()),
529         }
530     }
531
532     #[test]
533     fn listen_localhost() {
534         let socket_addr = next_test_ip4();
535         let listener = t!(TcpListener::bind(&socket_addr));
536
537         let _t = thread::spawn(move || {
538             let mut stream = t!(TcpStream::connect(&("localhost",
539                                                      socket_addr.port())));
540             t!(stream.write(&[144]));
541         });
542
543         let mut stream = t!(listener.accept()).0;
544         let mut buf = [0];
545         t!(stream.read(&mut buf));
546         assert!(buf[0] == 144);
547     }
548
549     #[test]
550     fn connect_loopback() {
551         each_ip(&mut |addr| {
552             let acceptor = t!(TcpListener::bind(&addr));
553
554             let _t = thread::spawn(move|| {
555                 let host = match addr {
556                     SocketAddr::V4(..) => "127.0.0.1",
557                     SocketAddr::V6(..) => "::1",
558                 };
559                 let mut stream = t!(TcpStream::connect(&(host, addr.port())));
560                 t!(stream.write(&[66]));
561             });
562
563             let mut stream = t!(acceptor.accept()).0;
564             let mut buf = [0];
565             t!(stream.read(&mut buf));
566             assert!(buf[0] == 66);
567         })
568     }
569
570     #[test]
571     fn smoke_test() {
572         each_ip(&mut |addr| {
573             let acceptor = t!(TcpListener::bind(&addr));
574
575             let (tx, rx) = channel();
576             let _t = thread::spawn(move|| {
577                 let mut stream = t!(TcpStream::connect(&addr));
578                 t!(stream.write(&[99]));
579                 tx.send(t!(stream.local_addr())).unwrap();
580             });
581
582             let (mut stream, addr) = t!(acceptor.accept());
583             let mut buf = [0];
584             t!(stream.read(&mut buf));
585             assert!(buf[0] == 99);
586             assert_eq!(addr, t!(rx.recv()));
587         })
588     }
589
590     #[test]
591     fn read_eof() {
592         each_ip(&mut |addr| {
593             let acceptor = t!(TcpListener::bind(&addr));
594
595             let _t = thread::spawn(move|| {
596                 let _stream = t!(TcpStream::connect(&addr));
597                 // Close
598             });
599
600             let mut stream = t!(acceptor.accept()).0;
601             let mut buf = [0];
602             let nread = t!(stream.read(&mut buf));
603             assert_eq!(nread, 0);
604             let nread = t!(stream.read(&mut buf));
605             assert_eq!(nread, 0);
606         })
607     }
608
609     #[test]
610     fn write_close() {
611         each_ip(&mut |addr| {
612             let acceptor = t!(TcpListener::bind(&addr));
613
614             let (tx, rx) = channel();
615             let _t = thread::spawn(move|| {
616                 drop(t!(TcpStream::connect(&addr)));
617                 tx.send(()).unwrap();
618             });
619
620             let mut stream = t!(acceptor.accept()).0;
621             rx.recv().unwrap();
622             let buf = [0];
623             match stream.write(&buf) {
624                 Ok(..) => {}
625                 Err(e) => {
626                     assert!(e.kind() == ErrorKind::ConnectionReset ||
627                             e.kind() == ErrorKind::BrokenPipe ||
628                             e.kind() == ErrorKind::ConnectionAborted,
629                             "unknown error: {}", e);
630                 }
631             }
632         })
633     }
634
635     #[test]
636     fn multiple_connect_serial() {
637         each_ip(&mut |addr| {
638             let max = 10;
639             let acceptor = t!(TcpListener::bind(&addr));
640
641             let _t = thread::spawn(move|| {
642                 for _ in 0..max {
643                     let mut stream = t!(TcpStream::connect(&addr));
644                     t!(stream.write(&[99]));
645                 }
646             });
647
648             for stream in acceptor.incoming().take(max) {
649                 let mut stream = t!(stream);
650                 let mut buf = [0];
651                 t!(stream.read(&mut buf));
652                 assert_eq!(buf[0], 99);
653             }
654         })
655     }
656
657     #[test]
658     fn multiple_connect_interleaved_greedy_schedule() {
659         const MAX: usize = 10;
660         each_ip(&mut |addr| {
661             let acceptor = t!(TcpListener::bind(&addr));
662
663             let _t = thread::spawn(move|| {
664                 let acceptor = acceptor;
665                 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
666                     // Start another thread to handle the connection
667                     let _t = thread::spawn(move|| {
668                         let mut stream = t!(stream);
669                         let mut buf = [0];
670                         t!(stream.read(&mut buf));
671                         assert!(buf[0] == i as u8);
672                     });
673                 }
674             });
675
676             connect(0, addr);
677         });
678
679         fn connect(i: usize, addr: SocketAddr) {
680             if i == MAX { return }
681
682             let t = thread::spawn(move|| {
683                 let mut stream = t!(TcpStream::connect(&addr));
684                 // Connect again before writing
685                 connect(i + 1, addr);
686                 t!(stream.write(&[i as u8]));
687             });
688             t.join().ok().unwrap();
689         }
690     }
691
692     #[test]
693     fn multiple_connect_interleaved_lazy_schedule() {
694         const MAX: usize = 10;
695         each_ip(&mut |addr| {
696             let acceptor = t!(TcpListener::bind(&addr));
697
698             let _t = thread::spawn(move|| {
699                 for stream in acceptor.incoming().take(MAX) {
700                     // Start another thread to handle the connection
701                     let _t = thread::spawn(move|| {
702                         let mut stream = t!(stream);
703                         let mut buf = [0];
704                         t!(stream.read(&mut buf));
705                         assert!(buf[0] == 99);
706                     });
707                 }
708             });
709
710             connect(0, addr);
711         });
712
713         fn connect(i: usize, addr: SocketAddr) {
714             if i == MAX { return }
715
716             let t = thread::spawn(move|| {
717                 let mut stream = t!(TcpStream::connect(&addr));
718                 connect(i + 1, addr);
719                 t!(stream.write(&[99]));
720             });
721             t.join().ok().unwrap();
722         }
723     }
724
725     #[test]
726     fn socket_and_peer_name() {
727         each_ip(&mut |addr| {
728             let listener = t!(TcpListener::bind(&addr));
729             let so_name = t!(listener.local_addr());
730             assert_eq!(addr, so_name);
731             let _t = thread::spawn(move|| {
732                 t!(listener.accept());
733             });
734
735             let stream = t!(TcpStream::connect(&addr));
736             assert_eq!(addr, t!(stream.peer_addr()));
737         })
738     }
739
740     #[test]
741     fn partial_read() {
742         each_ip(&mut |addr| {
743             let (tx, rx) = channel();
744             let srv = t!(TcpListener::bind(&addr));
745             let _t = thread::spawn(move|| {
746                 let mut cl = t!(srv.accept()).0;
747                 cl.write(&[10]).unwrap();
748                 let mut b = [0];
749                 t!(cl.read(&mut b));
750                 tx.send(()).unwrap();
751             });
752
753             let mut c = t!(TcpStream::connect(&addr));
754             let mut b = [0; 10];
755             assert_eq!(c.read(&mut b).unwrap(), 1);
756             t!(c.write(&[1]));
757             rx.recv().unwrap();
758         })
759     }
760
761     #[test]
762     fn double_bind() {
763         each_ip(&mut |addr| {
764             let _listener = t!(TcpListener::bind(&addr));
765             match TcpListener::bind(&addr) {
766                 Ok(..) => panic!(),
767                 Err(e) => {
768                     assert!(e.kind() == ErrorKind::ConnectionRefused ||
769                             e.kind() == ErrorKind::Other ||
770                             e.kind() == ErrorKind::AddrInUse,
771                             "unknown error: {} {:?}", e, e.kind());
772                 }
773             }
774         })
775     }
776
777     #[test]
778     fn fast_rebind() {
779         each_ip(&mut |addr| {
780             let acceptor = t!(TcpListener::bind(&addr));
781
782             let _t = thread::spawn(move|| {
783                 t!(TcpStream::connect(&addr));
784             });
785
786             t!(acceptor.accept());
787             drop(acceptor);
788             t!(TcpListener::bind(&addr));
789         });
790     }
791
792     #[test]
793     fn tcp_clone_smoke() {
794         each_ip(&mut |addr| {
795             let acceptor = t!(TcpListener::bind(&addr));
796
797             let _t = thread::spawn(move|| {
798                 let mut s = t!(TcpStream::connect(&addr));
799                 let mut buf = [0, 0];
800                 assert_eq!(s.read(&mut buf).unwrap(), 1);
801                 assert_eq!(buf[0], 1);
802                 t!(s.write(&[2]));
803             });
804
805             let mut s1 = t!(acceptor.accept()).0;
806             let s2 = t!(s1.try_clone());
807
808             let (tx1, rx1) = channel();
809             let (tx2, rx2) = channel();
810             let _t = thread::spawn(move|| {
811                 let mut s2 = s2;
812                 rx1.recv().unwrap();
813                 t!(s2.write(&[1]));
814                 tx2.send(()).unwrap();
815             });
816             tx1.send(()).unwrap();
817             let mut buf = [0, 0];
818             assert_eq!(s1.read(&mut buf).unwrap(), 1);
819             rx2.recv().unwrap();
820         })
821     }
822
823     #[test]
824     fn tcp_clone_two_read() {
825         each_ip(&mut |addr| {
826             let acceptor = t!(TcpListener::bind(&addr));
827             let (tx1, rx) = channel();
828             let tx2 = tx1.clone();
829
830             let _t = thread::spawn(move|| {
831                 let mut s = t!(TcpStream::connect(&addr));
832                 t!(s.write(&[1]));
833                 rx.recv().unwrap();
834                 t!(s.write(&[2]));
835                 rx.recv().unwrap();
836             });
837
838             let mut s1 = t!(acceptor.accept()).0;
839             let s2 = t!(s1.try_clone());
840
841             let (done, rx) = channel();
842             let _t = thread::spawn(move|| {
843                 let mut s2 = s2;
844                 let mut buf = [0, 0];
845                 t!(s2.read(&mut buf));
846                 tx2.send(()).unwrap();
847                 done.send(()).unwrap();
848             });
849             let mut buf = [0, 0];
850             t!(s1.read(&mut buf));
851             tx1.send(()).unwrap();
852
853             rx.recv().unwrap();
854         })
855     }
856
857     #[test]
858     fn tcp_clone_two_write() {
859         each_ip(&mut |addr| {
860             let acceptor = t!(TcpListener::bind(&addr));
861
862             let _t = thread::spawn(move|| {
863                 let mut s = t!(TcpStream::connect(&addr));
864                 let mut buf = [0, 1];
865                 t!(s.read(&mut buf));
866                 t!(s.read(&mut buf));
867             });
868
869             let mut s1 = t!(acceptor.accept()).0;
870             let s2 = t!(s1.try_clone());
871
872             let (done, rx) = channel();
873             let _t = thread::spawn(move|| {
874                 let mut s2 = s2;
875                 t!(s2.write(&[1]));
876                 done.send(()).unwrap();
877             });
878             t!(s1.write(&[2]));
879
880             rx.recv().unwrap();
881         })
882     }
883
884     #[test]
885     fn shutdown_smoke() {
886         each_ip(&mut |addr| {
887             let a = t!(TcpListener::bind(&addr));
888             let _t = thread::spawn(move|| {
889                 let mut c = t!(a.accept()).0;
890                 let mut b = [0];
891                 assert_eq!(c.read(&mut b).unwrap(), 0);
892                 t!(c.write(&[1]));
893             });
894
895             let mut s = t!(TcpStream::connect(&addr));
896             t!(s.shutdown(Shutdown::Write));
897             assert!(s.write(&[1]).is_err());
898             let mut b = [0, 0];
899             assert_eq!(t!(s.read(&mut b)), 1);
900             assert_eq!(b[0], 1);
901         })
902     }
903
904     #[test]
905     fn close_readwrite_smoke() {
906         each_ip(&mut |addr| {
907             let a = t!(TcpListener::bind(&addr));
908             let (tx, rx) = channel::<()>();
909             let _t = thread::spawn(move|| {
910                 let _s = t!(a.accept());
911                 let _ = rx.recv();
912             });
913
914             let mut b = [0];
915             let mut s = t!(TcpStream::connect(&addr));
916             let mut s2 = t!(s.try_clone());
917
918             // closing should prevent reads/writes
919             t!(s.shutdown(Shutdown::Write));
920             assert!(s.write(&[0]).is_err());
921             t!(s.shutdown(Shutdown::Read));
922             assert_eq!(s.read(&mut b).unwrap(), 0);
923
924             // closing should affect previous handles
925             assert!(s2.write(&[0]).is_err());
926             assert_eq!(s2.read(&mut b).unwrap(), 0);
927
928             // closing should affect new handles
929             let mut s3 = t!(s.try_clone());
930             assert!(s3.write(&[0]).is_err());
931             assert_eq!(s3.read(&mut b).unwrap(), 0);
932
933             // make sure these don't die
934             let _ = s2.shutdown(Shutdown::Read);
935             let _ = s2.shutdown(Shutdown::Write);
936             let _ = s3.shutdown(Shutdown::Read);
937             let _ = s3.shutdown(Shutdown::Write);
938             drop(tx);
939         })
940     }
941
942     #[test]
943     fn close_read_wakes_up() {
944         each_ip(&mut |addr| {
945             let a = t!(TcpListener::bind(&addr));
946             let (tx1, rx) = channel::<()>();
947             let _t = thread::spawn(move|| {
948                 let _s = t!(a.accept());
949                 let _ = rx.recv();
950             });
951
952             let s = t!(TcpStream::connect(&addr));
953             let s2 = t!(s.try_clone());
954             let (tx, rx) = channel();
955             let _t = thread::spawn(move|| {
956                 let mut s2 = s2;
957                 assert_eq!(t!(s2.read(&mut [0])), 0);
958                 tx.send(()).unwrap();
959             });
960             // this should wake up the child thread
961             t!(s.shutdown(Shutdown::Read));
962
963             // this test will never finish if the child doesn't wake up
964             rx.recv().unwrap();
965             drop(tx1);
966         })
967     }
968
969     #[test]
970     fn clone_while_reading() {
971         each_ip(&mut |addr| {
972             let accept = t!(TcpListener::bind(&addr));
973
974             // Enqueue a thread to write to a socket
975             let (tx, rx) = channel();
976             let (txdone, rxdone) = channel();
977             let txdone2 = txdone.clone();
978             let _t = thread::spawn(move|| {
979                 let mut tcp = t!(TcpStream::connect(&addr));
980                 rx.recv().unwrap();
981                 t!(tcp.write(&[0]));
982                 txdone2.send(()).unwrap();
983             });
984
985             // Spawn off a reading clone
986             let tcp = t!(accept.accept()).0;
987             let tcp2 = t!(tcp.try_clone());
988             let txdone3 = txdone.clone();
989             let _t = thread::spawn(move|| {
990                 let mut tcp2 = tcp2;
991                 t!(tcp2.read(&mut [0]));
992                 txdone3.send(()).unwrap();
993             });
994
995             // Try to ensure that the reading clone is indeed reading
996             for _ in 0..50 {
997                 thread::yield_now();
998             }
999
1000             // clone the handle again while it's reading, then let it finish the
1001             // read.
1002             let _ = t!(tcp.try_clone());
1003             tx.send(()).unwrap();
1004             rxdone.recv().unwrap();
1005             rxdone.recv().unwrap();
1006         })
1007     }
1008
1009     #[test]
1010     fn clone_accept_smoke() {
1011         each_ip(&mut |addr| {
1012             let a = t!(TcpListener::bind(&addr));
1013             let a2 = t!(a.try_clone());
1014
1015             let _t = thread::spawn(move|| {
1016                 let _ = TcpStream::connect(&addr);
1017             });
1018             let _t = thread::spawn(move|| {
1019                 let _ = TcpStream::connect(&addr);
1020             });
1021
1022             t!(a.accept());
1023             t!(a2.accept());
1024         })
1025     }
1026
1027     #[test]
1028     fn clone_accept_concurrent() {
1029         each_ip(&mut |addr| {
1030             let a = t!(TcpListener::bind(&addr));
1031             let a2 = t!(a.try_clone());
1032
1033             let (tx, rx) = channel();
1034             let tx2 = tx.clone();
1035
1036             let _t = thread::spawn(move|| {
1037                 tx.send(t!(a.accept())).unwrap();
1038             });
1039             let _t = thread::spawn(move|| {
1040                 tx2.send(t!(a2.accept())).unwrap();
1041             });
1042
1043             let _t = thread::spawn(move|| {
1044                 let _ = TcpStream::connect(&addr);
1045             });
1046             let _t = thread::spawn(move|| {
1047                 let _ = TcpStream::connect(&addr);
1048             });
1049
1050             rx.recv().unwrap();
1051             rx.recv().unwrap();
1052         })
1053     }
1054
1055     #[test]
1056     fn debug() {
1057         let name = if cfg!(windows) {"socket"} else {"fd"};
1058         let socket_addr = next_test_ip4();
1059
1060         let listener = t!(TcpListener::bind(&socket_addr));
1061         let listener_inner = listener.0.socket().as_inner();
1062         let compare = format!("TcpListener {{ addr: {:?}, {}: {:?} }}",
1063                               socket_addr, name, listener_inner);
1064         assert_eq!(format!("{:?}", listener), compare);
1065
1066         let stream = t!(TcpStream::connect(&("localhost",
1067                                                  socket_addr.port())));
1068         let stream_inner = stream.0.socket().as_inner();
1069         let compare = format!("TcpStream {{ addr: {:?}, \
1070                               peer: {:?}, {}: {:?} }}",
1071                               stream.local_addr().unwrap(),
1072                               stream.peer_addr().unwrap(),
1073                               name,
1074                               stream_inner);
1075         assert_eq!(format!("{:?}", stream), compare);
1076     }
1077
1078     // FIXME: re-enabled bitrig/openbsd tests once their socket timeout code
1079     //        no longer has rounding errors.
1080     #[cfg_attr(any(target_os = "bitrig", target_os = "netbsd", target_os = "openbsd"), ignore)]
1081     #[test]
1082     fn timeouts() {
1083         let addr = next_test_ip4();
1084         let listener = t!(TcpListener::bind(&addr));
1085
1086         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1087         let dur = Duration::new(15410, 0);
1088
1089         assert_eq!(None, t!(stream.read_timeout()));
1090
1091         t!(stream.set_read_timeout(Some(dur)));
1092         assert_eq!(Some(dur), t!(stream.read_timeout()));
1093
1094         assert_eq!(None, t!(stream.write_timeout()));
1095
1096         t!(stream.set_write_timeout(Some(dur)));
1097         assert_eq!(Some(dur), t!(stream.write_timeout()));
1098
1099         t!(stream.set_read_timeout(None));
1100         assert_eq!(None, t!(stream.read_timeout()));
1101
1102         t!(stream.set_write_timeout(None));
1103         assert_eq!(None, t!(stream.write_timeout()));
1104         drop(listener);
1105     }
1106
1107     #[test]
1108     fn test_read_timeout() {
1109         let addr = next_test_ip4();
1110         let listener = t!(TcpListener::bind(&addr));
1111
1112         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1113         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1114
1115         let mut buf = [0; 10];
1116         let start = Instant::now();
1117         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1118         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1119         assert!(start.elapsed() > Duration::from_millis(400));
1120         drop(listener);
1121     }
1122
1123     #[test]
1124     fn test_read_with_timeout() {
1125         let addr = next_test_ip4();
1126         let listener = t!(TcpListener::bind(&addr));
1127
1128         let mut stream = t!(TcpStream::connect(&("localhost", addr.port())));
1129         t!(stream.set_read_timeout(Some(Duration::from_millis(1000))));
1130
1131         let mut other_end = t!(listener.accept()).0;
1132         t!(other_end.write_all(b"hello world"));
1133
1134         let mut buf = [0; 11];
1135         t!(stream.read(&mut buf));
1136         assert_eq!(b"hello world", &buf[..]);
1137
1138         let start = Instant::now();
1139         let kind = stream.read(&mut buf).err().expect("expected error").kind();
1140         assert!(kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut);
1141         assert!(start.elapsed() > Duration::from_millis(400));
1142         drop(listener);
1143     }
1144
1145     #[test]
1146     fn nodelay() {
1147         let addr = next_test_ip4();
1148         let _listener = t!(TcpListener::bind(&addr));
1149
1150         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1151
1152         assert_eq!(false, t!(stream.nodelay()));
1153         t!(stream.set_nodelay(true));
1154         assert_eq!(true, t!(stream.nodelay()));
1155         t!(stream.set_nodelay(false));
1156         assert_eq!(false, t!(stream.nodelay()));
1157     }
1158
1159     #[test]
1160     fn keepalive() {
1161         let addr = next_test_ip4();
1162         let _listener = t!(TcpListener::bind(&addr));
1163
1164         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1165         let dur = Duration::new(15410, 0);
1166
1167         assert_eq!(None, t!(stream.keepalive()));
1168         t!(stream.set_keepalive(Some(dur)));
1169         assert_eq!(Some(dur), t!(stream.keepalive()));
1170         t!(stream.set_keepalive(None));
1171         assert_eq!(None, t!(stream.keepalive()));
1172     }
1173
1174     #[test]
1175     fn ttl() {
1176         let ttl = 100;
1177
1178         let addr = next_test_ip4();
1179         let listener = t!(TcpListener::bind(&addr));
1180
1181         t!(listener.set_ttl(ttl));
1182         assert_eq!(ttl, t!(listener.ttl()));
1183
1184         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1185
1186         t!(stream.set_ttl(ttl));
1187         assert_eq!(ttl, t!(stream.ttl()));
1188     }
1189
1190     #[test]
1191     fn set_nonblocking() {
1192         let addr = next_test_ip4();
1193         let listener = t!(TcpListener::bind(&addr));
1194
1195         t!(listener.set_nonblocking(true));
1196         t!(listener.set_nonblocking(false));
1197
1198         let stream = t!(TcpStream::connect(&("localhost", addr.port())));
1199
1200         t!(stream.set_nonblocking(true));
1201         t!(stream.set_nonblocking(false));
1202     }
1203 }