1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
22 use result::Result::Err;
23 use old_io::net::ip::{SocketAddr, ToSocketAddr};
24 use old_io::{Reader, Writer, Listener, Acceptor};
25 use old_io::{standard_error, TimedOut};
27 use option::Option::{None, Some};
30 use sys::tcp::TcpStream as TcpStreamImp;
31 use sys::tcp::TcpListener as TcpListenerImp;
32 use sys::tcp::TcpAcceptor as TcpAcceptorImp;
36 /// A structure which represents a TCP stream between a local socket and a
39 /// The socket will be closed when the value is dropped.
44 /// use std::old_io::TcpStream;
47 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
49 /// // ignore the Result
50 /// let _ = stream.write(&[1]);
52 /// let mut buf = [0];
53 /// let _ = stream.read(&mut buf); // ignore here too
54 /// } // the stream is closed here
56 pub struct TcpStream {
61 fn new(s: TcpStreamImp) -> TcpStream {
62 TcpStream { inner: s }
65 /// Open a TCP connection to a remote host.
67 /// `addr` is an address of the remote host. Anything which implements the
68 /// `ToSocketAddr` trait can be supplied for the address; see that trait's
69 /// documentation for concrete examples.
70 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
71 super::with_addresses(addr, |addr| {
72 TcpStreamImp::connect(addr, None).map(TcpStream::new)
76 /// Creates a TCP connection to a remote socket address, timing out after
77 /// the specified duration.
79 /// This is the same as the `connect` method, except that if the timeout
80 /// specified elapses before a connection is made an error will be
81 /// returned. The error's kind will be `TimedOut`.
83 /// As with the `connect` method, the `addr` argument can be anything which
84 /// implements the `ToSocketAddr` trait.
86 /// If a `timeout` with zero or negative duration is specified then
87 /// the function returns `Err`, with the error kind set to `TimedOut`.
88 #[unstable = "the timeout argument may eventually change types"]
89 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
90 timeout: Duration) -> IoResult<TcpStream> {
91 if timeout <= Duration::milliseconds(0) {
92 return Err(standard_error(TimedOut));
95 super::with_addresses(addr, |addr| {
96 TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
101 /// Returns the socket address of the remote peer of this TCP connection.
102 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
103 self.inner.peer_name()
106 /// Returns the socket address of the local half of this TCP connection.
107 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
108 self.inner.socket_name()
111 /// Sets the nodelay flag on this connection to the boolean specified
113 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
114 self.inner.set_nodelay(nodelay)
117 /// Sets the keepalive timeout to the timeout specified.
119 /// If the value specified is `None`, then the keepalive flag is cleared on
120 /// this connection. Otherwise, the keepalive timeout will be set to the
121 /// specified time, in seconds.
123 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
124 self.inner.set_keepalive(delay_in_seconds)
127 /// Closes the reading half of this connection.
129 /// This method will close the reading portion of this connection, causing
130 /// all pending and future reads to immediately return with an error.
135 /// # #![allow(unused_must_use)]
136 /// use std::old_io::timer;
137 /// use std::old_io::TcpStream;
138 /// use std::time::Duration;
139 /// use std::thread::Thread;
141 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
142 /// let stream2 = stream.clone();
144 /// let _t = Thread::spawn(move|| {
145 /// // close this stream after one second
146 /// timer::sleep(Duration::seconds(1));
147 /// let mut stream = stream2;
148 /// stream.close_read();
151 /// // wait for some data, will get canceled after one second
152 /// let mut buf = [0];
153 /// stream.read(&mut buf);
156 /// Note that this method affects all cloned handles associated with this
157 /// stream, not just this one handle.
158 pub fn close_read(&mut self) -> IoResult<()> {
159 self.inner.close_read()
162 /// Closes the writing half of this connection.
164 /// This method will close the writing portion of this connection, causing
165 /// all future writes to immediately return with an error.
167 /// Note that this method affects all cloned handles associated with this
168 /// stream, not just this one handle.
169 pub fn close_write(&mut self) -> IoResult<()> {
170 self.inner.close_write()
173 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
175 /// This function will set a timeout for all blocking operations (including
176 /// reads and writes) on this stream. The timeout specified is a relative
177 /// time, in milliseconds, into the future after which point operations will
178 /// time out. This means that the timeout must be reset periodically to keep
179 /// it from expiring. Specifying a value of `None` will clear the timeout
182 /// The timeout on this stream is local to this stream only. Setting a
183 /// timeout does not affect any other cloned instances of this stream, nor
184 /// does the timeout propagate to cloned handles of this stream. Setting
185 /// this timeout will override any specific read or write timeouts
186 /// previously set for this stream.
188 /// For clarification on the semantics of interrupting a read and a write,
189 /// take a look at `set_read_timeout` and `set_write_timeout`.
190 #[unstable = "the timeout argument may change in type and value"]
191 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
192 self.inner.set_timeout(timeout_ms)
195 /// Sets the timeout for read operations on this stream.
197 /// See documentation in `set_timeout` for the semantics of this read time.
198 /// This will overwrite any previous read timeout set through either this
199 /// function or `set_timeout`.
203 /// When this timeout expires, if there is no pending read operation, no
204 /// action is taken. Otherwise, the read operation will be scheduled to
205 /// promptly return. If a timeout error is returned, then no data was read
206 /// during the timeout period.
207 #[unstable = "the timeout argument may change in type and value"]
208 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
209 self.inner.set_read_timeout(timeout_ms)
212 /// Sets the timeout for write operations on this stream.
214 /// See documentation in `set_timeout` for the semantics of this write time.
215 /// This will overwrite any previous write timeout set through either this
216 /// function or `set_timeout`.
220 /// When this timeout expires, if there is no pending write operation, no
221 /// action is taken. Otherwise, the pending write operation will be
222 /// scheduled to promptly return. The actual state of the underlying stream
223 /// is not specified.
225 /// The write operation may return an error of type `ShortWrite` which
226 /// indicates that the object is known to have written an exact number of
227 /// bytes successfully during the timeout period, and the remaining bytes
228 /// were never written.
230 /// If the write operation returns `TimedOut`, then the timeout primitive
231 /// does not know how many bytes were written as part of the timeout
232 /// operation. It may be the case that bytes continue to be written in an
233 /// asynchronous fashion after the call to write returns.
234 #[unstable = "the timeout argument may change in type and value"]
235 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
236 self.inner.set_write_timeout(timeout_ms)
240 impl Clone for TcpStream {
241 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
242 /// and writes of this connection.
244 /// The underlying TCP stream will not be closed until all handles to the
245 /// stream have been deallocated. All handles will also follow the same
246 /// stream, but two concurrent reads will not receive the same data.
247 /// Instead, the first read will receive the first packet received, and the
248 /// second read will receive the second packet.
249 fn clone(&self) -> TcpStream {
250 TcpStream { inner: self.inner.clone() }
254 impl Reader for TcpStream {
255 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
260 impl Writer for TcpStream {
261 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
262 self.inner.write(buf)
266 impl sys_common::AsInner<TcpStreamImp> for TcpStream {
267 fn as_inner(&self) -> &TcpStreamImp {
272 /// A structure representing a socket server. This listener is used to create a
273 /// `TcpAcceptor` which can be used to accept sockets on a local port.
279 /// use std::old_io::{TcpListener, TcpStream};
280 /// use std::old_io::{Acceptor, Listener};
281 /// use std::thread::Thread;
283 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
285 /// // start listening for incoming connections on the bound address
286 /// let mut acceptor = listener.listen().unwrap();
288 /// fn handle_client(mut stream: TcpStream) {
290 /// # &mut stream; // silence unused mutability/variable warning
292 /// // accept connections and process them, spawning a new task for each one
293 /// for stream in acceptor.incoming() {
295 /// Err(e) => { /* connection failed */ }
297 /// Thread::spawn(move|| {
298 /// // connection succeeded
299 /// handle_client(stream)
305 /// // close the socket server
309 pub struct TcpListener {
310 inner: TcpListenerImp,
314 /// Creates a new `TcpListener` which will be bound to the specified address.
315 /// This listener is not ready for accepting connections; `listen` must be called
316 /// on it before that's possible.
318 /// Binding with a port number of 0 will request that the OS assigns a port
319 /// to this listener. The port allocated can be queried via the
320 /// `socket_name` function.
322 /// The address type can be any implementer of the `ToSocketAddr` trait. See its
323 /// documentation for concrete examples.
324 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
325 super::with_addresses(addr, |addr| {
326 TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
330 /// Returns the local socket address of this listener.
331 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
332 self.inner.socket_name()
336 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
337 fn listen(self) -> IoResult<TcpAcceptor> {
338 self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
342 impl sys_common::AsInner<TcpListenerImp> for TcpListener {
343 fn as_inner(&self) -> &TcpListenerImp {
348 /// The accepting half of a TCP socket server. This structure is created through
349 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
350 /// `TcpStream` instances.
351 pub struct TcpAcceptor {
352 inner: TcpAcceptorImp,
356 /// Prevents blocking on all future accepts after `ms` milliseconds have
359 /// This function is used to set a deadline after which this acceptor will
360 /// time out accepting any connections. The argument is the relative
361 /// distance, in milliseconds, to a point in the future after which all
362 /// accepts will fail.
364 /// If the argument specified is `None`, then any previously registered
365 /// timeout is cleared.
367 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
368 /// any pending connections. All pending connections will be accepted,
369 /// regardless of whether the timeout has expired or not (the accept will
370 /// not block in this case).
375 /// # #![allow(unstable)]
376 /// use std::old_io::TcpListener;
377 /// use std::old_io::{Listener, Acceptor, TimedOut};
379 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
381 /// // After 100ms have passed, all accepts will fail
382 /// a.set_timeout(Some(100));
384 /// match a.accept() {
385 /// Ok(..) => println!("accepted a socket"),
386 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
387 /// Err(e) => println!("err: {}", e),
390 /// // Reset the timeout and try again
391 /// a.set_timeout(Some(100));
392 /// let socket = a.accept();
394 /// // Clear the timeout and block indefinitely waiting for a connection
395 /// a.set_timeout(None);
396 /// let socket = a.accept();
398 #[unstable = "the type of the argument and name of this function are \
400 pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
402 /// Closes the accepting capabilities of this acceptor.
404 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
405 /// in that it will affect *all* cloned handles of this acceptor's original
408 /// Once this function succeeds, all future calls to `accept` will return
409 /// immediately with an error, preventing all future calls to accept. The
410 /// underlying socket will not be relinquished back to the OS until all
411 /// acceptors have been deallocated.
413 /// This is useful for waking up a thread in an accept loop to indicate that
419 /// # #![allow(unstable)]
420 /// use std::old_io::{TcpListener, Listener, Acceptor, EndOfFile};
421 /// use std::thread::Thread;
423 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
424 /// let a2 = a.clone();
426 /// let _t = Thread::spawn(move|| {
428 /// for socket in a2.incoming() {
430 /// Ok(s) => { /* handle s */ }
431 /// Err(ref e) if e.kind == EndOfFile => break, // closed
432 /// Err(e) => panic!("unexpected error: {}", e),
437 /// # fn wait_for_sigint() {}
438 /// // Now that our accept loop is running, wait for the program to be
439 /// // requested to exit.
440 /// wait_for_sigint();
442 /// // Signal our accept loop to exit
443 /// assert!(a.close_accept().is_ok());
446 pub fn close_accept(&mut self) -> IoResult<()> {
447 self.inner.close_accept()
451 impl Acceptor<TcpStream> for TcpAcceptor {
452 fn accept(&mut self) -> IoResult<TcpStream> {
453 self.inner.accept().map(TcpStream::new)
457 impl Clone for TcpAcceptor {
458 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
461 /// The underlying TCP acceptor will not be closed until all handles to the
462 /// acceptor have been deallocated. Incoming connections will be received on
463 /// at most one acceptor; the same connection will not be accepted twice.
465 /// The `close_accept` method will shut down *all* acceptors cloned from the
466 /// same original acceptor, whereas the `set_timeout` method only affects
467 /// the acceptor that it is called on.
469 /// This function is useful for creating a handle to invoke `close_accept`
470 /// on to wake up any other task blocked in `accept`.
471 fn clone(&self) -> TcpAcceptor {
472 TcpAcceptor { inner: self.inner.clone() }
476 impl sys_common::AsInner<TcpAcceptorImp> for TcpAcceptor {
477 fn as_inner(&self) -> &TcpAcceptorImp {
487 use sync::mpsc::channel;
489 use old_io::net::tcp::*;
490 use old_io::net::ip::*;
492 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError};
493 use old_io::{ConnectionRefused, BrokenPipe, ConnectionAborted};
494 use old_io::{ConnectionReset, NotConnected, PermissionDenied, OtherIoError};
495 use old_io::{Acceptor, Listener};
497 // FIXME #11530 this fails on android because tests are run as root
498 #[cfg_attr(any(windows, target_os = "android"), ignore)]
501 match TcpListener::bind("0.0.0.0:1") {
503 Err(e) => assert_eq!(e.kind, PermissionDenied),
509 match TcpStream::connect("0.0.0.0:1") {
511 Err(e) => assert_eq!(e.kind, ConnectionRefused),
516 fn listen_ip4_localhost() {
517 let socket_addr = next_test_ip4();
518 let listener = TcpListener::bind(socket_addr);
519 let mut acceptor = listener.listen();
521 let _t = Thread::spawn(move|| {
522 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
523 stream.write(&[144]).unwrap();
526 let mut stream = acceptor.accept();
528 stream.read(&mut buf).unwrap();
529 assert!(buf[0] == 144);
533 fn connect_localhost() {
534 let addr = next_test_ip4();
535 let mut acceptor = TcpListener::bind(addr).listen();
537 let _t = Thread::spawn(move|| {
538 let mut stream = TcpStream::connect(("localhost", addr.port));
539 stream.write(&[64]).unwrap();
542 let mut stream = acceptor.accept();
544 stream.read(&mut buf).unwrap();
545 assert!(buf[0] == 64);
549 fn connect_ip4_loopback() {
550 let addr = next_test_ip4();
551 let mut acceptor = TcpListener::bind(addr).listen();
553 let _t = Thread::spawn(move|| {
554 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
555 stream.write(&[44]).unwrap();
558 let mut stream = acceptor.accept();
560 stream.read(&mut buf).unwrap();
561 assert!(buf[0] == 44);
565 fn connect_ip6_loopback() {
566 let addr = next_test_ip6();
567 let mut acceptor = TcpListener::bind(addr).listen();
569 let _t = Thread::spawn(move|| {
570 let mut stream = TcpStream::connect(("::1", addr.port));
571 stream.write(&[66]).unwrap();
574 let mut stream = acceptor.accept();
576 stream.read(&mut buf).unwrap();
577 assert!(buf[0] == 66);
581 fn smoke_test_ip4() {
582 let addr = next_test_ip4();
583 let mut acceptor = TcpListener::bind(addr).listen();
585 let _t = Thread::spawn(move|| {
586 let mut stream = TcpStream::connect(addr);
587 stream.write(&[99]).unwrap();
590 let mut stream = acceptor.accept();
592 stream.read(&mut buf).unwrap();
593 assert!(buf[0] == 99);
597 fn smoke_test_ip6() {
598 let addr = next_test_ip6();
599 let mut acceptor = TcpListener::bind(addr).listen();
601 let _t = Thread::spawn(move|| {
602 let mut stream = TcpStream::connect(addr);
603 stream.write(&[99]).unwrap();
606 let mut stream = acceptor.accept();
608 stream.read(&mut buf).unwrap();
609 assert!(buf[0] == 99);
614 let addr = next_test_ip4();
615 let mut acceptor = TcpListener::bind(addr).listen();
617 let _t = Thread::spawn(move|| {
618 let _stream = TcpStream::connect(addr);
622 let mut stream = acceptor.accept();
624 let nread = stream.read(&mut buf);
625 assert!(nread.is_err());
630 let addr = next_test_ip6();
631 let mut acceptor = TcpListener::bind(addr).listen();
633 let _t = Thread::spawn(move|| {
634 let _stream = TcpStream::connect(addr);
638 let mut stream = acceptor.accept();
640 let nread = stream.read(&mut buf);
641 assert!(nread.is_err());
645 fn read_eof_twice_ip4() {
646 let addr = next_test_ip4();
647 let mut acceptor = TcpListener::bind(addr).listen();
649 let _t = Thread::spawn(move|| {
650 let _stream = TcpStream::connect(addr);
654 let mut stream = acceptor.accept();
656 let nread = stream.read(&mut buf);
657 assert!(nread.is_err());
659 match stream.read(&mut buf) {
662 assert!(e.kind == NotConnected || e.kind == EndOfFile,
663 "unknown kind: {:?}", e.kind);
669 fn read_eof_twice_ip6() {
670 let addr = next_test_ip6();
671 let mut acceptor = TcpListener::bind(addr).listen();
673 let _t = Thread::spawn(move|| {
674 let _stream = TcpStream::connect(addr);
678 let mut stream = acceptor.accept();
680 let nread = stream.read(&mut buf);
681 assert!(nread.is_err());
683 match stream.read(&mut buf) {
686 assert!(e.kind == NotConnected || e.kind == EndOfFile,
687 "unknown kind: {:?}", e.kind);
693 fn write_close_ip4() {
694 let addr = next_test_ip4();
695 let mut acceptor = TcpListener::bind(addr).listen();
697 let (tx, rx) = channel();
698 let _t = Thread::spawn(move|| {
699 drop(TcpStream::connect(addr));
700 tx.send(()).unwrap();
703 let mut stream = acceptor.accept();
706 match stream.write(&buf) {
709 assert!(e.kind == ConnectionReset ||
710 e.kind == BrokenPipe ||
711 e.kind == ConnectionAborted,
712 "unknown error: {}", e);
718 fn write_close_ip6() {
719 let addr = next_test_ip6();
720 let mut acceptor = TcpListener::bind(addr).listen();
722 let (tx, rx) = channel();
723 let _t = Thread::spawn(move|| {
724 drop(TcpStream::connect(addr));
725 tx.send(()).unwrap();
728 let mut stream = acceptor.accept();
731 match stream.write(&buf) {
734 assert!(e.kind == ConnectionReset ||
735 e.kind == BrokenPipe ||
736 e.kind == ConnectionAborted,
737 "unknown error: {}", e);
743 fn multiple_connect_serial_ip4() {
744 let addr = next_test_ip4();
746 let mut acceptor = TcpListener::bind(addr).listen();
748 let _t = Thread::spawn(move|| {
749 for _ in range(0, max) {
750 let mut stream = TcpStream::connect(addr);
751 stream.write(&[99]).unwrap();
755 for ref mut stream in acceptor.incoming().take(max) {
757 stream.read(&mut buf).unwrap();
758 assert_eq!(buf[0], 99);
763 fn multiple_connect_serial_ip6() {
764 let addr = next_test_ip6();
766 let mut acceptor = TcpListener::bind(addr).listen();
768 let _t = Thread::spawn(move|| {
769 for _ in range(0, max) {
770 let mut stream = TcpStream::connect(addr);
771 stream.write(&[99]).unwrap();
775 for ref mut stream in acceptor.incoming().take(max) {
777 stream.read(&mut buf).unwrap();
778 assert_eq!(buf[0], 99);
783 fn multiple_connect_interleaved_greedy_schedule_ip4() {
784 let addr = next_test_ip4();
785 static MAX: int = 10;
786 let acceptor = TcpListener::bind(addr).listen();
788 let _t = Thread::spawn(move|| {
789 let mut acceptor = acceptor;
790 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
791 // Start another task to handle the connection
792 let _t = Thread::spawn(move|| {
793 let mut stream = stream;
795 stream.read(&mut buf).unwrap();
796 assert!(buf[0] == i as u8);
804 fn connect(i: int, addr: SocketAddr) {
805 if i == MAX { return }
807 let _t = Thread::spawn(move|| {
808 debug!("connecting");
809 let mut stream = TcpStream::connect(addr);
810 // Connect again before writing
811 connect(i + 1, addr);
813 stream.write(&[i as u8]).unwrap();
819 fn multiple_connect_interleaved_greedy_schedule_ip6() {
820 let addr = next_test_ip6();
821 static MAX: int = 10;
822 let acceptor = TcpListener::bind(addr).listen();
824 let _t = Thread::spawn(move|| {
825 let mut acceptor = acceptor;
826 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
827 // Start another task to handle the connection
828 let _t = Thread::spawn(move|| {
829 let mut stream = stream;
831 stream.read(&mut buf).unwrap();
832 assert!(buf[0] == i as u8);
840 fn connect(i: int, addr: SocketAddr) {
841 if i == MAX { return }
843 let _t = Thread::spawn(move|| {
844 debug!("connecting");
845 let mut stream = TcpStream::connect(addr);
846 // Connect again before writing
847 connect(i + 1, addr);
849 stream.write(&[i as u8]).unwrap();
855 fn multiple_connect_interleaved_lazy_schedule_ip4() {
856 static MAX: int = 10;
857 let addr = next_test_ip4();
858 let acceptor = TcpListener::bind(addr).listen();
860 let _t = Thread::spawn(move|| {
861 let mut acceptor = acceptor;
862 for stream in acceptor.incoming().take(MAX as uint) {
863 // Start another task to handle the connection
864 let _t = Thread::spawn(move|| {
865 let mut stream = stream;
867 stream.read(&mut buf).unwrap();
868 assert!(buf[0] == 99);
876 fn connect(i: int, addr: SocketAddr) {
877 if i == MAX { return }
879 let _t = Thread::spawn(move|| {
880 debug!("connecting");
881 let mut stream = TcpStream::connect(addr);
882 // Connect again before writing
883 connect(i + 1, addr);
885 stream.write(&[99]).unwrap();
891 fn multiple_connect_interleaved_lazy_schedule_ip6() {
892 static MAX: int = 10;
893 let addr = next_test_ip6();
894 let acceptor = TcpListener::bind(addr).listen();
896 let _t = Thread::spawn(move|| {
897 let mut acceptor = acceptor;
898 for stream in acceptor.incoming().take(MAX as uint) {
899 // Start another task to handle the connection
900 let _t = Thread::spawn(move|| {
901 let mut stream = stream;
903 stream.read(&mut buf).unwrap();
904 assert!(buf[0] == 99);
912 fn connect(i: int, addr: SocketAddr) {
913 if i == MAX { return }
915 let _t = Thread::spawn(move|| {
916 debug!("connecting");
917 let mut stream = TcpStream::connect(addr);
918 // Connect again before writing
919 connect(i + 1, addr);
921 stream.write(&[99]).unwrap();
926 pub fn socket_name(addr: SocketAddr) {
927 let mut listener = TcpListener::bind(addr).unwrap();
929 // Make sure socket_name gives
930 // us the socket we bound to.
931 let so_name = listener.socket_name();
932 assert!(so_name.is_ok());
933 assert_eq!(addr, so_name.unwrap());
936 pub fn peer_name(addr: SocketAddr) {
937 let acceptor = TcpListener::bind(addr).listen();
938 let _t = Thread::spawn(move|| {
939 let mut acceptor = acceptor;
940 acceptor.accept().unwrap();
943 let stream = TcpStream::connect(addr);
945 assert!(stream.is_ok());
946 let mut stream = stream.unwrap();
948 // Make sure peer_name gives us the
949 // address/port of the peer we've
951 let peer_name = stream.peer_name();
952 assert!(peer_name.is_ok());
953 assert_eq!(addr, peer_name.unwrap());
957 fn socket_and_peer_name_ip4() {
958 peer_name(next_test_ip4());
959 socket_name(next_test_ip4());
963 fn socket_and_peer_name_ip6() {
964 // FIXME: peer name is not consistent
965 //peer_name(next_test_ip6());
966 socket_name(next_test_ip6());
971 let addr = next_test_ip4();
972 let (tx, rx) = channel();
973 let _t = Thread::spawn(move|| {
974 let mut srv = TcpListener::bind(addr).listen().unwrap();
975 tx.send(()).unwrap();
976 let mut cl = srv.accept().unwrap();
977 cl.write(&[10]).unwrap();
979 cl.read(&mut b).unwrap();
980 tx.send(()).unwrap();
984 let mut c = TcpStream::connect(addr).unwrap();
986 assert_eq!(c.read(&mut b), Ok(1));
987 c.write(&[1]).unwrap();
993 let addr = next_test_ip4();
994 let listener = TcpListener::bind(addr).unwrap().listen();
995 assert!(listener.is_ok());
996 match TcpListener::bind(addr).listen() {
999 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1000 "unknown error: {} {:?}", e, e.kind);
1007 let addr = next_test_ip4();
1008 let (tx, rx) = channel();
1010 let _t = Thread::spawn(move|| {
1012 let _stream = TcpStream::connect(addr).unwrap();
1018 let mut acceptor = TcpListener::bind(addr).listen();
1019 tx.send(()).unwrap();
1021 let _stream = acceptor.accept().unwrap();
1023 tx.send(()).unwrap();
1027 let _listener = TcpListener::bind(addr);
1031 fn tcp_clone_smoke() {
1032 let addr = next_test_ip4();
1033 let mut acceptor = TcpListener::bind(addr).listen();
1035 let _t = Thread::spawn(move|| {
1036 let mut s = TcpStream::connect(addr);
1037 let mut buf = [0, 0];
1038 assert_eq!(s.read(&mut buf), Ok(1));
1039 assert_eq!(buf[0], 1);
1040 s.write(&[2]).unwrap();
1043 let mut s1 = acceptor.accept().unwrap();
1044 let s2 = s1.clone();
1046 let (tx1, rx1) = channel();
1047 let (tx2, rx2) = channel();
1048 let _t = Thread::spawn(move|| {
1050 rx1.recv().unwrap();
1051 s2.write(&[1]).unwrap();
1052 tx2.send(()).unwrap();
1054 tx1.send(()).unwrap();
1055 let mut buf = [0, 0];
1056 assert_eq!(s1.read(&mut buf), Ok(1));
1057 rx2.recv().unwrap();
1061 fn tcp_clone_two_read() {
1062 let addr = next_test_ip6();
1063 let mut acceptor = TcpListener::bind(addr).listen();
1064 let (tx1, rx) = channel();
1065 let tx2 = tx1.clone();
1067 let _t = Thread::spawn(move|| {
1068 let mut s = TcpStream::connect(addr);
1069 s.write(&[1]).unwrap();
1071 s.write(&[2]).unwrap();
1075 let mut s1 = acceptor.accept().unwrap();
1076 let s2 = s1.clone();
1078 let (done, rx) = channel();
1079 let _t = Thread::spawn(move|| {
1081 let mut buf = [0, 0];
1082 s2.read(&mut buf).unwrap();
1083 tx2.send(()).unwrap();
1084 done.send(()).unwrap();
1086 let mut buf = [0, 0];
1087 s1.read(&mut buf).unwrap();
1088 tx1.send(()).unwrap();
1094 fn tcp_clone_two_write() {
1095 let addr = next_test_ip4();
1096 let mut acceptor = TcpListener::bind(addr).listen();
1098 let _t = Thread::spawn(move|| {
1099 let mut s = TcpStream::connect(addr);
1100 let mut buf = [0, 1];
1101 s.read(&mut buf).unwrap();
1102 s.read(&mut buf).unwrap();
1105 let mut s1 = acceptor.accept().unwrap();
1106 let s2 = s1.clone();
1108 let (done, rx) = channel();
1109 let _t = Thread::spawn(move|| {
1111 s2.write(&[1]).unwrap();
1112 done.send(()).unwrap();
1114 s1.write(&[2]).unwrap();
1120 fn shutdown_smoke() {
1121 let addr = next_test_ip4();
1122 let a = TcpListener::bind(addr).unwrap().listen();
1123 let _t = Thread::spawn(move|| {
1125 let mut c = a.accept().unwrap();
1126 assert_eq!(c.read_to_end(), Ok(vec!()));
1127 c.write(&[1]).unwrap();
1130 let mut s = TcpStream::connect(addr).unwrap();
1131 assert!(s.inner.close_write().is_ok());
1132 assert!(s.write(&[1]).is_err());
1133 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1137 fn accept_timeout() {
1138 let addr = next_test_ip4();
1139 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1141 a.set_timeout(Some(10));
1143 // Make sure we time out once and future invocations also time out
1144 let err = a.accept().err().unwrap();
1145 assert_eq!(err.kind, TimedOut);
1146 let err = a.accept().err().unwrap();
1147 assert_eq!(err.kind, TimedOut);
1149 // Also make sure that even though the timeout is expired that we will
1150 // continue to receive any pending connections.
1152 // FIXME: freebsd apparently never sees the pending connection, but
1153 // testing manually always works. Need to investigate this
1155 if !cfg!(target_os = "freebsd") {
1156 let (tx, rx) = channel();
1157 let _t = Thread::spawn(move|| {
1158 tx.send(TcpStream::connect(addr).unwrap()).unwrap();
1160 let _l = rx.recv().unwrap();
1161 for i in range(0i, 1001) {
1164 Err(ref e) if e.kind == TimedOut => {}
1165 Err(e) => panic!("error: {}", e),
1167 ::thread::Thread::yield_now();
1168 if i == 1000 { panic!("should have a pending connection") }
1172 // Unset the timeout and make sure that this always blocks.
1173 a.set_timeout(None);
1174 let _t = Thread::spawn(move|| {
1175 drop(TcpStream::connect(addr).unwrap());
1177 a.accept().unwrap();
1181 fn close_readwrite_smoke() {
1182 let addr = next_test_ip4();
1183 let a = TcpListener::bind(addr).listen().unwrap();
1184 let (_tx, rx) = channel::<()>();
1185 Thread::spawn(move|| {
1187 let _s = a.accept().unwrap();
1188 let _ = rx.recv().unwrap();
1192 let mut s = TcpStream::connect(addr).unwrap();
1193 let mut s2 = s.clone();
1195 // closing should prevent reads/writes
1196 s.close_write().unwrap();
1197 assert!(s.write(&[0]).is_err());
1198 s.close_read().unwrap();
1199 assert!(s.read(&mut b).is_err());
1201 // closing should affect previous handles
1202 assert!(s2.write(&[0]).is_err());
1203 assert!(s2.read(&mut b).is_err());
1205 // closing should affect new handles
1206 let mut s3 = s.clone();
1207 assert!(s3.write(&[0]).is_err());
1208 assert!(s3.read(&mut b).is_err());
1210 // make sure these don't die
1211 let _ = s2.close_read();
1212 let _ = s2.close_write();
1213 let _ = s3.close_read();
1214 let _ = s3.close_write();
1218 fn close_read_wakes_up() {
1219 let addr = next_test_ip4();
1220 let a = TcpListener::bind(addr).listen().unwrap();
1221 let (_tx, rx) = channel::<()>();
1222 Thread::spawn(move|| {
1224 let _s = a.accept().unwrap();
1225 let _ = rx.recv().unwrap();
1228 let mut s = TcpStream::connect(addr).unwrap();
1230 let (tx, rx) = channel();
1231 let _t = Thread::spawn(move|| {
1233 assert!(s2.read(&mut [0]).is_err());
1234 tx.send(()).unwrap();
1236 // this should wake up the child task
1237 s.close_read().unwrap();
1239 // this test will never finish if the child doesn't wake up
// Exercises `set_timeout` on an accepted stream: the assertions show it makes
// both reads and writes fail with `TimedOut`, and that clearing it with `None`
// lets a read complete once the peer sends data.
#[test]
fn readwrite_timeouts() {
    let addr = next_test_ip6();
    let mut a = TcpListener::bind(addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    Thread::spawn(move|| {
        let mut s = TcpStream::connect(addr).unwrap();
        // Stay silent until the main thread has finished its timeout checks.
        rx.recv().unwrap();
        assert!(s.write(&[0]).is_ok());
        // Keep the connection open until the test drops `tx`.
        let _ = rx.recv();
    });

    let mut s = a.accept().unwrap();
    s.set_timeout(Some(20));
    // Nothing has been sent yet, so reads must time out (checked twice to
    // show the timeout is not consumed by the first failure).
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);

    s.set_timeout(Some(20));
    // Write 128 KiB chunks until a write times out; the peer is not reading,
    // so this is expected to happen well before the 1001-iteration cap.
    for i in range(0i, 1001) {
        match s.write(&[0; 128 * 1024]) {
            Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
            Err(IoError { kind: TimedOut, .. }) => break,
            Err(e) => panic!("{}", e),
        }
        if i == 1000 { panic!("should have filled up?!"); }
    }
    assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);

    // Release the peer, drop the timeout, and read the single byte it sends.
    tx.send(()).unwrap();
    s.set_timeout(None);
    assert_eq!(s.read(&mut [0, 0]), Ok(1));
}
// Checks that `set_read_timeout` makes reads fail with `TimedOut` while
// leaving writes unaffected: after the reads time out, 100 large writes must
// all succeed once the peer starts draining the socket.
#[test]
fn read_timeouts() {
    let addr = next_test_ip6();
    let mut a = TcpListener::bind(addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    Thread::spawn(move|| {
        let mut s = TcpStream::connect(addr).unwrap();
        // Do not read anything until the main thread has seen its timeouts.
        rx.recv().unwrap();
        // Drain exactly as many bytes as the main thread writes below
        // (100 chunks of 128 KiB).
        let mut amt = 0;
        while amt < 100 * 128 * 1024 {
            match s.read(&mut [0;128 * 1024]) {
                Ok(n) => { amt += n; }
                Err(e) => panic!("{}", e),
            }
        }
        // Keep the connection open until the test drops `tx`.
        let _ = rx.recv();
    });

    let mut s = a.accept().unwrap();
    s.set_read_timeout(Some(20));
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);

    // Release the peer; the read timeout is still set but writes must work.
    tx.send(()).unwrap();
    for _ in range(0i, 100) {
        assert!(s.write(&[0;128 * 1024]).is_ok());
    }
}
// Checks that `set_write_timeout` makes writes fail with `TimedOut` once the
// peer stops draining the socket, while a subsequent read on the same stream
// still succeeds.
#[test]
fn write_timeouts() {
    let addr = next_test_ip6();
    let mut a = TcpListener::bind(addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    Thread::spawn(move|| {
        let mut s = TcpStream::connect(addr).unwrap();
        // Stay idle (no reads, no writes) until the main thread signals.
        rx.recv().unwrap();
        assert!(s.write(&[0]).is_ok());
        // Keep the connection open until the test drops `tx`.
        let _ = rx.recv();
    });

    let mut s = a.accept().unwrap();
    s.set_write_timeout(Some(20));
    // Write 128 KiB chunks until a write times out; the peer is not reading,
    // so this is expected to happen well before the 1001-iteration cap.
    for i in range(0i, 1001) {
        match s.write(&[0; 128 * 1024]) {
            Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
            Err(IoError { kind: TimedOut, .. }) => break,
            Err(e) => panic!("{}", e),
        }
        if i == 1000 { panic!("should have filled up?!"); }
    }
    assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);

    // Release the peer and read the byte it sends; only the write timeout was
    // set, so this read is expected to succeed.
    tx.send(()).unwrap();
    assert!(s.read(&mut [0]).is_ok());
}
// Checks that a read timing out on one handle does not break a read running
// concurrently on a clone of the same stream: the clone's read must still
// return the byte the peer eventually sends.
#[test]
fn timeout_concurrent_read() {
    let addr = next_test_ip6();
    let mut a = TcpListener::bind(addr).listen().unwrap();
    let (tx, rx) = channel::<()>();
    Thread::spawn(move|| {
        let mut s = TcpStream::connect(addr).unwrap();
        // Send nothing until the main thread has observed its timeout.
        rx.recv().unwrap();
        assert_eq!(s.write(&[0]), Ok(()));
        // Keep the connection open until the test drops `tx`.
        let _ = rx.recv();
    });

    let mut s = a.accept().unwrap();
    let s2 = s.clone();
    let (tx2, rx2) = channel();
    let _t = Thread::spawn(move|| {
        let mut s2 = s2;
        // No timeout is set through this handle inside the thread; the read
        // is expected to finish with the peer's single byte.
        assert_eq!(s2.read(&mut [0]), Ok(1));
        tx2.send(()).unwrap();
    });

    s.set_read_timeout(Some(20));
    assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
    // Only now let the peer write, so the cloned reader can complete.
    tx.send(()).unwrap();

    rx2.recv().unwrap();
}
// Checks that a stream can be cloned while another clone of it is in the
// middle of a read, and that the pending read still completes afterwards.
#[test]
fn clone_while_reading() {
    let addr = next_test_ip6();
    let listen = TcpListener::bind(addr);
    let mut accept = listen.listen().unwrap();

    // Enqueue a task to write to a socket
    let (tx, rx) = channel();
    // Both helper threads report completion on this second channel.
    let (txdone, rxdone) = channel();
    let txdone2 = txdone.clone();
    let _t = Thread::spawn(move|| {
        let mut tcp = TcpStream::connect(addr).unwrap();
        // Hold the byte back until the main thread has cloned the handle.
        rx.recv().unwrap();
        tcp.write_u8(0).unwrap();
        txdone2.send(()).unwrap();
    });

    // Spawn off a reading clone
    let tcp = accept.accept().unwrap();
    let tcp2 = tcp.clone();
    let txdone3 = txdone.clone();
    let _t = Thread::spawn(move|| {
        let mut tcp2 = tcp2;
        tcp2.read_u8().unwrap();
        txdone3.send(()).unwrap();
    });

    // Try to ensure that the reading clone is indeed reading
    // (best effort only: yielding gives the reader a chance to start)
    for _ in range(0i, 50) {
        ::thread::Thread::yield_now();
    }

    // clone the handle again while it's reading, then let it finish the
    // read.
    let _ = tcp.clone();
    tx.send(()).unwrap();
    // Wait for both the writer and the reader to finish.
    rxdone.recv().unwrap();
    rxdone.recv().unwrap();
}
// Checks that an acceptor and its clone can each accept a connection made to
// the same listening address.
#[test]
fn clone_accept_smoke() {
    let addr = next_test_ip4();
    let l = TcpListener::bind(addr);
    let mut a = l.listen().unwrap();
    let mut a2 = a.clone();

    // Two clients, one per `accept` call below; the connect results are
    // ignored because only the acceptor side is under test.
    let _t = Thread::spawn(move|| {
        let _ = TcpStream::connect(addr);
    });
    let _t = Thread::spawn(move|| {
        let _ = TcpStream::connect(addr);
    });

    assert!(a.accept().is_ok());
    assert!(a2.accept().is_ok());
}
// Checks that an acceptor and its clone can call `accept` from two different
// threads at the same time and that both calls succeed.
#[test]
fn clone_accept_concurrent() {
    let addr = next_test_ip4();
    let l = TcpListener::bind(addr);
    let a = l.listen().unwrap();
    // Second handle so that each accepting thread owns its own acceptor.
    let a2 = a.clone();

    let (tx, rx) = channel();
    let tx2 = tx.clone();

    // Each thread forwards its `accept` result to the main thread.
    let _t = Thread::spawn(move|| {
        let mut a = a;
        tx.send(a.accept()).unwrap();
    });
    let _t = Thread::spawn(move|| {
        let mut a = a2;
        tx2.send(a.accept()).unwrap();
    });

    // Two clients, one for each pending `accept`.
    let _t = Thread::spawn(move|| {
        let _ = TcpStream::connect(addr);
    });
    let _t = Thread::spawn(move|| {
        let _ = TcpStream::connect(addr);
    });

    assert!(rx.recv().unwrap().is_ok());
    assert!(rx.recv().unwrap().is_ok());
}
// Checks that `accept` fails with `EndOfFile` after `close_accept` has been
// called on the same acceptor.
#[test]
fn close_accept_smoke() {
    let addr = next_test_ip4();
    let l = TcpListener::bind(addr);
    let mut a = l.listen().unwrap();

    a.close_accept().unwrap();
    assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
}
// Checks that `close_accept` called through a clone makes an `accept` running
// on the original acceptor in another thread return `EndOfFile`.
#[test]
fn close_accept_concurrent() {
    let addr = next_test_ip4();
    let l = TcpListener::bind(addr);
    let a = l.listen().unwrap();
    let mut a2 = a.clone();

    let (tx, rx) = channel();
    let _t = Thread::spawn(move|| {
        let mut a = a;
        // No client ever connects, so this can only finish via the close.
        tx.send(a.accept()).unwrap();
    });
    a2.close_accept().unwrap();

    assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);
}