1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module provides the ability to open a TCP stream to a socket address,
14 //! as well as to create a socket server that accepts incoming connections. The
15 //! destination and binding addresses can each be either an IPv4 or an IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
27 use option::Option::{None, Some};
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
36 /// A structure which represents a TCP stream between a local socket and a
39 /// The socket will be closed when the value is dropped.
44 /// use std::old_io::TcpStream;
47 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
49 /// // ignore the Result
50 /// let _ = stream.write(&[1]);
52 /// let mut buf = [0];
53 /// let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
56 pub struct TcpStream {
61 fn new(s: TcpStreamImp) -> TcpStream {
62 TcpStream { inner: s }
65 /// Open a TCP connection to a remote host.
67 /// `addr` is an address of the remote host. Anything which implements the
68 /// `ToSocketAddr` trait can be supplied for the address; see that trait's documentation for
69 /// concrete examples.
70 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71 super::with_addresses(addr, |addr| {
72 TcpStreamImp::connect(addr, None).map(TcpStream::new)
76 /// Creates a TCP connection to a remote socket address, timing out after
77 /// the specified duration.
79 /// This is the same as the `connect` method, except that if the timeout
80 /// specified elapses before a connection is made an error will be
81 /// returned. The error's kind will be `TimedOut`.
83 /// As with the `connect` method, the `addr` argument can be anything which
84 /// implements the `ToSocketAddr` trait.
86 /// If a `timeout` with zero or negative duration is specified then
87 /// the function returns `Err`, with the error kind set to `TimedOut`.
88 #[unstable(feature = "io",
89 reason = "the timeout argument may eventually change types")]
90 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
91 timeout: Duration) -> IoResult<TcpStream> {
92 if timeout <= Duration::milliseconds(0) {
93 return Err(standard_error(TimedOut));
96 super::with_addresses(addr, |addr| {
97 TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
102 /// Returns the socket address of the remote peer of this TCP connection.
103 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
104 self.inner.peer_name()
107 /// Returns the socket address of the local half of this TCP connection.
108 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
109 self.inner.socket_name()
112 /// Sets the nodelay flag on this connection to the boolean specified
113 #[unstable(feature = "io")]
114 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
115 self.inner.set_nodelay(nodelay)
118 /// Sets the keepalive timeout to the timeout specified.
120 /// If the value specified is `None`, then the keepalive flag is cleared on
121 /// this connection. Otherwise, the keepalive timeout will be set to the
122 /// specified time, in seconds.
123 #[unstable(feature = "io")]
124 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
125 self.inner.set_keepalive(delay_in_seconds)
128 /// Closes the reading half of this connection.
130 /// This method will close the reading portion of this connection, causing
131 /// all pending and future reads to immediately return with an error.
136 /// # #![allow(unused_must_use)]
137 /// use std::old_io::timer;
138 /// use std::old_io::TcpStream;
139 /// use std::time::Duration;
140 /// use std::thread::Thread;
142 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
143 /// let stream2 = stream.clone();
145 /// let _t = Thread::spawn(move|| {
146 /// // close this stream after one second
147 /// timer::sleep(Duration::seconds(1));
148 /// let mut stream = stream2;
149 /// stream.close_read();
152 /// // wait for some data, will get canceled after one second
153 /// let mut buf = [0];
154 /// stream.read(&mut buf);
157 /// Note that this method affects all cloned handles associated with this
158 /// stream, not just this one handle.
159 pub fn close_read(&mut self) -> IoResult<()> {
160 self.inner.close_read()
163 /// Closes the writing half of this connection.
165 /// This method will close the writing portion of this connection, causing
166 /// all future writes to immediately return with an error.
168 /// Note that this method affects all cloned handles associated with this
169 /// stream, not just this one handle.
170 pub fn close_write(&mut self) -> IoResult<()> {
171 self.inner.close_write()
174 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
176 /// This function will set a timeout for all blocking operations (including
177 /// reads and writes) on this stream. The timeout specified is a relative
178 /// time, in milliseconds, into the future after which point operations will
179 /// time out. This means that the timeout must be reset periodically to keep
180 /// it from expiring. Specifying a value of `None` will clear the timeout
183 /// The timeout on this stream is local to this stream only. Setting a
184 /// timeout does not affect any other cloned instances of this stream, nor
185 /// is the timeout propagated to cloned handles of this stream. Setting
186 /// this timeout will override any specific read or write timeouts
187 /// previously set for this stream.
189 /// For clarification on the semantics of interrupting a read and a write,
190 /// take a look at `set_read_timeout` and `set_write_timeout`.
191 #[unstable(feature = "io",
192 reason = "the timeout argument may change in type and value")]
193 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
194 self.inner.set_timeout(timeout_ms)
197 /// Sets the timeout for read operations on this stream.
199 /// See documentation in `set_timeout` for the semantics of this read time.
200 /// This will overwrite any previous read timeout set through either this
201 /// function or `set_timeout`.
205 /// When this timeout expires, if there is no pending read operation, no
206 /// action is taken. Otherwise, the read operation will be scheduled to
207 /// promptly return. If a timeout error is returned, then no data was read
208 /// during the timeout period.
209 #[unstable(feature = "io",
210 reason = "the timeout argument may change in type and value")]
211 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
212 self.inner.set_read_timeout(timeout_ms)
215 /// Sets the timeout for write operations on this stream.
217 /// See documentation in `set_timeout` for the semantics of this write time.
218 /// This will overwrite any previous write timeout set through either this
219 /// function or `set_timeout`.
223 /// When this timeout expires, if there is no pending write operation, no
224 /// action is taken. Otherwise, the pending write operation will be
225 /// scheduled to promptly return. The actual state of the underlying stream
226 /// is not specified.
228 /// The write operation may return an error of type `ShortWrite` which
229 /// indicates that the object is known to have written an exact number of
230 /// bytes successfully during the timeout period, and the remaining bytes
231 /// were never written.
233 /// If the write operation returns `TimedOut`, then the timeout primitive
234 /// does not know how many bytes were written as part of the timeout
235 /// operation. It may be the case that bytes continue to be written in an
236 /// asynchronous fashion after the call to write returns.
237 #[unstable(feature = "io",
238 reason = "the timeout argument may change in type and value")]
239 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
240 self.inner.set_write_timeout(timeout_ms)
244 impl Clone for TcpStream {
245 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
246 /// and writes of this connection.
248 /// The underlying TCP stream will not be closed until all handles to the
249 /// stream have been deallocated. All handles will also follow the same
250 /// stream, but two concurrent reads will not receive the same data.
251 /// Instead, the first read will receive the first packet received, and the
252 /// second read will receive the second packet.
253 fn clone(&self) -> TcpStream {
254 TcpStream { inner: self.inner.clone() }
258 impl Reader for TcpStream {
259 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
264 impl Writer for TcpStream {
265 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
266 self.inner.write(buf)
270 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
271 fn as_inner(&self) -> &TcpStreamImp {
276 /// A structure representing a socket server. This listener is used to create a
277 /// `TcpAcceptor` which can be used to accept sockets on a local port.
283 /// use std::old_io::{TcpListener, TcpStream};
284 /// use std::old_io::{Acceptor, Listener};
285 /// use std::thread::Thread;
287 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
289 /// // start listening for incoming connections on the bound address
290 /// let mut acceptor = listener.listen().unwrap();
292 /// fn handle_client(mut stream: TcpStream) {
294 /// # &mut stream; // silence unused mutability/variable warning
296 /// // accept connections and process them, spawning a new task for each one
297 /// for stream in acceptor.incoming() {
299 /// Err(e) => { /* connection failed */ }
301 /// Thread::spawn(move|| {
302 /// // connection succeeded
303 /// handle_client(stream)
309 /// // close the socket server
313 pub struct TcpListener {
314 inner: TcpListenerImp,
318 /// Creates a new `TcpListener` which will be bound to the specified address.
319 /// This listener is not ready for accepting connections; `listen` must be called
320 /// on it before that's possible.
322 /// Binding with a port number of 0 will request that the OS assigns a port
323 /// to this listener. The port allocated can be queried via the
324 /// `socket_name` function.
326 /// The address type can be any implementer of the `ToSocketAddr` trait. See its
327 /// documentation for concrete examples.
328 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
329 super::with_addresses(addr, |addr| {
330 TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
334 /// Returns the local socket address of this listener.
335 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
336 self.inner.socket_name()
340 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
341 fn listen(self) -> IoResult<TcpAcceptor> {
342 self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
346 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
347 fn as_inner(&self) -> &TcpListenerImp {
352 /// The accepting half of a TCP socket server. This structure is created through
353 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
354 /// `TcpStream` instances.
355 pub struct TcpAcceptor {
356 inner: TcpAcceptorImp,
360 /// Prevents blocking on all future accepts after `ms` milliseconds have
363 /// This function is used to set a deadline after which this acceptor will
364 /// time out accepting any connections. The argument is the relative
365 /// distance, in milliseconds, to a point in the future after which all
366 /// accepts will fail.
368 /// If the argument specified is `None`, then any previously registered
369 /// timeout is cleared.
371 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
372 /// any pending connections. All pending connections will be accepted,
373 /// regardless of whether the timeout has expired or not (the accept will
374 /// not block in this case).
379 /// use std::old_io::TcpListener;
380 /// use std::old_io::{Listener, Acceptor, TimedOut};
382 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
384 /// // After 100ms have passed, all accepts will fail
385 /// a.set_timeout(Some(100));
387 /// match a.accept() {
388 /// Ok(..) => println!("accepted a socket"),
389 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
390 /// Err(e) => println!("err: {}", e),
393 /// // Reset the timeout and try again
394 /// a.set_timeout(Some(100));
395 /// let socket = a.accept();
397 /// // Clear the timeout and block indefinitely waiting for a connection
398 /// a.set_timeout(None);
399 /// let socket = a.accept();
401 #[unstable(feature = "io",
402 reason = "the type of the argument and name of this function are \
404 pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
406 /// Closes the accepting capabilities of this acceptor.
408 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
409 /// in that it will affect *all* cloned handles of this acceptor's original
412 /// Once this function succeeds, all future calls to `accept` will return
413 /// immediately with an error, preventing all future calls to accept. The
414 /// underlying socket will not be relinquished back to the OS until all
415 /// acceptors have been deallocated.
417 /// This is useful for waking up a thread in an accept loop to indicate that
423 /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
424 /// use std::thread::Thread;
426 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
427 /// let a2 = a.clone();
429 /// let _t = Thread::spawn(move|| {
431 /// for socket in a2.incoming() {
433 /// Ok(s) => { /* handle s */ }
434 /// Err(ref e) if e.kind == EndOfFile => break, // closed
435 /// Err(e) => panic!("unexpected error: {}", e),
440 /// # fn wait_for_sigint() {}
441 /// // Now that our accept loop is running, wait for the program to be
442 /// // requested to exit.
443 /// wait_for_sigint();
445 /// // Signal our accept loop to exit
446 /// assert!(a.close_accept().is_ok());
448 #[unstable(feature = "io")]
449 pub fn close_accept(&mut self) -> IoResult<()> {
450 self.inner.close_accept()
454 impl Acceptor<TcpStream> for TcpAcceptor {
455 fn accept(&mut self) -> IoResult<TcpStream> {
456 self.inner.accept().map(TcpStream::new)
460 impl Clone for TcpAcceptor {
461 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
464 /// The underlying TCP acceptor will not be closed until all handles to the
465 /// acceptor have been deallocated. Incoming connections will be received on
466 /// at most one acceptor; the same connection will not be accepted twice.
468 /// The `close_accept` method will shut down *all* acceptors cloned from the
469 /// same original acceptor, whereas the `set_timeout` method only affects
470 /// the acceptor handle that it is called on.
472 /// This function is useful for creating a handle to invoke `close_accept`
473 /// on to wake up any other task blocked in `accept`.
474 fn clone(&self) -> TcpAcceptor {
475 TcpAcceptor { inner: self.inner.clone() }
479 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
480 fn as_inner(&self) -> &TcpAcceptorImp {
489 use sync::mpsc::channel;
491 use old_io::net::tcp::*;
492 use old_io::net::ip::*;
494 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
495 use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
496 use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
497 use old_io::{Acceptor, Listener};
499 // FIXME #11530 this fails on android because tests are run as root
500 #[cfg_attr(any(windows, target_os = "android"), ignore)]
503 match TcpListener::bind("0.0.0.0:1") {
505 Err(e) => assert_eq!(e.kind, PermissionDenied),
511 match TcpStream::connect("0.0.0.0:1") {
513 Err(e) => assert_eq!(e.kind, ConnectionRefused),
518 fn listen_ip4_localhost() {
519 let socket_addr = next_test_ip4();
520 let listener = TcpListener::bind(socket_addr);
521 let mut acceptor = listener.listen();
523 let _t = Thread::spawn(move|| {
524 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
525 stream.write(&[144]).unwrap();
528 let mut stream = acceptor.accept();
530 stream.read(&mut buf).unwrap();
531 assert!(buf[0] == 144);
535 fn connect_localhost() {
536 let addr = next_test_ip4();
537 let mut acceptor = TcpListener::bind(addr).listen();
539 let _t = Thread::spawn(move|| {
540 let mut stream = TcpStream::connect(("localhost", addr.port));
541 stream.write(&[64]).unwrap();
544 let mut stream = acceptor.accept();
546 stream.read(&mut buf).unwrap();
547 assert!(buf[0] == 64);
551 fn connect_ip4_loopback() {
552 let addr = next_test_ip4();
553 let mut acceptor = TcpListener::bind(addr).listen();
555 let _t = Thread::spawn(move|| {
556 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
557 stream.write(&[44]).unwrap();
560 let mut stream = acceptor.accept();
562 stream.read(&mut buf).unwrap();
563 assert!(buf[0] == 44);
567 fn connect_ip6_loopback() {
568 let addr = next_test_ip6();
569 let mut acceptor = TcpListener::bind(addr).listen();
571 let _t = Thread::spawn(move|| {
572 let mut stream = TcpStream::connect(("::1", addr.port));
573 stream.write(&[66]).unwrap();
576 let mut stream = acceptor.accept();
578 stream.read(&mut buf).unwrap();
579 assert!(buf[0] == 66);
583 fn smoke_test_ip4() {
584 let addr = next_test_ip4();
585 let mut acceptor = TcpListener::bind(addr).listen();
587 let _t = Thread::spawn(move|| {
588 let mut stream = TcpStream::connect(addr);
589 stream.write(&[99]).unwrap();
592 let mut stream = acceptor.accept();
594 stream.read(&mut buf).unwrap();
595 assert!(buf[0] == 99);
599 fn smoke_test_ip6() {
600 let addr = next_test_ip6();
601 let mut acceptor = TcpListener::bind(addr).listen();
603 let _t = Thread::spawn(move|| {
604 let mut stream = TcpStream::connect(addr);
605 stream.write(&[99]).unwrap();
608 let mut stream = acceptor.accept();
610 stream.read(&mut buf).unwrap();
611 assert!(buf[0] == 99);
616 let addr = next_test_ip4();
617 let mut acceptor = TcpListener::bind(addr).listen();
619 let _t = Thread::spawn(move|| {
620 let _stream = TcpStream::connect(addr);
624 let mut stream = acceptor.accept();
626 let nread = stream.read(&mut buf);
627 assert!(nread.is_err());
632 let addr = next_test_ip6();
633 let mut acceptor = TcpListener::bind(addr).listen();
635 let _t = Thread::spawn(move|| {
636 let _stream = TcpStream::connect(addr);
640 let mut stream = acceptor.accept();
642 let nread = stream.read(&mut buf);
643 assert!(nread.is_err());
647 fn read_eof_twice_ip4() {
648 let addr = next_test_ip4();
649 let mut acceptor = TcpListener::bind(addr).listen();
651 let _t = Thread::spawn(move|| {
652 let _stream = TcpStream::connect(addr);
656 let mut stream = acceptor.accept();
658 let nread = stream.read(&mut buf);
659 assert!(nread.is_err());
661 match stream.read(&mut buf) {
664 assert!(e.kind == NotConnected || e.kind == EndOfFile,
665 "unknown kind: {:?}", e.kind);
671 fn read_eof_twice_ip6() {
672 let addr = next_test_ip6();
673 let mut acceptor = TcpListener::bind(addr).listen();
675 let _t = Thread::spawn(move|| {
676 let _stream = TcpStream::connect(addr);
680 let mut stream = acceptor.accept();
682 let nread = stream.read(&mut buf);
683 assert!(nread.is_err());
685 match stream.read(&mut buf) {
688 assert!(e.kind == NotConnected || e.kind == EndOfFile,
689 "unknown kind: {:?}", e.kind);
695 fn write_close_ip4() {
696 let addr = next_test_ip4();
697 let mut acceptor = TcpListener::bind(addr).listen();
699 let (tx, rx) = channel();
700 let _t = Thread::spawn(move|| {
701 drop(TcpStream::connect(addr));
702 tx.send(()).unwrap();
705 let mut stream = acceptor.accept();
708 match stream.write(&buf) {
711 assert!(e.kind == ConnectionReset ||
712 e.kind == BrokenPipe ||
713 e.kind == ConnectionAborted,
714 "unknown error: {}", e);
720 fn write_close_ip6() {
721 let addr = next_test_ip6();
722 let mut acceptor = TcpListener::bind(addr).listen();
724 let (tx, rx) = channel();
725 let _t = Thread::spawn(move|| {
726 drop(TcpStream::connect(addr));
727 tx.send(()).unwrap();
730 let mut stream = acceptor.accept();
733 match stream.write(&buf) {
736 assert!(e.kind == ConnectionReset ||
737 e.kind == BrokenPipe ||
738 e.kind == ConnectionAborted,
739 "unknown error: {}", e);
745 fn multiple_connect_serial_ip4() {
746 let addr = next_test_ip4();
748 let mut acceptor = TcpListener::bind(addr).listen();
750 let _t = Thread::spawn(move|| {
752 let mut stream = TcpStream::connect(addr);
753 stream.write(&[99]).unwrap();
757 for ref mut stream in acceptor.incoming().take(max) {
759 stream.read(&mut buf).unwrap();
760 assert_eq!(buf[0], 99);
765 fn multiple_connect_serial_ip6() {
766 let addr = next_test_ip6();
768 let mut acceptor = TcpListener::bind(addr).listen();
770 let _t = Thread::spawn(move|| {
772 let mut stream = TcpStream::connect(addr);
773 stream.write(&[99]).unwrap();
777 for ref mut stream in acceptor.incoming().take(max) {
779 stream.read(&mut buf).unwrap();
780 assert_eq!(buf[0], 99);
785 fn multiple_connect_interleaved_greedy_schedule_ip4() {
786 let addr = next_test_ip4();
787 static MAX: int = 10;
788 let acceptor = TcpListener::bind(addr).listen();
790 let _t = Thread::spawn(move|| {
791 let mut acceptor = acceptor;
792 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
793 // Start another task to handle the connection
794 let _t = Thread::spawn(move|| {
795 let mut stream = stream;
797 stream.read(&mut buf).unwrap();
798 assert!(buf[0] == i as u8);
806 fn connect(i: int, addr: SocketAddr) {
807 if i == MAX { return }
809 let _t = Thread::spawn(move|| {
810 debug!("connecting");
811 let mut stream = TcpStream::connect(addr);
812 // Connect again before writing
813 connect(i + 1, addr);
815 stream.write(&[i as u8]).unwrap();
821 fn multiple_connect_interleaved_greedy_schedule_ip6() {
822 let addr = next_test_ip6();
823 static MAX: int = 10;
824 let acceptor = TcpListener::bind(addr).listen();
826 let _t = Thread::spawn(move|| {
827 let mut acceptor = acceptor;
828 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
829 // Start another task to handle the connection
830 let _t = Thread::spawn(move|| {
831 let mut stream = stream;
833 stream.read(&mut buf).unwrap();
834 assert!(buf[0] == i as u8);
842 fn connect(i: int, addr: SocketAddr) {
843 if i == MAX { return }
845 let _t = Thread::spawn(move|| {
846 debug!("connecting");
847 let mut stream = TcpStream::connect(addr);
848 // Connect again before writing
849 connect(i + 1, addr);
851 stream.write(&[i as u8]).unwrap();
857 fn multiple_connect_interleaved_lazy_schedule_ip4() {
858 static MAX: int = 10;
859 let addr = next_test_ip4();
860 let acceptor = TcpListener::bind(addr).listen();
862 let _t = Thread::spawn(move|| {
863 let mut acceptor = acceptor;
864 for stream in acceptor.incoming().take(MAX as uint) {
865 // Start another task to handle the connection
866 let _t = Thread::spawn(move|| {
867 let mut stream = stream;
869 stream.read(&mut buf).unwrap();
870 assert!(buf[0] == 99);
878 fn connect(i: int, addr: SocketAddr) {
879 if i == MAX { return }
881 let _t = Thread::spawn(move|| {
882 debug!("connecting");
883 let mut stream = TcpStream::connect(addr);
884 // Connect again before writing
885 connect(i + 1, addr);
887 stream.write(&[99]).unwrap();
893 fn multiple_connect_interleaved_lazy_schedule_ip6() {
894 static MAX: int = 10;
895 let addr = next_test_ip6();
896 let acceptor = TcpListener::bind(addr).listen();
898 let _t = Thread::spawn(move|| {
899 let mut acceptor = acceptor;
900 for stream in acceptor.incoming().take(MAX as uint) {
901 // Start another task to handle the connection
902 let _t = Thread::spawn(move|| {
903 let mut stream = stream;
905 stream.read(&mut buf).unwrap();
906 assert!(buf[0] == 99);
914 fn connect(i: int, addr: SocketAddr) {
915 if i == MAX { return }
917 let _t = Thread::spawn(move|| {
918 debug!("connecting");
919 let mut stream = TcpStream::connect(addr);
920 // Connect again before writing
921 connect(i + 1, addr);
923 stream.write(&[99]).unwrap();
928 pub fn socket_name(addr: SocketAddr) {
929 let mut listener = TcpListener::bind(addr).unwrap();
931 // Make sure socket_name gives
932 // us the address we bound to.
933 let so_name = listener.socket_name();
934 assert!(so_name.is_ok());
935 assert_eq!(addr, so_name.unwrap());
938 pub fn peer_name(addr: SocketAddr) {
939 let acceptor = TcpListener::bind(addr).listen();
940 let _t = Thread::spawn(move|| {
941 let mut acceptor = acceptor;
942 acceptor.accept().unwrap();
945 let stream = TcpStream::connect(addr);
947 assert!(stream.is_ok());
948 let mut stream = stream.unwrap();
950 // Make sure peer_name gives us the
951 // address/port of the peer we've
953 let peer_name = stream.peer_name();
954 assert!(peer_name.is_ok());
955 assert_eq!(addr, peer_name.unwrap());
959 fn socket_and_peer_name_ip4() {
960 peer_name(next_test_ip4());
961 socket_name(next_test_ip4());
965 fn socket_and_peer_name_ip6() {
966 // FIXME: peer name is not consistent
967 //peer_name(next_test_ip6());
968 socket_name(next_test_ip6());
973 let addr = next_test_ip4();
974 let (tx, rx) = channel();
975 let _t = Thread::spawn(move|| {
976 let mut srv = TcpListener::bind(addr).listen().unwrap();
977 tx.send(()).unwrap();
978 let mut cl = srv.accept().unwrap();
979 cl.write(&[10]).unwrap();
981 cl.read(&mut b).unwrap();
982 tx.send(()).unwrap();
986 let mut c = TcpStream::connect(addr).unwrap();
988 assert_eq!(c.read(&mut b), Ok(1));
989 c.write(&[1]).unwrap();
995 let addr = next_test_ip4();
996 let listener = TcpListener::bind(addr).unwrap().listen();
997 assert!(listener.is_ok());
998 match TcpListener::bind(addr).listen() {
1001 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1002 "unknown error: {} {:?}", e, e.kind);
1009 let addr = next_test_ip4();
1010 let (tx, rx) = channel();
1012 let _t = Thread::spawn(move|| {
1014 let _stream = TcpStream::connect(addr).unwrap();
1020 let mut acceptor = TcpListener::bind(addr).listen();
1021 tx.send(()).unwrap();
1023 let _stream = acceptor.accept().unwrap();
1025 tx.send(()).unwrap();
1029 let _listener = TcpListener::bind(addr);
1033 fn tcp_clone_smoke() {
1034 let addr = next_test_ip4();
1035 let mut acceptor = TcpListener::bind(addr).listen();
1037 let _t = Thread::spawn(move|| {
1038 let mut s = TcpStream::connect(addr);
1039 let mut buf = [0, 0];
1040 assert_eq!(s.read(&mut buf), Ok(1));
1041 assert_eq!(buf[0], 1);
1042 s.write(&[2]).unwrap();
1045 let mut s1 = acceptor.accept().unwrap();
1046 let s2 = s1.clone();
1048 let (tx1, rx1) = channel();
1049 let (tx2, rx2) = channel();
1050 let _t = Thread::spawn(move|| {
1052 rx1.recv().unwrap();
1053 s2.write(&[1]).unwrap();
1054 tx2.send(()).unwrap();
1056 tx1.send(()).unwrap();
1057 let mut buf = [0, 0];
1058 assert_eq!(s1.read(&mut buf), Ok(1));
1059 rx2.recv().unwrap();
1063 fn tcp_clone_two_read() {
1064 let addr = next_test_ip6();
1065 let mut acceptor = TcpListener::bind(addr).listen();
1066 let (tx1, rx) = channel();
1067 let tx2 = tx1.clone();
1069 let _t = Thread::spawn(move|| {
1070 let mut s = TcpStream::connect(addr);
1071 s.write(&[1]).unwrap();
1073 s.write(&[2]).unwrap();
1077 let mut s1 = acceptor.accept().unwrap();
1078 let s2 = s1.clone();
1080 let (done, rx) = channel();
1081 let _t = Thread::spawn(move|| {
1083 let mut buf = [0, 0];
1084 s2.read(&mut buf).unwrap();
1085 tx2.send(()).unwrap();
1086 done.send(()).unwrap();
1088 let mut buf = [0, 0];
1089 s1.read(&mut buf).unwrap();
1090 tx1.send(()).unwrap();
1096 fn tcp_clone_two_write() {
1097 let addr = next_test_ip4();
1098 let mut acceptor = TcpListener::bind(addr).listen();
1100 let _t = Thread::spawn(move|| {
1101 let mut s = TcpStream::connect(addr);
1102 let mut buf = [0, 1];
1103 s.read(&mut buf).unwrap();
1104 s.read(&mut buf).unwrap();
1107 let mut s1 = acceptor.accept().unwrap();
1108 let s2 = s1.clone();
1110 let (done, rx) = channel();
1111 let _t = Thread::spawn(move|| {
1113 s2.write(&[1]).unwrap();
1114 done.send(()).unwrap();
1116 s1.write(&[2]).unwrap();
1122 fn shutdown_smoke() {
1123 let addr = next_test_ip4();
1124 let a = TcpListener::bind(addr).unwrap().listen();
1125 let _t = Thread::spawn(move|| {
1127 let mut c = a.accept().unwrap();
1128 assert_eq!(c.read_to_end(), Ok(vec!()));
1129 c.write(&[1]).unwrap();
1132 let mut s = TcpStream::connect(addr).unwrap();
1133 assert!(s.inner.close_write().is_ok());
1134 assert!(s.write(&[1]).is_err());
1135 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1139 fn accept_timeout() {
1140 let addr = next_test_ip4();
1141 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1143 a.set_timeout(Some(10));
1145 // Make sure we time out once and future invocations also time out
1146 let err = a.accept().err().unwrap();
1147 assert_eq!(err.kind, TimedOut);
1148 let err = a.accept().err().unwrap();
1149 assert_eq!(err.kind, TimedOut);
1151 // Also make sure that even though the timeout is expired that we will
1152 // continue to receive any pending connections.
1154 // FIXME: freebsd apparently never sees the pending connection, but
1155 // testing manually always works. Need to investigate this
1157 if !cfg!(target_os = "freebsd") {
1158 let (tx, rx) = channel();
1159 let _t = Thread::spawn(move|| {
1160 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1162 let _l = rx.recv().unwrap();
1166 Err(ref e) if e.kind == TimedOut => {}
1167 Err(e) => panic!("error: {}", e),
1169 ::thread::Thread::yield_now();
1170 if i == 1000 { panic!("should have a pending connection") }
1174 // Unset the timeout and make sure that this always blocks.
1175 a.set_timeout(None);
1176 let _t = Thread::spawn(move|| {
1177 drop(TcpStream::connect(addr).unwrap());
1179 a.accept().unwrap();
1183 fn close_readwrite_smoke() {
1184 let addr = next_test_ip4();
1185 let a = TcpListener::bind(addr).listen().unwrap();
1186 let (_tx, rx) = channel::<()>();
1187 Thread::spawn(move|| {
1189 let _s = a.accept().unwrap();
1190 let _ = rx.recv().unwrap();
1194 let mut s = TcpStream::connect(addr).unwrap();
1195 let mut s2 = s.clone();
1197 // closing should prevent reads/writes
1198 s.close_write().unwrap();
1199 assert!(s.write(&[0]).is_err());
1200 s.close_read().unwrap();
1201 assert!(s.read(&mut b).is_err());
1203 // closing should affect previous handles
1204 assert!(s2.write(&[0]).is_err());
1205 assert!(s2.read(&mut b).is_err());
1207 // closing should affect new handles
1208 let mut s3 = s.clone();
1209 assert!(s3.write(&[0]).is_err());
1210 assert!(s3.read(&mut b).is_err());
1212 // make sure these don't die
1213 let _ = s2.close_read();
1214 let _ = s2.close_write();
1215 let _ = s3.close_read();
1216 let _ = s3.close_write();
1220 fn close_read_wakes_up() {
1221 let addr = next_test_ip4();
1222 let a = TcpListener::bind(addr).listen().unwrap();
1223 let (_tx, rx) = channel::<()>();
1224 Thread::spawn(move|| {
1226 let _s = a.accept().unwrap();
1227 let _ = rx.recv().unwrap();
1230 let mut s = TcpStream::connect(addr).unwrap();
1232 let (tx, rx) = channel();
1233 let _t = Thread::spawn(move|| {
1235 assert!(s2.read(&mut [0]).is_err());
1236 tx.send(()).unwrap();
1238 // this should wake up the child task
1239 s.close_read().unwrap();
1241 // this test will never finish if the child doesn't wake up
// Exercises `set_timeout` on an accepted stream: reads must fail with
// `TimedOut`, repeated large writes must eventually fail with `TimedOut`, and
// after the timeout is cleared a read returns one byte.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, the loop that defines `i` has no visible
// header, and `rx` is never read in the visible code) — confirm against the
// full file before changing any code.
1246 fn readwrite_timeouts() {
1247 let addr = next_test_ip6();
1248 let mut a = TcpListener::bind(addr).listen().unwrap();
1249 let (tx, rx) = channel::<()>();
// Client thread: connects and writes a single byte. Presumably it waits on
// `rx` before writing — TODO confirm, that line is not visible here.
1250 Thread::spawn(move|| {
1251 let mut s = TcpStream::connect(addr).unwrap();
1253 assert!(s.write(&[0]).is_ok());
1257 let mut s = a.accept().unwrap();
// Timeout value 20 — presumably milliseconds; confirm against the
// `set_timeout` documentation.
1258 s.set_timeout(Some(20));
// Two consecutive reads must both report `TimedOut`.
1259 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1260 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1262 s.set_timeout(Some(20));
// Write 128 KiB per attempt: success and short writes are tolerated, a
// `TimedOut` error leaves the loop, and any other error fails the test.
1264 match s.write(&[0; 128 * 1024]) {
1265 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1266 Err(IoError { kind: TimedOut, .. }) => break,
1267 Err(e) => panic!("{}", e),
// Guard against looping forever: fail once `i` reaches 1000 without a timeout.
1269 if i == 1000 { panic!("should have filled up?!"); }
// A further one-byte write must time out as well.
1271 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
// Send on `tx`, clear the timeout, and expect exactly one byte to be read
// even though the buffer has room for two.
1273 tx.send(()).unwrap();
1274 s.set_timeout(None);
1275 assert_eq!(s.read(&mut [0, 0]), Ok(1));
// Exercises `set_read_timeout` on an accepted stream: reads must fail with
// `TimedOut`, while a large write on the same stream is still expected to
// succeed.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, `amt` has no visible declaration, and `rx`
// is never read in the visible code) — confirm against the full file before
// changing any code.
1279 fn read_timeouts() {
1280 let addr = next_test_ip6();
1281 let mut a = TcpListener::bind(addr).listen().unwrap();
1282 let (tx, rx) = channel::<()>();
// Client thread: connects, then keeps reading into a 128 KiB buffer until
// `amt` reaches 100 * 128 KiB; any read error fails the thread.
1283 Thread::spawn(move|| {
1284 let mut s = TcpStream::connect(addr).unwrap();
1287 while amt < 100 * 128 * 1024 {
1288 match s.read(&mut [0;128 * 1024]) {
1289 Ok(n) => { amt += n; }
1290 Err(e) => panic!("{}", e),
1296 let mut s = a.accept().unwrap();
// Read timeout value 20 — presumably milliseconds; confirm against the
// `set_read_timeout` documentation.
1297 s.set_read_timeout(Some(20));
// Two consecutive reads must both report `TimedOut`.
1298 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1299 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
// Send on `tx` (presumably releasing the client thread — TODO confirm), then
// check that a 128 KiB write succeeds despite the read timeout being set.
1301 tx.send(()).unwrap();
1303 assert!(s.write(&[0;128 * 1024]).is_ok());
// Exercises `set_write_timeout` on an accepted stream: repeated large writes
// must eventually fail with `TimedOut`, while a read on the same stream is
// still expected to succeed.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, the loop that defines `i` has no visible
// header, and `rx` is never read in the visible code) — confirm against the
// full file before changing any code.
1308 fn write_timeouts() {
1309 let addr = next_test_ip6();
1310 let mut a = TcpListener::bind(addr).listen().unwrap();
1311 let (tx, rx) = channel::<()>();
// Client thread: connects and writes a single byte. Presumably it waits on
// `rx` before writing — TODO confirm, that line is not visible here.
1312 Thread::spawn(move|| {
1313 let mut s = TcpStream::connect(addr).unwrap();
1315 assert!(s.write(&[0]).is_ok());
1319 let mut s = a.accept().unwrap();
// Write timeout value 20 — presumably milliseconds; confirm against the
// `set_write_timeout` documentation.
1320 s.set_write_timeout(Some(20));
// Write 128 KiB per attempt: success and short writes are tolerated, a
// `TimedOut` error leaves the loop, and any other error fails the test.
1322 match s.write(&[0; 128 * 1024]) {
1323 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1324 Err(IoError { kind: TimedOut, .. }) => break,
1325 Err(e) => panic!("{}", e),
// Guard against looping forever: fail once `i` reaches 1000 without a timeout.
1327 if i == 1000 { panic!("should have filled up?!"); }
// A further one-byte write must time out as well.
1329 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
// Send on `tx`, then check that a read succeeds despite the write timeout
// being set.
1331 tx.send(()).unwrap();
1332 assert!(s.read(&mut [0]).is_ok());
// Checks that a read timeout set through `s` makes a read on `s` fail with
// `TimedOut`, while a read issued from another thread on `s2` still completes
// with one byte.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, `s2` has no visible definition, and `rx` is
// never read in the visible code) — confirm against the full file before
// changing any code.
1336 fn timeout_concurrent_read() {
1337 let addr = next_test_ip6();
1338 let mut a = TcpListener::bind(addr).listen().unwrap();
1339 let (tx, rx) = channel::<()>();
// Client thread: connects and writes a single byte. Presumably it waits on
// `rx` before writing — TODO confirm, that line is not visible here.
1340 Thread::spawn(move|| {
1341 let mut s = TcpStream::connect(addr).unwrap();
1343 assert_eq!(s.write(&[0]), Ok(()));
1347 let mut s = a.accept().unwrap();
// `tx2`/`rx2` let the reader thread report that its read has finished.
1349 let (tx2, rx2) = channel();
// Reader thread: expects exactly one byte on `s2` (presumably a clone of `s`
// — TODO confirm), then signals on `tx2`.
1350 let _t = Thread::spawn(move|| {
1352 assert_eq!(s2.read(&mut [0]), Ok(1));
1353 tx2.send(()).unwrap();
// Read timeout value 20 — presumably milliseconds; confirm against the
// `set_read_timeout` documentation.
1356 s.set_read_timeout(Some(20));
1357 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
// Send on `tx` only after the timed-out read has been observed.
1358 tx.send(()).unwrap();
// Wait until the reader thread reports that its read completed.
1360 rx2.recv().unwrap();
// Checks that an accepted stream can be cloned again while another clone of
// it is being read on a different thread, and that both helper threads still
// run to completion.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, `rx` is never read in the visible code, and
// the comment before the final clone ends mid-sentence) — confirm against the
// full file before changing any code.
1364 fn clone_while_reading() {
1365 let addr = next_test_ip6();
1366 let listen = TcpListener::bind(addr);
1367 let mut accept = listen.listen().unwrap();
1369 // Enqueue a task to write to a socket
// `tx`/`rx` carry the "go" signal sent near the end of the test;
// `txdone`/`rxdone` collect one completion message from each helper thread.
1370 let (tx, rx) = channel();
1371 let (txdone, rxdone) = channel();
1372 let txdone2 = txdone.clone();
// Writer thread: connects, writes one byte, and reports completion.
// Presumably it waits on `rx` before writing — TODO confirm, that line is not
// visible here.
1373 let _t = Thread::spawn(move|| {
1374 let mut tcp = TcpStream::connect(addr).unwrap();
1376 tcp.write_u8(0).unwrap();
1377 txdone2.send(()).unwrap();
1380 // Spawn off a reading clone
1381 let tcp = accept.accept().unwrap();
1382 let tcp2 = tcp.clone();
1383 let txdone3 = txdone.clone();
// Reader thread: reads one byte from the clone and reports completion.
1384 let _t = Thread::spawn(move|| {
1385 let mut tcp2 = tcp2;
1386 tcp2.read_u8().unwrap();
1387 txdone3.send(()).unwrap();
1390 // Try to ensure that the reading clone is indeed reading
// Best effort only: yielding gives the reader thread a chance to run but does
// not prove that it has started its read.
1392 ::thread::Thread::yield_now();
1395 // clone the handle again while it's reading, then let it finish the
// The extra clone is dropped immediately; only the act of cloning is tested.
1397 let _ = tcp.clone();
// Send the "go" signal, then wait for both helper threads to report in.
1398 tx.send(()).unwrap();
1399 rxdone.recv().unwrap();
1400 rxdone.recv().unwrap();
// Smoke test for cloning an acceptor: with two incoming connections, both the
// original acceptor `a` and its clone `a2` must each accept one successfully.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible) — confirm against the full file before
// changing any code.
1404 fn clone_accept_smoke() {
1405 let addr = next_test_ip4();
1406 let l = TcpListener::bind(addr);
1407 let mut a = l.listen().unwrap();
1408 let mut a2 = a.clone();
// Two client threads each attempt one connection; the connect result is
// discarded, so a failure only surfaces through the `accept` assertions below.
1410 let _t = Thread::spawn(move|| {
1411 let _ = TcpStream::connect(addr);
1413 let _t = Thread::spawn(move|| {
1414 let _ = TcpStream::connect(addr);
// One accept through each handle.
1417 assert!(a.accept().is_ok());
1418 assert!(a2.accept().is_ok());
// Checks that `accept` can be called from two threads at the same time: two
// acceptor threads each send their `accept` result back over a channel, two
// client threads connect, and both results must be `Ok`.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, and both acceptor threads use the name `a`
// with no visible clone or rebinding) — confirm against the full file before
// changing any code.
1422 fn clone_accept_concurrent() {
1423 let addr = next_test_ip4();
1424 let l = TcpListener::bind(addr);
1425 let a = l.listen().unwrap();
// One sender per acceptor thread; both feed the same receiver `rx`.
1428 let (tx, rx) = channel();
1429 let tx2 = tx.clone();
// Acceptor threads: each performs one `accept` and forwards the result.
// Presumably one of them operates on a clone of `a` — TODO confirm.
1431 let _t = Thread::spawn(move|| {
1433 tx.send(a.accept()).unwrap();
1435 let _t = Thread::spawn(move|| {
1437 tx2.send(a.accept()).unwrap();
// Client threads: each attempts one connection; the connect result is
// discarded.
1440 let _t = Thread::spawn(move|| {
1441 let _ = TcpStream::connect(addr);
1443 let _t = Thread::spawn(move|| {
1444 let _ = TcpStream::connect(addr);
// Both forwarded accept results must be successful.
1447 assert!(rx.recv().unwrap().is_ok());
1448 assert!(rx.recv().unwrap().is_ok());
// Smoke test for `close_accept`: after the acceptor has been closed, a
// following `accept` must fail with an `EndOfFile` error.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing brace is visible) — confirm against the full file before
// changing any code.
1452 fn close_accept_smoke() {
1453 let addr = next_test_ip4();
1454 let l = TcpListener::bind(addr);
1455 let mut a = l.listen().unwrap();
// Close first, then verify the error kind reported by `accept`.
1457 a.close_accept().unwrap();
1458 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
// Checks `close_accept` against an `accept` running on another thread:
// closing through the clone `a2` must make the `accept` issued on `a` return
// an `EndOfFile` error, which the thread forwards over a channel.
// NOTE(review): some original lines appear to be missing from this listing
// (no closing braces are visible, and `a` is bound immutably with no visible
// rebinding before `accept` is called) — confirm against the full file before
// changing any code.
1462 fn close_accept_concurrent() {
1463 let addr = next_test_ip4();
1464 let l = TcpListener::bind(addr);
1465 let a = l.listen().unwrap();
// `a2` stays on this thread and is the handle used to close the acceptor.
1466 let mut a2 = a.clone();
1468 let (tx, rx) = channel();
// Acceptor thread: performs one `accept` and forwards the result.
1469 let _t = Thread::spawn(move|| {
1471 tx.send(a.accept()).unwrap();
// Close through the clone. The visible code does not order this against the
// start of `accept` in the other thread.
1473 a2.close_accept().unwrap();
// The forwarded result must be an error of kind `EndOfFile`.
1475 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);