1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
24 use io::net::ip::{SocketAddr, ToSocketAddr};
25 use io::{Reader, Writer, Listener, Acceptor};
26 use io::{standard_error, TimedOut};
27 use option::{None, Some, Option};
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
34 /// A structure which represents a TCP stream between a local socket and a
40 /// # #![allow(unused_must_use)]
41 /// use std::io::TcpStream;
43 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
45 /// stream.write([1]);
46 /// let mut buf = [0];
48 /// drop(stream); // close the connection
50 pub struct TcpStream {
55 fn new(s: TcpStreamImp) -> TcpStream {
56 TcpStream { inner: s }
59 /// Open a TCP connection to a remote host.
61 /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
62 /// trait can be supplied for the address; see this trait documentation for
63 /// concrete examples.
64 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
65 super::with_addresses(addr, |addr| {
66 TcpStreamImp::connect(addr, None).map(TcpStream::new)
70 /// Creates a TCP connection to a remote socket address, timing out after
71 /// the specified duration.
73 /// This is the same as the `connect` method, except that if the timeout
74 /// specified elapses before a connection is made an error will be
75 /// returned. The error's kind will be `TimedOut`.
77 /// Same as the `connect` method, `addr` argument type can be anything which
78 /// implements `ToSocketAddr` trait.
80 /// If a `timeout` with zero or negative duration is specified then
81 /// the function returns `Err`, with the error kind set to `TimedOut`.
82 #[experimental = "the timeout argument may eventually change types"]
83 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
84 timeout: Duration) -> IoResult<TcpStream> {
85 if timeout <= Duration::milliseconds(0) {
86 return Err(standard_error(TimedOut));
89 super::with_addresses(addr, |addr| {
90 TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
95 /// Returns the socket address of the remote peer of this TCP connection.
96 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
97 self.inner.peer_name()
100 /// Returns the socket address of the local half of this TCP connection.
101 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
102 self.inner.socket_name()
105 /// Sets the nodelay flag on this connection to the boolean specified
107 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
108 self.inner.set_nodelay(nodelay)
111 /// Sets the keepalive timeout to the timeout specified.
113 /// If the value specified is `None`, then the keepalive flag is cleared on
114 /// this connection. Otherwise, the keepalive timeout will be set to the
115 /// specified time, in seconds.
117 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
118 self.inner.set_keepalive(delay_in_seconds)
121 /// Closes the reading half of this connection.
123 /// This method will close the reading portion of this connection, causing
124 /// all pending and future reads to immediately return with an error.
129 /// # #![allow(unused_must_use)]
130 /// use std::io::timer;
131 /// use std::io::TcpStream;
132 /// use std::time::Duration;
134 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
135 /// let stream2 = stream.clone();
138 /// // close this stream after one second
139 /// timer::sleep(Duration::seconds(1));
140 /// let mut stream = stream2;
141 /// stream.close_read();
144 /// // wait for some data, will get canceled after one second
145 /// let mut buf = [0];
146 /// stream.read(buf);
149 /// Note that this method affects all cloned handles associated with this
150 /// stream, not just this one handle.
151 pub fn close_read(&mut self) -> IoResult<()> {
152 self.inner.close_read()
155 /// Closes the writing half of this connection.
157 /// This method will close the writing portion of this connection, causing
158 /// all future writes to immediately return with an error.
160 /// Note that this method affects all cloned handles associated with this
161 /// stream, not just this one handle.
162 pub fn close_write(&mut self) -> IoResult<()> {
163 self.inner.close_write()
166 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
168 /// This function will set a timeout for all blocking operations (including
169 /// reads and writes) on this stream. The timeout specified is a relative
170 /// time, in milliseconds, into the future after which point operations will
171 /// time out. This means that the timeout must be reset periodically to keep
172 /// it from expiring. Specifying a value of `None` will clear the timeout
175 /// The timeout on this stream is local to this stream only. Setting a
176 /// timeout does not affect any other cloned instances of this stream, nor
177 /// does the timeout propagated to cloned handles of this stream. Setting
178 /// this timeout will override any specific read or write timeouts
179 /// previously set for this stream.
181 /// For clarification on the semantics of interrupting a read and a write,
182 /// take a look at `set_read_timeout` and `set_write_timeout`.
183 #[experimental = "the timeout argument may change in type and value"]
184 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
185 self.inner.set_timeout(timeout_ms)
188 /// Sets the timeout for read operations on this stream.
190 /// See documentation in `set_timeout` for the semantics of this read time.
191 /// This will overwrite any previous read timeout set through either this
192 /// function or `set_timeout`.
196 /// When this timeout expires, if there is no pending read operation, no
197 /// action is taken. Otherwise, the read operation will be scheduled to
198 /// promptly return. If a timeout error is returned, then no data was read
199 /// during the timeout period.
200 #[experimental = "the timeout argument may change in type and value"]
201 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
202 self.inner.set_read_timeout(timeout_ms)
205 /// Sets the timeout for write operations on this stream.
207 /// See documentation in `set_timeout` for the semantics of this write time.
208 /// This will overwrite any previous write timeout set through either this
209 /// function or `set_timeout`.
213 /// When this timeout expires, if there is no pending write operation, no
214 /// action is taken. Otherwise, the pending write operation will be
215 /// scheduled to promptly return. The actual state of the underlying stream
216 /// is not specified.
218 /// The write operation may return an error of type `ShortWrite` which
219 /// indicates that the object is known to have written an exact number of
220 /// bytes successfully during the timeout period, and the remaining bytes
221 /// were never written.
223 /// If the write operation returns `TimedOut`, then it the timeout primitive
224 /// does not know how many bytes were written as part of the timeout
225 /// operation. It may be the case that bytes continue to be written in an
226 /// asynchronous fashion after the call to write returns.
227 #[experimental = "the timeout argument may change in type and value"]
228 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
229 self.inner.set_write_timeout(timeout_ms)
233 impl Clone for TcpStream {
234 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
235 /// and writes of this connection.
237 /// The underlying TCP stream will not be closed until all handles to the
238 /// stream have been deallocated. All handles will also follow the same
239 /// stream, but two concurrent reads will not receive the same data.
240 /// Instead, the first read will receive the first packet received, and the
241 /// second read will receive the second packet.
242 fn clone(&self) -> TcpStream {
243 TcpStream { inner: self.inner.clone() }
247 impl Reader for TcpStream {
248 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
253 impl Writer for TcpStream {
254 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
255 self.inner.write(buf)
259 /// A structure representing a socket server. This listener is used to create a
260 /// `TcpAcceptor` which can be used to accept sockets on a local port.
267 /// # #![allow(dead_code)]
268 /// use std::io::{TcpListener, TcpStream};
269 /// use std::io::{Acceptor, Listener};
271 /// let listener = TcpListener::bind("127.0.0.1:80");
273 /// // bind the listener to the specified address
274 /// let mut acceptor = listener.listen();
276 /// fn handle_client(mut stream: TcpStream) {
278 /// # &mut stream; // silence unused mutability/variable warning
280 /// // accept connections and process them, spawning a new tasks for each one
281 /// for stream in acceptor.incoming() {
283 /// Err(e) => { /* connection failed */ }
284 /// Ok(stream) => spawn(proc() {
285 /// // connection succeeded
286 /// handle_client(stream)
291 /// // close the socket server
295 pub struct TcpListener {
296 inner: TcpListenerImp,
300 /// Creates a new `TcpListener` which will be bound to the specified address.
301 /// This listener is not ready for accepting connections, `listen` must be called
302 /// on it before that's possible.
304 /// Binding with a port number of 0 will request that the OS assigns a port
305 /// to this listener. The port allocated can be queried via the
306 /// `socket_name` function.
308 /// The address type can be any implementor of `ToSocketAddr` trait. See its
309 /// documentation for concrete examples.
310 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
311 super::with_addresses(addr, |addr| {
312 TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
316 /// Returns the local socket address of this listener.
317 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
318 self.inner.socket_name()
322 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
323 fn listen(self) -> IoResult<TcpAcceptor> {
324 self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
328 /// The accepting half of a TCP socket server. This structure is created through
329 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
330 /// `TcpStream` instances.
331 pub struct TcpAcceptor {
332 inner: TcpAcceptorImp,
336 /// Prevents blocking on all future accepts after `ms` milliseconds have
339 /// This function is used to set a deadline after which this acceptor will
340 /// time out accepting any connections. The argument is the relative
341 /// distance, in milliseconds, to a point in the future after which all
342 /// accepts will fail.
344 /// If the argument specified is `None`, then any previously registered
345 /// timeout is cleared.
347 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
348 /// any pending connections. All pending connections will be accepted,
349 /// regardless of whether the timeout has expired or not (the accept will
350 /// not block in this case).
355 /// # #![allow(experimental)]
356 /// use std::io::TcpListener;
357 /// use std::io::{Listener, Acceptor, TimedOut};
359 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
361 /// // After 100ms have passed, all accepts will fail
362 /// a.set_timeout(Some(100));
364 /// match a.accept() {
365 /// Ok(..) => println!("accepted a socket"),
366 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
367 /// Err(e) => println!("err: {}", e),
370 /// // Reset the timeout and try again
371 /// a.set_timeout(Some(100));
372 /// let socket = a.accept();
374 /// // Clear the timeout and block indefinitely waiting for a connection
375 /// a.set_timeout(None);
376 /// let socket = a.accept();
378 #[experimental = "the type of the argument and name of this function are \
380 pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
382 /// Closes the accepting capabilities of this acceptor.
384 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
385 /// in that it will affect *all* cloned handles of this acceptor's original
388 /// Once this function succeeds, all future calls to `accept` will return
389 /// immediately with an error, preventing all future calls to accept. The
390 /// underlying socket will not be relinquished back to the OS until all
391 /// acceptors have been deallocated.
393 /// This is useful for waking up a thread in an accept loop to indicate that
399 /// # #![allow(experimental)]
400 /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
402 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
403 /// let a2 = a.clone();
407 /// for socket in a2.incoming() {
409 /// Ok(s) => { /* handle s */ }
410 /// Err(ref e) if e.kind == EndOfFile => break, // closed
411 /// Err(e) => panic!("unexpected error: {}", e),
416 /// # fn wait_for_sigint() {}
417 /// // Now that our accept loop is running, wait for the program to be
418 /// // requested to exit.
419 /// wait_for_sigint();
421 /// // Signal our accept loop to exit
422 /// assert!(a.close_accept().is_ok());
425 pub fn close_accept(&mut self) -> IoResult<()> {
426 self.inner.close_accept()
430 impl Acceptor<TcpStream> for TcpAcceptor {
431 fn accept(&mut self) -> IoResult<TcpStream> {
432 self.inner.accept().map(TcpStream::new)
436 impl Clone for TcpAcceptor {
437 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
440 /// The underlying TCP acceptor will not be closed until all handles to the
441 /// acceptor have been deallocated. Incoming connections will be received on
442 /// at most once acceptor, the same connection will not be accepted twice.
444 /// The `close_accept` method will shut down *all* acceptors cloned from the
445 /// same original acceptor, whereas the `set_timeout` method only affects
446 /// the selector that it is called on.
448 /// This function is useful for creating a handle to invoke `close_accept`
449 /// on to wake up any other task blocked in `accept`.
450 fn clone(&self) -> TcpAcceptor {
451 TcpAcceptor { inner: self.inner.clone() }
456 #[allow(experimental)]
464 // FIXME #11530 this fails on android because tests are run as root
465 #[cfg_attr(any(windows, target_os = "android"), ignore)]
468 match TcpListener::bind("0.0.0.0:1") {
470 Err(e) => assert_eq!(e.kind, PermissionDenied),
476 match TcpStream::connect("0.0.0.0:1") {
478 Err(e) => assert_eq!(e.kind, ConnectionRefused),
483 fn listen_ip4_localhost() {
484 let socket_addr = next_test_ip4();
485 let listener = TcpListener::bind(socket_addr);
486 let mut acceptor = listener.listen();
489 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
490 stream.write([144]).unwrap();
493 let mut stream = acceptor.accept();
495 stream.read(buf).unwrap();
496 assert!(buf[0] == 144);
500 fn connect_localhost() {
501 let addr = next_test_ip4();
502 let mut acceptor = TcpListener::bind(addr).listen();
505 let mut stream = TcpStream::connect(("localhost", addr.port));
506 stream.write([64]).unwrap();
509 let mut stream = acceptor.accept();
511 stream.read(buf).unwrap();
512 assert!(buf[0] == 64);
516 fn connect_ip4_loopback() {
517 let addr = next_test_ip4();
518 let mut acceptor = TcpListener::bind(addr).listen();
521 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
522 stream.write([44]).unwrap();
525 let mut stream = acceptor.accept();
527 stream.read(buf).unwrap();
528 assert!(buf[0] == 44);
532 fn connect_ip6_loopback() {
533 let addr = next_test_ip6();
534 let mut acceptor = TcpListener::bind(addr).listen();
537 let mut stream = TcpStream::connect(("::1", addr.port));
538 stream.write([66]).unwrap();
541 let mut stream = acceptor.accept();
543 stream.read(buf).unwrap();
544 assert!(buf[0] == 66);
548 fn smoke_test_ip4() {
549 let addr = next_test_ip4();
550 let mut acceptor = TcpListener::bind(addr).listen();
553 let mut stream = TcpStream::connect(addr);
554 stream.write([99]).unwrap();
557 let mut stream = acceptor.accept();
559 stream.read(buf).unwrap();
560 assert!(buf[0] == 99);
564 fn smoke_test_ip6() {
565 let addr = next_test_ip6();
566 let mut acceptor = TcpListener::bind(addr).listen();
569 let mut stream = TcpStream::connect(addr);
570 stream.write([99]).unwrap();
573 let mut stream = acceptor.accept();
575 stream.read(buf).unwrap();
576 assert!(buf[0] == 99);
581 let addr = next_test_ip4();
582 let mut acceptor = TcpListener::bind(addr).listen();
585 let _stream = TcpStream::connect(addr);
589 let mut stream = acceptor.accept();
591 let nread = stream.read(buf);
592 assert!(nread.is_err());
597 let addr = next_test_ip6();
598 let mut acceptor = TcpListener::bind(addr).listen();
601 let _stream = TcpStream::connect(addr);
605 let mut stream = acceptor.accept();
607 let nread = stream.read(buf);
608 assert!(nread.is_err());
612 fn read_eof_twice_ip4() {
613 let addr = next_test_ip4();
614 let mut acceptor = TcpListener::bind(addr).listen();
617 let _stream = TcpStream::connect(addr);
621 let mut stream = acceptor.accept();
623 let nread = stream.read(buf);
624 assert!(nread.is_err());
626 match stream.read(buf) {
629 assert!(e.kind == NotConnected || e.kind == EndOfFile,
630 "unknown kind: {}", e.kind);
636 fn read_eof_twice_ip6() {
637 let addr = next_test_ip6();
638 let mut acceptor = TcpListener::bind(addr).listen();
641 let _stream = TcpStream::connect(addr);
645 let mut stream = acceptor.accept();
647 let nread = stream.read(buf);
648 assert!(nread.is_err());
650 match stream.read(buf) {
653 assert!(e.kind == NotConnected || e.kind == EndOfFile,
654 "unknown kind: {}", e.kind);
660 fn write_close_ip4() {
661 let addr = next_test_ip4();
662 let mut acceptor = TcpListener::bind(addr).listen();
664 let (tx, rx) = channel();
666 drop(TcpStream::connect(addr));
670 let mut stream = acceptor.accept();
673 match stream.write(buf) {
676 assert!(e.kind == ConnectionReset ||
677 e.kind == BrokenPipe ||
678 e.kind == ConnectionAborted,
679 "unknown error: {}", e);
685 fn write_close_ip6() {
686 let addr = next_test_ip6();
687 let mut acceptor = TcpListener::bind(addr).listen();
689 let (tx, rx) = channel();
691 drop(TcpStream::connect(addr));
695 let mut stream = acceptor.accept();
698 match stream.write(buf) {
701 assert!(e.kind == ConnectionReset ||
702 e.kind == BrokenPipe ||
703 e.kind == ConnectionAborted,
704 "unknown error: {}", e);
710 fn multiple_connect_serial_ip4() {
711 let addr = next_test_ip4();
713 let mut acceptor = TcpListener::bind(addr).listen();
716 for _ in range(0, max) {
717 let mut stream = TcpStream::connect(addr);
718 stream.write([99]).unwrap();
722 for ref mut stream in acceptor.incoming().take(max) {
724 stream.read(buf).unwrap();
725 assert_eq!(buf[0], 99);
730 fn multiple_connect_serial_ip6() {
731 let addr = next_test_ip6();
733 let mut acceptor = TcpListener::bind(addr).listen();
736 for _ in range(0, max) {
737 let mut stream = TcpStream::connect(addr);
738 stream.write([99]).unwrap();
742 for ref mut stream in acceptor.incoming().take(max) {
744 stream.read(buf).unwrap();
745 assert_eq!(buf[0], 99);
750 fn multiple_connect_interleaved_greedy_schedule_ip4() {
751 let addr = next_test_ip4();
752 static MAX: int = 10;
753 let acceptor = TcpListener::bind(addr).listen();
756 let mut acceptor = acceptor;
757 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
758 // Start another task to handle the connection
760 let mut stream = stream;
762 stream.read(buf).unwrap();
763 assert!(buf[0] == i as u8);
771 fn connect(i: int, addr: SocketAddr) {
772 if i == MAX { return }
775 debug!("connecting");
776 let mut stream = TcpStream::connect(addr);
777 // Connect again before writing
778 connect(i + 1, addr);
780 stream.write([i as u8]).unwrap();
786 fn multiple_connect_interleaved_greedy_schedule_ip6() {
787 let addr = next_test_ip6();
788 static MAX: int = 10;
789 let acceptor = TcpListener::bind(addr).listen();
792 let mut acceptor = acceptor;
793 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
794 // Start another task to handle the connection
796 let mut stream = stream;
798 stream.read(buf).unwrap();
799 assert!(buf[0] == i as u8);
807 fn connect(i: int, addr: SocketAddr) {
808 if i == MAX { return }
811 debug!("connecting");
812 let mut stream = TcpStream::connect(addr);
813 // Connect again before writing
814 connect(i + 1, addr);
816 stream.write([i as u8]).unwrap();
822 fn multiple_connect_interleaved_lazy_schedule_ip4() {
823 static MAX: int = 10;
824 let addr = next_test_ip4();
825 let acceptor = TcpListener::bind(addr).listen();
828 let mut acceptor = acceptor;
829 for stream in acceptor.incoming().take(MAX as uint) {
830 // Start another task to handle the connection
832 let mut stream = stream;
834 stream.read(buf).unwrap();
835 assert!(buf[0] == 99);
843 fn connect(i: int, addr: SocketAddr) {
844 if i == MAX { return }
847 debug!("connecting");
848 let mut stream = TcpStream::connect(addr);
849 // Connect again before writing
850 connect(i + 1, addr);
852 stream.write([99]).unwrap();
858 fn multiple_connect_interleaved_lazy_schedule_ip6() {
859 static MAX: int = 10;
860 let addr = next_test_ip6();
861 let acceptor = TcpListener::bind(addr).listen();
864 let mut acceptor = acceptor;
865 for stream in acceptor.incoming().take(MAX as uint) {
866 // Start another task to handle the connection
868 let mut stream = stream;
870 stream.read(buf).unwrap();
871 assert!(buf[0] == 99);
879 fn connect(i: int, addr: SocketAddr) {
880 if i == MAX { return }
883 debug!("connecting");
884 let mut stream = TcpStream::connect(addr);
885 // Connect again before writing
886 connect(i + 1, addr);
888 stream.write([99]).unwrap();
893 pub fn socket_name(addr: SocketAddr) {
894 let mut listener = TcpListener::bind(addr).unwrap();
896 // Make sure socket_name gives
897 // us the socket we binded to.
898 let so_name = listener.socket_name();
899 assert!(so_name.is_ok());
900 assert_eq!(addr, so_name.unwrap());
903 pub fn peer_name(addr: SocketAddr) {
904 let acceptor = TcpListener::bind(addr).listen();
906 let mut acceptor = acceptor;
907 acceptor.accept().unwrap();
910 let stream = TcpStream::connect(addr);
912 assert!(stream.is_ok());
913 let mut stream = stream.unwrap();
915 // Make sure peer_name gives us the
916 // address/port of the peer we've
918 let peer_name = stream.peer_name();
919 assert!(peer_name.is_ok());
920 assert_eq!(addr, peer_name.unwrap());
924 fn socket_and_peer_name_ip4() {
925 peer_name(next_test_ip4());
926 socket_name(next_test_ip4());
930 fn socket_and_peer_name_ip6() {
931 // FIXME: peer name is not consistent
932 //peer_name(next_test_ip6());
933 socket_name(next_test_ip6());
938 let addr = next_test_ip4();
939 let (tx, rx) = channel();
941 let mut srv = TcpListener::bind(addr).listen().unwrap();
943 let mut cl = srv.accept().unwrap();
944 cl.write([10]).unwrap();
951 let mut c = TcpStream::connect(addr).unwrap();
952 let mut b = [0, ..10];
953 assert_eq!(c.read(b), Ok(1));
954 c.write([1]).unwrap();
960 let addr = next_test_ip4();
961 let listener = TcpListener::bind(addr).unwrap().listen();
962 assert!(listener.is_ok());
963 match TcpListener::bind(addr).listen() {
966 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
967 "unknown error: {} {}", e, e.kind);
974 let addr = next_test_ip4();
975 let (tx, rx) = channel();
979 let _stream = TcpStream::connect(addr).unwrap();
985 let mut acceptor = TcpListener::bind(addr).listen();
988 let _stream = acceptor.accept().unwrap();
994 let _listener = TcpListener::bind(addr);
998 fn tcp_clone_smoke() {
999 let addr = next_test_ip4();
1000 let mut acceptor = TcpListener::bind(addr).listen();
1003 let mut s = TcpStream::connect(addr);
1004 let mut buf = [0, 0];
1005 assert_eq!(s.read(buf), Ok(1));
1006 assert_eq!(buf[0], 1);
1007 s.write([2]).unwrap();
1010 let mut s1 = acceptor.accept().unwrap();
1011 let s2 = s1.clone();
1013 let (tx1, rx1) = channel();
1014 let (tx2, rx2) = channel();
1018 s2.write([1]).unwrap();
1022 let mut buf = [0, 0];
1023 assert_eq!(s1.read(buf), Ok(1));
1028 fn tcp_clone_two_read() {
1029 let addr = next_test_ip6();
1030 let mut acceptor = TcpListener::bind(addr).listen();
1031 let (tx1, rx) = channel();
1032 let tx2 = tx1.clone();
1035 let mut s = TcpStream::connect(addr);
1036 s.write([1]).unwrap();
1038 s.write([2]).unwrap();
1042 let mut s1 = acceptor.accept().unwrap();
1043 let s2 = s1.clone();
1045 let (done, rx) = channel();
1048 let mut buf = [0, 0];
1049 s2.read(buf).unwrap();
1053 let mut buf = [0, 0];
1054 s1.read(buf).unwrap();
1061 fn tcp_clone_two_write() {
1062 let addr = next_test_ip4();
1063 let mut acceptor = TcpListener::bind(addr).listen();
1066 let mut s = TcpStream::connect(addr);
1067 let mut buf = [0, 1];
1068 s.read(buf).unwrap();
1069 s.read(buf).unwrap();
1072 let mut s1 = acceptor.accept().unwrap();
1073 let s2 = s1.clone();
1075 let (done, rx) = channel();
1078 s2.write([1]).unwrap();
1081 s1.write([2]).unwrap();
1087 fn shutdown_smoke() {
1088 let addr = next_test_ip4();
1089 let a = TcpListener::bind(addr).unwrap().listen();
1092 let mut c = a.accept().unwrap();
1093 assert_eq!(c.read_to_end(), Ok(vec!()));
1094 c.write([1]).unwrap();
1097 let mut s = TcpStream::connect(addr).unwrap();
1098 assert!(s.inner.close_write().is_ok());
1099 assert!(s.write([1]).is_err());
1100 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1104 fn accept_timeout() {
1105 let addr = next_test_ip4();
1106 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1108 a.set_timeout(Some(10));
1110 // Make sure we time out once and future invocations also time out
1111 let err = a.accept().err().unwrap();
1112 assert_eq!(err.kind, TimedOut);
1113 let err = a.accept().err().unwrap();
1114 assert_eq!(err.kind, TimedOut);
1116 // Also make sure that even though the timeout is expired that we will
1117 // continue to receive any pending connections.
1119 // FIXME: freebsd apparently never sees the pending connection, but
1120 // testing manually always works. Need to investigate this
1122 if !cfg!(target_os = "freebsd") {
1123 let (tx, rx) = channel();
1125 tx.send(TcpStream::connect(addr).unwrap());
1128 for i in range(0i, 1001) {
1131 Err(ref e) if e.kind == TimedOut => {}
1132 Err(e) => panic!("error: {}", e),
1134 ::task::deschedule();
1135 if i == 1000 { panic!("should have a pending connection") }
1139 // Unset the timeout and make sure that this always blocks.
1140 a.set_timeout(None);
1142 drop(TcpStream::connect(addr).unwrap());
1144 a.accept().unwrap();
1148 fn close_readwrite_smoke() {
1149 let addr = next_test_ip4();
1150 let a = TcpListener::bind(addr).listen().unwrap();
1151 let (_tx, rx) = channel::<()>();
1154 let _s = a.accept().unwrap();
1155 let _ = rx.recv_opt();
1159 let mut s = TcpStream::connect(addr).unwrap();
1160 let mut s2 = s.clone();
1162 // closing should prevent reads/writes
1163 s.close_write().unwrap();
1164 assert!(s.write([0]).is_err());
1165 s.close_read().unwrap();
1166 assert!(s.read(b).is_err());
1168 // closing should affect previous handles
1169 assert!(s2.write([0]).is_err());
1170 assert!(s2.read(b).is_err());
1172 // closing should affect new handles
1173 let mut s3 = s.clone();
1174 assert!(s3.write([0]).is_err());
1175 assert!(s3.read(b).is_err());
1177 // make sure these don't die
1178 let _ = s2.close_read();
1179 let _ = s2.close_write();
1180 let _ = s3.close_read();
1181 let _ = s3.close_write();
1185 fn close_read_wakes_up() {
1186 let addr = next_test_ip4();
1187 let a = TcpListener::bind(addr).listen().unwrap();
1188 let (_tx, rx) = channel::<()>();
1191 let _s = a.accept().unwrap();
1192 let _ = rx.recv_opt();
1195 let mut s = TcpStream::connect(addr).unwrap();
1197 let (tx, rx) = channel();
1200 assert!(s2.read([0]).is_err());
1203 // this should wake up the child task
1204 s.close_read().unwrap();
1206 // this test will never finish if the child doesn't wake up
1211 fn readwrite_timeouts() {
1212 let addr = next_test_ip6();
1213 let mut a = TcpListener::bind(addr).listen().unwrap();
1214 let (tx, rx) = channel::<()>();
1216 let mut s = TcpStream::connect(addr).unwrap();
1218 assert!(s.write([0]).is_ok());
1219 let _ = rx.recv_opt();
1222 let mut s = a.accept().unwrap();
1223 s.set_timeout(Some(20));
1224 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1225 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1227 s.set_timeout(Some(20));
1228 for i in range(0i, 1001) {
1229 match s.write([0, .. 128 * 1024]) {
1230 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1231 Err(IoError { kind: TimedOut, .. }) => break,
1232 Err(e) => panic!("{}", e),
1234 if i == 1000 { panic!("should have filled up?!"); }
1236 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1239 s.set_timeout(None);
1240 assert_eq!(s.read([0, 0]), Ok(1));
1244 fn read_timeouts() {
1245 let addr = next_test_ip6();
1246 let mut a = TcpListener::bind(addr).listen().unwrap();
1247 let (tx, rx) = channel::<()>();
1249 let mut s = TcpStream::connect(addr).unwrap();
1252 while amt < 100 * 128 * 1024 {
1253 match s.read([0, ..128 * 1024]) {
1254 Ok(n) => { amt += n; }
1255 Err(e) => panic!("{}", e),
1258 let _ = rx.recv_opt();
1261 let mut s = a.accept().unwrap();
1262 s.set_read_timeout(Some(20));
1263 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1264 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1267 for _ in range(0i, 100) {
1268 assert!(s.write([0, ..128 * 1024]).is_ok());
1273 fn write_timeouts() {
1274 let addr = next_test_ip6();
1275 let mut a = TcpListener::bind(addr).listen().unwrap();
1276 let (tx, rx) = channel::<()>();
1278 let mut s = TcpStream::connect(addr).unwrap();
1280 assert!(s.write([0]).is_ok());
1281 let _ = rx.recv_opt();
1284 let mut s = a.accept().unwrap();
1285 s.set_write_timeout(Some(20));
1286 for i in range(0i, 1001) {
1287 match s.write([0, .. 128 * 1024]) {
1288 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1289 Err(IoError { kind: TimedOut, .. }) => break,
1290 Err(e) => panic!("{}", e),
1292 if i == 1000 { panic!("should have filled up?!"); }
1294 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1297 assert!(s.read([0]).is_ok());
1301 fn timeout_concurrent_read() {
1302 let addr = next_test_ip6();
1303 let mut a = TcpListener::bind(addr).listen().unwrap();
1304 let (tx, rx) = channel::<()>();
1306 let mut s = TcpStream::connect(addr).unwrap();
1308 assert_eq!(s.write([0]), Ok(()));
1309 let _ = rx.recv_opt();
1312 let mut s = a.accept().unwrap();
1314 let (tx2, rx2) = channel();
1317 assert_eq!(s2.read([0]), Ok(1));
1321 s.set_read_timeout(Some(20));
1322 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1329 fn clone_while_reading() {
1330 let addr = next_test_ip6();
1331 let listen = TcpListener::bind(addr);
1332 let mut accept = listen.listen().unwrap();
1334 // Enqueue a task to write to a socket
1335 let (tx, rx) = channel();
1336 let (txdone, rxdone) = channel();
1337 let txdone2 = txdone.clone();
1339 let mut tcp = TcpStream::connect(addr).unwrap();
1341 tcp.write_u8(0).unwrap();
1345 // Spawn off a reading clone
1346 let tcp = accept.accept().unwrap();
1347 let tcp2 = tcp.clone();
1348 let txdone3 = txdone.clone();
1350 let mut tcp2 = tcp2;
1351 tcp2.read_u8().unwrap();
1355 // Try to ensure that the reading clone is indeed reading
1356 for _ in range(0i, 50) {
1357 ::task::deschedule();
1360 // clone the handle again while it's reading, then let it finish the
1362 let _ = tcp.clone();
1369 fn clone_accept_smoke() {
1370 let addr = next_test_ip4();
1371 let l = TcpListener::bind(addr);
1372 let mut a = l.listen().unwrap();
1373 let mut a2 = a.clone();
1376 let _ = TcpStream::connect(addr);
1379 let _ = TcpStream::connect(addr);
1382 assert!(a.accept().is_ok());
1383 assert!(a2.accept().is_ok());
1387 fn clone_accept_concurrent() {
1388 let addr = next_test_ip4();
1389 let l = TcpListener::bind(addr);
1390 let a = l.listen().unwrap();
1393 let (tx, rx) = channel();
1394 let tx2 = tx.clone();
1396 spawn(proc() { let mut a = a; tx.send(a.accept()) });
1397 spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1400 let _ = TcpStream::connect(addr);
1403 let _ = TcpStream::connect(addr);
1406 assert!(rx.recv().is_ok());
1407 assert!(rx.recv().is_ok());
1411 fn close_accept_smoke() {
1412 let addr = next_test_ip4();
1413 let l = TcpListener::bind(addr);
1414 let mut a = l.listen().unwrap();
1416 a.close_accept().unwrap();
1417 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1421 fn close_accept_concurrent() {
1422 let addr = next_test_ip4();
1423 let l = TcpListener::bind(addr);
1424 let a = l.listen().unwrap();
1425 let mut a2 = a.clone();
1427 let (tx, rx) = channel();
1430 tx.send(a.accept());
1432 a2.close_accept().unwrap();
1434 assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);