1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
//! This module provides the ability to open a TCP stream to a socket address,
//! as well as to create a socket server that accepts incoming connections. The
//! destination and binding addresses can be either IPv4 or IPv6 addresses.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
27 use option::Option::{None, Some};
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
36 /// A structure which represents a TCP stream between a local socket and a
39 /// The socket will be closed when the value is dropped.
44 /// use std::old_io::TcpStream;
47 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
49 /// // ignore the Result
50 /// let _ = stream.write(&[1]);
52 /// let mut buf = [0];
53 /// let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
56 pub struct TcpStream {
61 fn new(s: TcpStreamImp) -> TcpStream {
62 TcpStream { inner: s }
65 /// Open a TCP connection to a remote host.
67 /// `addr` is an address of the remote host. Anything which implements `ToSocketAddr`
68 /// trait can be supplied for the address; see this trait documentation for
69 /// concrete examples.
70 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71 super::with_addresses(addr, |addr| {
72 TcpStreamImp::connect(addr, None).map(TcpStream::new)
76 /// Creates a TCP connection to a remote socket address, timing out after
77 /// the specified duration.
79 /// This is the same as the `connect` method, except that if the timeout
80 /// specified elapses before a connection is made an error will be
81 /// returned. The error's kind will be `TimedOut`.
83 /// Same as the `connect` method, `addr` argument type can be anything which
84 /// implements `ToSocketAddr` trait.
86 /// If a `timeout` with zero or negative duration is specified then
87 /// the function returns `Err`, with the error kind set to `TimedOut`.
88 #[unstable(feature = "io",
89 reason = "the timeout argument may eventually change types")]
90 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
91 timeout: Duration) -> IoResult<TcpStream> {
92 if timeout <= Duration::milliseconds(0) {
93 return Err(standard_error(TimedOut));
96 super::with_addresses(addr, |addr| {
97 TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
102 /// Returns the socket address of the remote peer of this TCP connection.
103 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
104 self.inner.peer_name()
107 /// Returns the socket address of the local half of this TCP connection.
108 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
109 self.inner.socket_name()
112 /// Sets the nodelay flag on this connection to the boolean specified
113 #[unstable(feature = "io")]
114 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
115 self.inner.set_nodelay(nodelay)
118 /// Sets the keepalive timeout to the timeout specified.
120 /// If the value specified is `None`, then the keepalive flag is cleared on
121 /// this connection. Otherwise, the keepalive timeout will be set to the
122 /// specified time, in seconds.
123 #[unstable(feature = "io")]
124 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
125 self.inner.set_keepalive(delay_in_seconds)
128 /// Closes the reading half of this connection.
130 /// This method will close the reading portion of this connection, causing
131 /// all pending and future reads to immediately return with an error.
136 /// # #![allow(unused_must_use)]
137 /// use std::old_io::timer;
138 /// use std::old_io::TcpStream;
139 /// use std::time::Duration;
140 /// use std::thread::Thread;
142 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
143 /// let stream2 = stream.clone();
145 /// let _t = Thread::spawn(move|| {
146 /// // close this stream after one second
147 /// timer::sleep(Duration::seconds(1));
148 /// let mut stream = stream2;
149 /// stream.close_read();
152 /// // wait for some data, will get canceled after one second
153 /// let mut buf = [0];
154 /// stream.read(&mut buf);
157 /// Note that this method affects all cloned handles associated with this
158 /// stream, not just this one handle.
159 pub fn close_read(&mut self) -> IoResult<()> {
160 self.inner.close_read()
163 /// Closes the writing half of this connection.
165 /// This method will close the writing portion of this connection, causing
166 /// all future writes to immediately return with an error.
168 /// Note that this method affects all cloned handles associated with this
169 /// stream, not just this one handle.
170 pub fn close_write(&mut self) -> IoResult<()> {
171 self.inner.close_write()
174 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
176 /// This function will set a timeout for all blocking operations (including
177 /// reads and writes) on this stream. The timeout specified is a relative
178 /// time, in milliseconds, into the future after which point operations will
179 /// time out. This means that the timeout must be reset periodically to keep
180 /// it from expiring. Specifying a value of `None` will clear the timeout
183 /// The timeout on this stream is local to this stream only. Setting a
184 /// timeout does not affect any other cloned instances of this stream, nor
185 /// does the timeout propagated to cloned handles of this stream. Setting
186 /// this timeout will override any specific read or write timeouts
187 /// previously set for this stream.
189 /// For clarification on the semantics of interrupting a read and a write,
190 /// take a look at `set_read_timeout` and `set_write_timeout`.
191 #[unstable(feature = "io",
192 reason = "the timeout argument may change in type and value")]
193 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
194 self.inner.set_timeout(timeout_ms)
197 /// Sets the timeout for read operations on this stream.
199 /// See documentation in `set_timeout` for the semantics of this read time.
200 /// This will overwrite any previous read timeout set through either this
201 /// function or `set_timeout`.
205 /// When this timeout expires, if there is no pending read operation, no
206 /// action is taken. Otherwise, the read operation will be scheduled to
207 /// promptly return. If a timeout error is returned, then no data was read
208 /// during the timeout period.
209 #[unstable(feature = "io",
210 reason = "the timeout argument may change in type and value")]
211 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
212 self.inner.set_read_timeout(timeout_ms)
215 /// Sets the timeout for write operations on this stream.
217 /// See documentation in `set_timeout` for the semantics of this write time.
218 /// This will overwrite any previous write timeout set through either this
219 /// function or `set_timeout`.
223 /// When this timeout expires, if there is no pending write operation, no
224 /// action is taken. Otherwise, the pending write operation will be
225 /// scheduled to promptly return. The actual state of the underlying stream
226 /// is not specified.
228 /// The write operation may return an error of type `ShortWrite` which
229 /// indicates that the object is known to have written an exact number of
230 /// bytes successfully during the timeout period, and the remaining bytes
231 /// were never written.
233 /// If the write operation returns `TimedOut`, then it the timeout primitive
234 /// does not know how many bytes were written as part of the timeout
235 /// operation. It may be the case that bytes continue to be written in an
236 /// asynchronous fashion after the call to write returns.
237 #[unstable(feature = "io",
238 reason = "the timeout argument may change in type and value")]
239 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
240 self.inner.set_write_timeout(timeout_ms)
244 impl Clone for TcpStream {
245 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
246 /// and writes of this connection.
248 /// The underlying TCP stream will not be closed until all handles to the
249 /// stream have been deallocated. All handles will also follow the same
250 /// stream, but two concurrent reads will not receive the same data.
251 /// Instead, the first read will receive the first packet received, and the
252 /// second read will receive the second packet.
253 fn clone(&self) -> TcpStream {
254 TcpStream { inner: self.inner.clone() }
258 impl Reader for TcpStream {
259 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
264 impl Writer for TcpStream {
265 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
266 self.inner.write(buf)
270 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
271 fn as_inner(&self) -> &TcpStreamImp {
276 /// A structure representing a socket server. This listener is used to create a
277 /// `TcpAcceptor` which can be used to accept sockets on a local port.
283 /// use std::old_io::{TcpListener, TcpStream};
284 /// use std::old_io::{Acceptor, Listener};
285 /// use std::thread::Thread;
287 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
289 /// // bind the listener to the specified address
290 /// let mut acceptor = listener.listen().unwrap();
292 /// fn handle_client(mut stream: TcpStream) {
294 /// # &mut stream; // silence unused mutability/variable warning
296 /// // accept connections and process them, spawning a new tasks for each one
297 /// for stream in acceptor.incoming() {
299 /// Err(e) => { /* connection failed */ }
301 /// Thread::spawn(move|| {
302 /// // connection succeeded
303 /// handle_client(stream)
309 /// // close the socket server
313 pub struct TcpListener {
314 inner: TcpListenerImp,
318 /// Creates a new `TcpListener` which will be bound to the specified address.
319 /// This listener is not ready for accepting connections, `listen` must be called
320 /// on it before that's possible.
322 /// Binding with a port number of 0 will request that the OS assigns a port
323 /// to this listener. The port allocated can be queried via the
324 /// `socket_name` function.
326 /// The address type can be any implementer of `ToSocketAddr` trait. See its
327 /// documentation for concrete examples.
328 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
329 super::with_addresses(addr, |addr| {
330 TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
334 /// Returns the local socket address of this listener.
335 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
336 self.inner.socket_name()
340 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
341 fn listen(self) -> IoResult<TcpAcceptor> {
342 self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
346 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
347 fn as_inner(&self) -> &TcpListenerImp {
352 /// The accepting half of a TCP socket server. This structure is created through
353 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
354 /// `TcpStream` instances.
355 pub struct TcpAcceptor {
356 inner: TcpAcceptorImp,
360 /// Prevents blocking on all future accepts after `ms` milliseconds have
363 /// This function is used to set a deadline after which this acceptor will
364 /// time out accepting any connections. The argument is the relative
365 /// distance, in milliseconds, to a point in the future after which all
366 /// accepts will fail.
368 /// If the argument specified is `None`, then any previously registered
369 /// timeout is cleared.
371 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
372 /// any pending connections. All pending connections will be accepted,
373 /// regardless of whether the timeout has expired or not (the accept will
374 /// not block in this case).
379 /// use std::old_io::TcpListener;
380 /// use std::old_io::{Listener, Acceptor, TimedOut};
382 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
384 /// // After 100ms have passed, all accepts will fail
385 /// a.set_timeout(Some(100));
387 /// match a.accept() {
388 /// Ok(..) => println!("accepted a socket"),
389 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
390 /// Err(e) => println!("err: {}", e),
393 /// // Reset the timeout and try again
394 /// a.set_timeout(Some(100));
395 /// let socket = a.accept();
397 /// // Clear the timeout and block indefinitely waiting for a connection
398 /// a.set_timeout(None);
399 /// let socket = a.accept();
401 #[unstable(feature = "io",
402 reason = "the type of the argument and name of this function are \
404 pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
406 /// Closes the accepting capabilities of this acceptor.
408 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
409 /// in that it will affect *all* cloned handles of this acceptor's original
412 /// Once this function succeeds, all future calls to `accept` will return
413 /// immediately with an error, preventing all future calls to accept. The
414 /// underlying socket will not be relinquished back to the OS until all
415 /// acceptors have been deallocated.
417 /// This is useful for waking up a thread in an accept loop to indicate that
423 /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
424 /// use std::thread::Thread;
426 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
427 /// let a2 = a.clone();
429 /// let _t = Thread::spawn(move|| {
431 /// for socket in a2.incoming() {
433 /// Ok(s) => { /* handle s */ }
434 /// Err(ref e) if e.kind == EndOfFile => break, // closed
435 /// Err(e) => panic!("unexpected error: {}", e),
440 /// # fn wait_for_sigint() {}
441 /// // Now that our accept loop is running, wait for the program to be
442 /// // requested to exit.
443 /// wait_for_sigint();
445 /// // Signal our accept loop to exit
446 /// assert!(a.close_accept().is_ok());
448 #[unstable(feature = "io")]
449 pub fn close_accept(&mut self) -> IoResult<()> {
450 self.inner.close_accept()
454 impl Acceptor<TcpStream> for TcpAcceptor {
455 fn accept(&mut self) -> IoResult<TcpStream> {
456 self.inner.accept().map(TcpStream::new)
460 impl Clone for TcpAcceptor {
461 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
464 /// The underlying TCP acceptor will not be closed until all handles to the
465 /// acceptor have been deallocated. Incoming connections will be received on
466 /// at most once acceptor, the same connection will not be accepted twice.
468 /// The `close_accept` method will shut down *all* acceptors cloned from the
469 /// same original acceptor, whereas the `set_timeout` method only affects
470 /// the selector that it is called on.
472 /// This function is useful for creating a handle to invoke `close_accept`
473 /// on to wake up any other task blocked in `accept`.
474 fn clone(&self) -> TcpAcceptor {
475 TcpAcceptor { inner: self.inner.clone() }
479 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
480 fn as_inner(&self) -> &TcpAcceptorImp {
489 use sync::mpsc::channel;
491 use old_io::net::tcp::*;
492 use old_io::net::ip::*;
494 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
495 use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
496 use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
497 use old_io::{InvalidInput};
498 use old_io::{Acceptor, Listener};
500 // FIXME #11530 this fails on android because tests are run as root
501 #[cfg_attr(any(windows, target_os = "android"), ignore)]
504 match TcpListener::bind("0.0.0.0:1") {
506 Err(e) => assert_eq!(e.kind, PermissionDenied),
512 match TcpStream::connect("0.0.0.0:1") {
514 Err(e) => assert!((e.kind == ConnectionRefused)
515 || (e.kind == InvalidInput)),
520 fn listen_ip4_localhost() {
521 let socket_addr = next_test_ip4();
522 let listener = TcpListener::bind(socket_addr);
523 let mut acceptor = listener.listen();
525 let _t = Thread::spawn(move|| {
526 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
527 stream.write(&[144]).unwrap();
530 let mut stream = acceptor.accept();
532 stream.read(&mut buf).unwrap();
533 assert!(buf[0] == 144);
537 fn connect_localhost() {
538 let addr = next_test_ip4();
539 let mut acceptor = TcpListener::bind(addr).listen();
541 let _t = Thread::spawn(move|| {
542 let mut stream = TcpStream::connect(("localhost", addr.port));
543 stream.write(&[64]).unwrap();
546 let mut stream = acceptor.accept();
548 stream.read(&mut buf).unwrap();
549 assert!(buf[0] == 64);
553 fn connect_ip4_loopback() {
554 let addr = next_test_ip4();
555 let mut acceptor = TcpListener::bind(addr).listen();
557 let _t = Thread::spawn(move|| {
558 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
559 stream.write(&[44]).unwrap();
562 let mut stream = acceptor.accept();
564 stream.read(&mut buf).unwrap();
565 assert!(buf[0] == 44);
569 fn connect_ip6_loopback() {
570 let addr = next_test_ip6();
571 let mut acceptor = TcpListener::bind(addr).listen();
573 let _t = Thread::spawn(move|| {
574 let mut stream = TcpStream::connect(("::1", addr.port));
575 stream.write(&[66]).unwrap();
578 let mut stream = acceptor.accept();
580 stream.read(&mut buf).unwrap();
581 assert!(buf[0] == 66);
585 fn smoke_test_ip4() {
586 let addr = next_test_ip4();
587 let mut acceptor = TcpListener::bind(addr).listen();
589 let _t = Thread::spawn(move|| {
590 let mut stream = TcpStream::connect(addr);
591 stream.write(&[99]).unwrap();
594 let mut stream = acceptor.accept();
596 stream.read(&mut buf).unwrap();
597 assert!(buf[0] == 99);
601 fn smoke_test_ip6() {
602 let addr = next_test_ip6();
603 let mut acceptor = TcpListener::bind(addr).listen();
605 let _t = Thread::spawn(move|| {
606 let mut stream = TcpStream::connect(addr);
607 stream.write(&[99]).unwrap();
610 let mut stream = acceptor.accept();
612 stream.read(&mut buf).unwrap();
613 assert!(buf[0] == 99);
618 let addr = next_test_ip4();
619 let mut acceptor = TcpListener::bind(addr).listen();
621 let _t = Thread::spawn(move|| {
622 let _stream = TcpStream::connect(addr);
626 let mut stream = acceptor.accept();
628 let nread = stream.read(&mut buf);
629 assert!(nread.is_err());
634 let addr = next_test_ip6();
635 let mut acceptor = TcpListener::bind(addr).listen();
637 let _t = Thread::spawn(move|| {
638 let _stream = TcpStream::connect(addr);
642 let mut stream = acceptor.accept();
644 let nread = stream.read(&mut buf);
645 assert!(nread.is_err());
649 fn read_eof_twice_ip4() {
650 let addr = next_test_ip4();
651 let mut acceptor = TcpListener::bind(addr).listen();
653 let _t = Thread::spawn(move|| {
654 let _stream = TcpStream::connect(addr);
658 let mut stream = acceptor.accept();
660 let nread = stream.read(&mut buf);
661 assert!(nread.is_err());
663 match stream.read(&mut buf) {
666 assert!(e.kind == NotConnected || e.kind == EndOfFile,
667 "unknown kind: {:?}", e.kind);
673 fn read_eof_twice_ip6() {
674 let addr = next_test_ip6();
675 let mut acceptor = TcpListener::bind(addr).listen();
677 let _t = Thread::spawn(move|| {
678 let _stream = TcpStream::connect(addr);
682 let mut stream = acceptor.accept();
684 let nread = stream.read(&mut buf);
685 assert!(nread.is_err());
687 match stream.read(&mut buf) {
690 assert!(e.kind == NotConnected || e.kind == EndOfFile,
691 "unknown kind: {:?}", e.kind);
697 fn write_close_ip4() {
698 let addr = next_test_ip4();
699 let mut acceptor = TcpListener::bind(addr).listen();
701 let (tx, rx) = channel();
702 let _t = Thread::spawn(move|| {
703 drop(TcpStream::connect(addr));
704 tx.send(()).unwrap();
707 let mut stream = acceptor.accept();
710 match stream.write(&buf) {
713 assert!(e.kind == ConnectionReset ||
714 e.kind == BrokenPipe ||
715 e.kind == ConnectionAborted,
716 "unknown error: {}", e);
722 fn write_close_ip6() {
723 let addr = next_test_ip6();
724 let mut acceptor = TcpListener::bind(addr).listen();
726 let (tx, rx) = channel();
727 let _t = Thread::spawn(move|| {
728 drop(TcpStream::connect(addr));
729 tx.send(()).unwrap();
732 let mut stream = acceptor.accept();
735 match stream.write(&buf) {
738 assert!(e.kind == ConnectionReset ||
739 e.kind == BrokenPipe ||
740 e.kind == ConnectionAborted,
741 "unknown error: {}", e);
747 fn multiple_connect_serial_ip4() {
748 let addr = next_test_ip4();
750 let mut acceptor = TcpListener::bind(addr).listen();
752 let _t = Thread::spawn(move|| {
754 let mut stream = TcpStream::connect(addr);
755 stream.write(&[99]).unwrap();
759 for ref mut stream in acceptor.incoming().take(max) {
761 stream.read(&mut buf).unwrap();
762 assert_eq!(buf[0], 99);
767 fn multiple_connect_serial_ip6() {
768 let addr = next_test_ip6();
770 let mut acceptor = TcpListener::bind(addr).listen();
772 let _t = Thread::spawn(move|| {
774 let mut stream = TcpStream::connect(addr);
775 stream.write(&[99]).unwrap();
779 for ref mut stream in acceptor.incoming().take(max) {
781 stream.read(&mut buf).unwrap();
782 assert_eq!(buf[0], 99);
787 fn multiple_connect_interleaved_greedy_schedule_ip4() {
788 let addr = next_test_ip4();
789 static MAX: int = 10;
790 let acceptor = TcpListener::bind(addr).listen();
792 let _t = Thread::spawn(move|| {
793 let mut acceptor = acceptor;
794 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
795 // Start another task to handle the connection
796 let _t = Thread::spawn(move|| {
797 let mut stream = stream;
799 stream.read(&mut buf).unwrap();
800 assert!(buf[0] == i as u8);
808 fn connect(i: int, addr: SocketAddr) {
809 if i == MAX { return }
811 let _t = Thread::spawn(move|| {
812 debug!("connecting");
813 let mut stream = TcpStream::connect(addr);
814 // Connect again before writing
815 connect(i + 1, addr);
817 stream.write(&[i as u8]).unwrap();
823 fn multiple_connect_interleaved_greedy_schedule_ip6() {
824 let addr = next_test_ip6();
825 static MAX: int = 10;
826 let acceptor = TcpListener::bind(addr).listen();
828 let _t = Thread::spawn(move|| {
829 let mut acceptor = acceptor;
830 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
831 // Start another task to handle the connection
832 let _t = Thread::spawn(move|| {
833 let mut stream = stream;
835 stream.read(&mut buf).unwrap();
836 assert!(buf[0] == i as u8);
844 fn connect(i: int, addr: SocketAddr) {
845 if i == MAX { return }
847 let _t = Thread::spawn(move|| {
848 debug!("connecting");
849 let mut stream = TcpStream::connect(addr);
850 // Connect again before writing
851 connect(i + 1, addr);
853 stream.write(&[i as u8]).unwrap();
859 fn multiple_connect_interleaved_lazy_schedule_ip4() {
860 static MAX: int = 10;
861 let addr = next_test_ip4();
862 let acceptor = TcpListener::bind(addr).listen();
864 let _t = Thread::spawn(move|| {
865 let mut acceptor = acceptor;
866 for stream in acceptor.incoming().take(MAX as uint) {
867 // Start another task to handle the connection
868 let _t = Thread::spawn(move|| {
869 let mut stream = stream;
871 stream.read(&mut buf).unwrap();
872 assert!(buf[0] == 99);
880 fn connect(i: int, addr: SocketAddr) {
881 if i == MAX { return }
883 let _t = Thread::spawn(move|| {
884 debug!("connecting");
885 let mut stream = TcpStream::connect(addr);
886 // Connect again before writing
887 connect(i + 1, addr);
889 stream.write(&[99]).unwrap();
895 fn multiple_connect_interleaved_lazy_schedule_ip6() {
896 static MAX: int = 10;
897 let addr = next_test_ip6();
898 let acceptor = TcpListener::bind(addr).listen();
900 let _t = Thread::spawn(move|| {
901 let mut acceptor = acceptor;
902 for stream in acceptor.incoming().take(MAX as uint) {
903 // Start another task to handle the connection
904 let _t = Thread::spawn(move|| {
905 let mut stream = stream;
907 stream.read(&mut buf).unwrap();
908 assert!(buf[0] == 99);
916 fn connect(i: int, addr: SocketAddr) {
917 if i == MAX { return }
919 let _t = Thread::spawn(move|| {
920 debug!("connecting");
921 let mut stream = TcpStream::connect(addr);
922 // Connect again before writing
923 connect(i + 1, addr);
925 stream.write(&[99]).unwrap();
930 pub fn socket_name(addr: SocketAddr) {
931 let mut listener = TcpListener::bind(addr).unwrap();
933 // Make sure socket_name gives
934 // us the socket we binded to.
935 let so_name = listener.socket_name();
936 assert!(so_name.is_ok());
937 assert_eq!(addr, so_name.unwrap());
940 pub fn peer_name(addr: SocketAddr) {
941 let acceptor = TcpListener::bind(addr).listen();
942 let _t = Thread::spawn(move|| {
943 let mut acceptor = acceptor;
944 acceptor.accept().unwrap();
947 let stream = TcpStream::connect(addr);
949 assert!(stream.is_ok());
950 let mut stream = stream.unwrap();
952 // Make sure peer_name gives us the
953 // address/port of the peer we've
955 let peer_name = stream.peer_name();
956 assert!(peer_name.is_ok());
957 assert_eq!(addr, peer_name.unwrap());
961 fn socket_and_peer_name_ip4() {
962 peer_name(next_test_ip4());
963 socket_name(next_test_ip4());
967 fn socket_and_peer_name_ip6() {
968 // FIXME: peer name is not consistent
969 //peer_name(next_test_ip6());
970 socket_name(next_test_ip6());
975 let addr = next_test_ip4();
976 let (tx, rx) = channel();
977 let _t = Thread::spawn(move|| {
978 let mut srv = TcpListener::bind(addr).listen().unwrap();
979 tx.send(()).unwrap();
980 let mut cl = srv.accept().unwrap();
981 cl.write(&[10]).unwrap();
983 cl.read(&mut b).unwrap();
984 tx.send(()).unwrap();
988 let mut c = TcpStream::connect(addr).unwrap();
990 assert_eq!(c.read(&mut b), Ok(1));
991 c.write(&[1]).unwrap();
997 let addr = next_test_ip4();
998 let listener = TcpListener::bind(addr).unwrap().listen();
999 assert!(listener.is_ok());
1000 match TcpListener::bind(addr).listen() {
1003 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1004 "unknown error: {} {:?}", e, e.kind);
1011 let addr = next_test_ip4();
1012 let (tx, rx) = channel();
1014 let _t = Thread::spawn(move|| {
1016 let _stream = TcpStream::connect(addr).unwrap();
1022 let mut acceptor = TcpListener::bind(addr).listen();
1023 tx.send(()).unwrap();
1025 let _stream = acceptor.accept().unwrap();
1027 tx.send(()).unwrap();
1031 let _listener = TcpListener::bind(addr);
1035 fn tcp_clone_smoke() {
1036 let addr = next_test_ip4();
1037 let mut acceptor = TcpListener::bind(addr).listen();
1039 let _t = Thread::spawn(move|| {
1040 let mut s = TcpStream::connect(addr);
1041 let mut buf = [0, 0];
1042 assert_eq!(s.read(&mut buf), Ok(1));
1043 assert_eq!(buf[0], 1);
1044 s.write(&[2]).unwrap();
1047 let mut s1 = acceptor.accept().unwrap();
1048 let s2 = s1.clone();
1050 let (tx1, rx1) = channel();
1051 let (tx2, rx2) = channel();
1052 let _t = Thread::spawn(move|| {
1054 rx1.recv().unwrap();
1055 s2.write(&[1]).unwrap();
1056 tx2.send(()).unwrap();
1058 tx1.send(()).unwrap();
1059 let mut buf = [0, 0];
1060 assert_eq!(s1.read(&mut buf), Ok(1));
1061 rx2.recv().unwrap();
1065 fn tcp_clone_two_read() {
1066 let addr = next_test_ip6();
1067 let mut acceptor = TcpListener::bind(addr).listen();
1068 let (tx1, rx) = channel();
1069 let tx2 = tx1.clone();
1071 let _t = Thread::spawn(move|| {
1072 let mut s = TcpStream::connect(addr);
1073 s.write(&[1]).unwrap();
1075 s.write(&[2]).unwrap();
1079 let mut s1 = acceptor.accept().unwrap();
1080 let s2 = s1.clone();
1082 let (done, rx) = channel();
1083 let _t = Thread::spawn(move|| {
1085 let mut buf = [0, 0];
1086 s2.read(&mut buf).unwrap();
1087 tx2.send(()).unwrap();
1088 done.send(()).unwrap();
1090 let mut buf = [0, 0];
1091 s1.read(&mut buf).unwrap();
1092 tx1.send(()).unwrap();
1098 fn tcp_clone_two_write() {
1099 let addr = next_test_ip4();
1100 let mut acceptor = TcpListener::bind(addr).listen();
1102 let _t = Thread::spawn(move|| {
1103 let mut s = TcpStream::connect(addr);
1104 let mut buf = [0, 1];
1105 s.read(&mut buf).unwrap();
1106 s.read(&mut buf).unwrap();
1109 let mut s1 = acceptor.accept().unwrap();
1110 let s2 = s1.clone();
1112 let (done, rx) = channel();
1113 let _t = Thread::spawn(move|| {
1115 s2.write(&[1]).unwrap();
1116 done.send(()).unwrap();
1118 s1.write(&[2]).unwrap();
1124 fn shutdown_smoke() {
1125 let addr = next_test_ip4();
1126 let a = TcpListener::bind(addr).unwrap().listen();
1127 let _t = Thread::spawn(move|| {
1129 let mut c = a.accept().unwrap();
1130 assert_eq!(c.read_to_end(), Ok(vec!()));
1131 c.write(&[1]).unwrap();
1134 let mut s = TcpStream::connect(addr).unwrap();
1135 assert!(s.inner.close_write().is_ok());
1136 assert!(s.write(&[1]).is_err());
1137 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1141 fn accept_timeout() {
1142 let addr = next_test_ip4();
1143 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1145 a.set_timeout(Some(10));
1147 // Make sure we time out once and future invocations also time out
1148 let err = a.accept().err().unwrap();
1149 assert_eq!(err.kind, TimedOut);
1150 let err = a.accept().err().unwrap();
1151 assert_eq!(err.kind, TimedOut);
1153 // Also make sure that even though the timeout is expired that we will
1154 // continue to receive any pending connections.
1156 // FIXME: freebsd apparently never sees the pending connection, but
1157 // testing manually always works. Need to investigate this
1159 if !cfg!(target_os = "freebsd") {
1160 let (tx, rx) = channel();
1161 let _t = Thread::spawn(move|| {
1162 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1164 let _l = rx.recv().unwrap();
1165 for i in 0i32..1001 {
1168 Err(ref e) if e.kind == TimedOut => {}
1169 Err(e) => panic!("error: {}", e),
1171 ::thread::Thread::yield_now();
1172 if i == 1000 { panic!("should have a pending connection") }
1176 // Unset the timeout and make sure that this always blocks.
1177 a.set_timeout(None);
1178 let _t = Thread::spawn(move|| {
1179 drop(TcpStream::connect(addr).unwrap());
1181 a.accept().unwrap();
1185 fn close_readwrite_smoke() {
1186 let addr = next_test_ip4();
1187 let a = TcpListener::bind(addr).listen().unwrap();
1188 let (_tx, rx) = channel::<()>();
1189 Thread::spawn(move|| {
1191 let _s = a.accept().unwrap();
1192 let _ = rx.recv().unwrap();
1196 let mut s = TcpStream::connect(addr).unwrap();
1197 let mut s2 = s.clone();
1199 // closing should prevent reads/writes
1200 s.close_write().unwrap();
1201 assert!(s.write(&[0]).is_err());
1202 s.close_read().unwrap();
1203 assert!(s.read(&mut b).is_err());
1205 // closing should affect previous handles
1206 assert!(s2.write(&[0]).is_err());
1207 assert!(s2.read(&mut b).is_err());
1209 // closing should affect new handles
1210 let mut s3 = s.clone();
1211 assert!(s3.write(&[0]).is_err());
1212 assert!(s3.read(&mut b).is_err());
1214 // make sure these don't die
1215 let _ = s2.close_read();
1216 let _ = s2.close_write();
1217 let _ = s3.close_read();
1218 let _ = s3.close_write();
1222 fn close_read_wakes_up() {
1223 let addr = next_test_ip4();
1224 let a = TcpListener::bind(addr).listen().unwrap();
1225 let (_tx, rx) = channel::<()>();
1226 Thread::spawn(move|| {
1228 let _s = a.accept().unwrap();
1229 let _ = rx.recv().unwrap();
1232 let mut s = TcpStream::connect(addr).unwrap();
1234 let (tx, rx) = channel();
1235 let _t = Thread::spawn(move|| {
1237 assert!(s2.read(&mut [0]).is_err());
1238 tx.send(()).unwrap();
1240 // this should wake up the child task
1241 s.close_read().unwrap();
1243 // this test will never finish if the child doesn't wake up
1248 fn readwrite_timeouts() {
1249 let addr = next_test_ip6();
1250 let mut a = TcpListener::bind(addr).listen().unwrap();
1251 let (tx, rx) = channel::<()>();
1252 Thread::spawn(move|| {
1253 let mut s = TcpStream::connect(addr).unwrap();
1255 assert!(s.write(&[0]).is_ok());
1259 let mut s = a.accept().unwrap();
1260 s.set_timeout(Some(20));
1261 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1262 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1264 s.set_timeout(Some(20));
1265 for i in 0i32..1001 {
1266 match s.write(&[0; 128 * 1024]) {
1267 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1268 Err(IoError { kind: TimedOut, .. }) => break,
1269 Err(e) => panic!("{}", e),
1271 if i == 1000 { panic!("should have filled up?!"); }
1273 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1275 tx.send(()).unwrap();
1276 s.set_timeout(None);
1277 assert_eq!(s.read(&mut [0, 0]), Ok(1));
1281 fn read_timeouts() {
1282 let addr = next_test_ip6();
1283 let mut a = TcpListener::bind(addr).listen().unwrap();
1284 let (tx, rx) = channel::<()>();
1285 Thread::spawn(move|| {
1286 let mut s = TcpStream::connect(addr).unwrap();
1289 while amt < 100 * 128 * 1024 {
1290 match s.read(&mut [0;128 * 1024]) {
1291 Ok(n) => { amt += n; }
1292 Err(e) => panic!("{}", e),
1298 let mut s = a.accept().unwrap();
1299 s.set_read_timeout(Some(20));
1300 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1301 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1303 tx.send(()).unwrap();
1305 assert!(s.write(&[0;128 * 1024]).is_ok());
1310 fn write_timeouts() {
1311 let addr = next_test_ip6();
1312 let mut a = TcpListener::bind(addr).listen().unwrap();
1313 let (tx, rx) = channel::<()>();
1314 Thread::spawn(move|| {
1315 let mut s = TcpStream::connect(addr).unwrap();
1317 assert!(s.write(&[0]).is_ok());
1321 let mut s = a.accept().unwrap();
1322 s.set_write_timeout(Some(20));
1323 for i in 0i32..1001 {
1324 match s.write(&[0; 128 * 1024]) {
1325 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1326 Err(IoError { kind: TimedOut, .. }) => break,
1327 Err(e) => panic!("{}", e),
1329 if i == 1000 { panic!("should have filled up?!"); }
1331 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
1333 tx.send(()).unwrap();
1334 assert!(s.read(&mut [0]).is_ok());
1338 fn timeout_concurrent_read() {
1339 let addr = next_test_ip6();
1340 let mut a = TcpListener::bind(addr).listen().unwrap();
1341 let (tx, rx) = channel::<()>();
1342 Thread::spawn(move|| {
1343 let mut s = TcpStream::connect(addr).unwrap();
1345 assert_eq!(s.write(&[0]), Ok(()));
1349 let mut s = a.accept().unwrap();
1351 let (tx2, rx2) = channel();
1352 let _t = Thread::spawn(move|| {
1354 assert_eq!(s2.read(&mut [0]), Ok(1));
1355 tx2.send(()).unwrap();
1358 s.set_read_timeout(Some(20));
1359 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
1360 tx.send(()).unwrap();
1362 rx2.recv().unwrap();
1366 fn clone_while_reading() {
1367 let addr = next_test_ip6();
1368 let listen = TcpListener::bind(addr);
1369 let mut accept = listen.listen().unwrap();
1371 // Enqueue a task to write to a socket
1372 let (tx, rx) = channel();
1373 let (txdone, rxdone) = channel();
1374 let txdone2 = txdone.clone();
1375 let _t = Thread::spawn(move|| {
1376 let mut tcp = TcpStream::connect(addr).unwrap();
1378 tcp.write_u8(0).unwrap();
1379 txdone2.send(()).unwrap();
1382 // Spawn off a reading clone
1383 let tcp = accept.accept().unwrap();
1384 let tcp2 = tcp.clone();
1385 let txdone3 = txdone.clone();
1386 let _t = Thread::spawn(move|| {
1387 let mut tcp2 = tcp2;
1388 tcp2.read_u8().unwrap();
1389 txdone3.send(()).unwrap();
1392 // Try to ensure that the reading clone is indeed reading
1394 ::thread::Thread::yield_now();
1397 // clone the handle again while it's reading, then let it finish the
1399 let _ = tcp.clone();
1400 tx.send(()).unwrap();
1401 rxdone.recv().unwrap();
1402 rxdone.recv().unwrap();
1406 fn clone_accept_smoke() {
1407 let addr = next_test_ip4();
1408 let l = TcpListener::bind(addr);
1409 let mut a = l.listen().unwrap();
1410 let mut a2 = a.clone();
1412 let _t = Thread::spawn(move|| {
1413 let _ = TcpStream::connect(addr);
1415 let _t = Thread::spawn(move|| {
1416 let _ = TcpStream::connect(addr);
1419 assert!(a.accept().is_ok());
1420 assert!(a2.accept().is_ok());
1424 fn clone_accept_concurrent() {
1425 let addr = next_test_ip4();
1426 let l = TcpListener::bind(addr);
1427 let a = l.listen().unwrap();
1430 let (tx, rx) = channel();
1431 let tx2 = tx.clone();
1433 let _t = Thread::spawn(move|| {
1435 tx.send(a.accept()).unwrap();
1437 let _t = Thread::spawn(move|| {
1439 tx2.send(a.accept()).unwrap();
1442 let _t = Thread::spawn(move|| {
1443 let _ = TcpStream::connect(addr);
1445 let _t = Thread::spawn(move|| {
1446 let _ = TcpStream::connect(addr);
1449 assert!(rx.recv().unwrap().is_ok());
1450 assert!(rx.recv().unwrap().is_ok());
1454 fn close_accept_smoke() {
1455 let addr = next_test_ip4();
1456 let l = TcpListener::bind(addr);
1457 let mut a = l.listen().unwrap();
1459 a.close_accept().unwrap();
1460 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
1464 fn close_accept_concurrent() {
1465 let addr = next_test_ip4();
1466 let l = TcpListener::bind(addr);
1467 let a = l.listen().unwrap();
1468 let mut a2 = a.clone();
1470 let (tx, rx) = channel();
1471 let _t = Thread::spawn(move|| {
1473 tx.send(a.accept()).unwrap();
1475 a2.close_accept().unwrap();
1477 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);