1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
23 use slice::ImmutableVector;
25 use io::net::addrinfo::get_host_addresses;
26 use io::net::ip::SocketAddr;
27 use io::{IoError, ConnectionFailed, InvalidInput};
28 use io::{Reader, Writer, Listener, Acceptor};
29 use from_str::FromStr;
31 use option::{None, Some, Option};
33 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
34 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
37 /// A structure which represents a TCP stream between a local socket and a
43 /// # #![allow(unused_must_use)]
44 /// use std::io::net::tcp::TcpStream;
46 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
48 /// stream.write([1]);
49 /// let mut buf = [0];
51 /// drop(stream); // close the connection
53 pub struct TcpStream {
54 obj: Box<RtioTcpStream + Send>,
58 fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
62 /// Open a TCP connection to a remote host by hostname or IP address.
64 /// `host` can be a hostname or IP address string. If no error is
65 /// encountered, then `Ok(stream)` is returned.
66 pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
67 let addresses = match FromStr::from_str(host) {
68 Some(addr) => vec!(addr),
69 None => try!(get_host_addresses(host))
71 let mut err = IoError {
72 kind: ConnectionFailed,
73 desc: "no addresses found for hostname",
76 for addr in addresses.iter() {
77 let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
78 let result = LocalIo::maybe_raise(|io| {
79 io.tcp_connect(addr, None).map(TcpStream::new)
86 err = IoError::from_rtio_error(connect_err)
93 /// Creates a TCP connection to a remote socket address, timing out after
94 /// the specified number of milliseconds.
96 /// This is the same as the `connect` method, except that if the timeout
97 /// specified (in milliseconds) elapses before a connection is made an error
98 /// will be returned. The error's kind will be `TimedOut`.
100 /// Note that the `addr` argument may one day be split into a separate host
101 /// and port, similar to the API seen in `connect`.
102 #[experimental = "the timeout argument may eventually change types"]
103 pub fn connect_timeout(addr: SocketAddr,
104 timeout_ms: u64) -> IoResult<TcpStream> {
105 let SocketAddr { ip, port } = addr;
106 let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
107 LocalIo::maybe_raise(|io| {
108 io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
109 }).map_err(IoError::from_rtio_error)
112 /// Returns the socket address of the remote peer of this TCP connection.
113 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
114 match self.obj.peer_name() {
115 Ok(rtio::SocketAddr { ip, port }) => {
116 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
118 Err(e) => Err(IoError::from_rtio_error(e)),
122 /// Returns the socket address of the local half of this TCP connection.
123 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
124 match self.obj.socket_name() {
125 Ok(rtio::SocketAddr { ip, port }) => {
126 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
128 Err(e) => Err(IoError::from_rtio_error(e)),
132 /// Sets the nodelay flag on this connection to the boolean specified
134 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
138 self.obj.control_congestion()
139 }.map_err(IoError::from_rtio_error)
142 /// Sets the keepalive timeout to the timeout specified.
144 /// If the value specified is `None`, then the keepalive flag is cleared on
145 /// this connection. Otherwise, the keepalive timeout will be set to the
146 /// specified time, in seconds.
148 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
149 match delay_in_seconds {
150 Some(i) => self.obj.keepalive(i),
151 None => self.obj.letdie(),
152 }.map_err(IoError::from_rtio_error)
155 /// Closes the reading half of this connection.
157 /// This method will close the reading portion of this connection, causing
158 /// all pending and future reads to immediately return with an error.
163 /// # #![allow(unused_must_use)]
164 /// use std::io::timer;
165 /// use std::io::net::tcp::TcpStream;
167 /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
168 /// let stream2 = stream.clone();
171 /// // close this stream after one second
172 /// timer::sleep(1000);
173 /// let mut stream = stream2;
174 /// stream.close_read();
177 /// // wait for some data, will get canceled after one second
178 /// let mut buf = [0];
179 /// stream.read(buf);
182 /// Note that this method affects all cloned handles associated with this
183 /// stream, not just this one handle.
184 pub fn close_read(&mut self) -> IoResult<()> {
185 self.obj.close_read().map_err(IoError::from_rtio_error)
188 /// Closes the writing half of this connection.
190 /// This method will close the writing portion of this connection, causing
191 /// all future writes to immediately return with an error.
193 /// Note that this method affects all cloned handles associated with this
194 /// stream, not just this one handle.
195 pub fn close_write(&mut self) -> IoResult<()> {
196 self.obj.close_write().map_err(IoError::from_rtio_error)
199 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
201 /// This function will set a timeout for all blocking operations (including
202 /// reads and writes) on this stream. The timeout specified is a relative
203 /// time, in milliseconds, into the future after which point operations will
204 /// time out. This means that the timeout must be reset periodically to keep
205 /// it from expiring. Specifying a value of `None` will clear the timeout
208 /// The timeout on this stream is local to this stream only. Setting a
209 /// timeout does not affect any other cloned instances of this stream, nor
210 /// does the timeout propagate to cloned handles of this stream. Setting
211 /// this timeout will override any specific read or write timeouts
212 /// previously set for this stream.
214 /// For clarification on the semantics of interrupting a read and a write,
215 /// take a look at `set_read_timeout` and `set_write_timeout`.
216 #[experimental = "the timeout argument may change in type and value"]
217 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
218 self.obj.set_timeout(timeout_ms)
221 /// Sets the timeout for read operations on this stream.
223 /// See documentation in `set_timeout` for the semantics of this read time.
224 /// This will overwrite any previous read timeout set through either this
225 /// function or `set_timeout`.
229 /// When this timeout expires, if there is no pending read operation, no
230 /// action is taken. Otherwise, the read operation will be scheduled to
231 /// promptly return. If a timeout error is returned, then no data was read
232 /// during the timeout period.
233 #[experimental = "the timeout argument may change in type and value"]
234 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
235 self.obj.set_read_timeout(timeout_ms)
238 /// Sets the timeout for write operations on this stream.
240 /// See documentation in `set_timeout` for the semantics of this write time.
241 /// This will overwrite any previous write timeout set through either this
242 /// function or `set_timeout`.
246 /// When this timeout expires, if there is no pending write operation, no
247 /// action is taken. Otherwise, the pending write operation will be
248 /// scheduled to promptly return. The actual state of the underlying stream
249 /// is not specified.
251 /// The write operation may return an error of type `ShortWrite` which
252 /// indicates that the object is known to have written an exact number of
253 /// bytes successfully during the timeout period, and the remaining bytes
254 /// were never written.
256 /// If the write operation returns `TimedOut`, then the timeout primitive
257 /// does not know how many bytes were written as part of the timeout
258 /// operation. It may be the case that bytes continue to be written in an
259 /// asynchronous fashion after the call to write returns.
260 #[experimental = "the timeout argument may change in type and value"]
261 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
262 self.obj.set_write_timeout(timeout_ms)
266 impl Clone for TcpStream {
267 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
268 /// and writes of this connection.
270 /// The underlying TCP stream will not be closed until all handles to the
271 /// stream have been deallocated. All handles will also follow the same
272 /// stream, but two concurrent reads will not receive the same data.
273 /// Instead, the first read will receive the first packet received, and the
274 /// second read will receive the second packet.
275 fn clone(&self) -> TcpStream {
276 TcpStream { obj: self.obj.clone() }
280 impl Reader for TcpStream {
281 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
282 self.obj.read(buf).map_err(IoError::from_rtio_error)
286 impl Writer for TcpStream {
287 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
288 self.obj.write(buf).map_err(IoError::from_rtio_error)
292 /// A structure representing a socket server. This listener is used to create a
293 /// `TcpAcceptor` which can be used to accept sockets on a local port.
300 /// # #![allow(dead_code)]
301 /// use std::io::{TcpListener, TcpStream};
302 /// use std::io::{Acceptor, Listener};
304 /// let listener = TcpListener::bind("127.0.0.1", 80);
306 /// // start listening for connections on the bound address
307 /// let mut acceptor = listener.listen();
309 /// fn handle_client(mut stream: TcpStream) {
311 /// # &mut stream; // silence unused mutability/variable warning
313 /// // accept connections and process them, spawning a new task for each one
314 /// for stream in acceptor.incoming() {
316 /// Err(e) => { /* connection failed */ }
317 /// Ok(stream) => spawn(proc() {
318 /// // connection succeeded
319 /// handle_client(stream)
324 /// // close the socket server
328 pub struct TcpListener {
329 obj: Box<RtioTcpListener + Send>,
333 /// Creates a new `TcpListener` which will be bound to the specified IP
334 /// and port. This listener is not ready for accepting connections;
335 /// `listen` must be called on it before that's possible.
337 /// Binding with a port number of 0 will request that the OS assigns a port
338 /// to this listener. The port allocated can be queried via the
339 /// `socket_name` function.
340 pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
341 match FromStr::from_str(addr) {
343 let addr = rtio::SocketAddr{
344 ip: super::to_rtio(ip),
347 LocalIo::maybe_raise(|io| {
348 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
349 }).map_err(IoError::from_rtio_error)
354 desc: "invalid IP address specified",
361 /// Returns the local socket address of this listener.
362 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
363 match self.obj.socket_name() {
364 Ok(rtio::SocketAddr { ip, port }) => {
365 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
367 Err(e) => Err(IoError::from_rtio_error(e)),
372 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
373 fn listen(self) -> IoResult<TcpAcceptor> {
374 match self.obj.listen() {
375 Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
376 Err(e) => Err(IoError::from_rtio_error(e)),
381 /// The accepting half of a TCP socket server. This structure is created through
382 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
383 /// `TcpStream` instances.
384 pub struct TcpAcceptor {
385 obj: Box<RtioTcpAcceptor + Send>,
389 /// Prevents blocking on all future accepts after `ms` milliseconds have
392 /// This function is used to set a deadline after which this acceptor will
393 /// time out accepting any connections. The argument is the relative
394 /// distance, in milliseconds, to a point in the future after which all
395 /// accepts will fail.
397 /// If the argument specified is `None`, then any previously registered
398 /// timeout is cleared.
400 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
401 /// any pending connections. All pending connections will be accepted,
402 /// regardless of whether the timeout has expired or not (the accept will
403 /// not block in this case).
408 /// # #![allow(experimental)]
409 /// use std::io::net::tcp::TcpListener;
410 /// use std::io::{Listener, Acceptor, TimedOut};
412 /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
414 /// // After 100ms have passed, all accepts will fail
415 /// a.set_timeout(Some(100));
417 /// match a.accept() {
418 /// Ok(..) => println!("accepted a socket"),
419 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
420 /// Err(e) => println!("err: {}", e),
423 /// // Reset the timeout and try again
424 /// a.set_timeout(Some(100));
425 /// let socket = a.accept();
427 /// // Clear the timeout and block indefinitely waiting for a connection
428 /// a.set_timeout(None);
429 /// let socket = a.accept();
431 #[experimental = "the type of the argument and name of this function are \
433 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
436 impl Acceptor<TcpStream> for TcpAcceptor {
437 fn accept(&mut self) -> IoResult<TcpStream> {
438 match self.obj.accept(){
439 Ok(s) => Ok(TcpStream::new(s)),
440 Err(e) => Err(IoError::from_rtio_error(e)),
446 #[allow(experimental)]
449 use io::net::ip::SocketAddr;
453 // FIXME #11530 this fails on android because tests are run as root
454 iotest!(fn bind_error() {
455 match TcpListener::bind("0.0.0.0", 1) {
457 Err(e) => assert_eq!(e.kind, PermissionDenied),
459 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
461 iotest!(fn connect_error() {
462 match TcpStream::connect("0.0.0.0", 1) {
464 Err(e) => assert_eq!(e.kind, ConnectionRefused),
468 iotest!(fn listen_ip4_localhost() {
469 let socket_addr = next_test_ip4();
470 let ip_str = socket_addr.ip.to_str();
471 let port = socket_addr.port;
472 let listener = TcpListener::bind(ip_str.as_slice(), port);
473 let mut acceptor = listener.listen();
476 let mut stream = TcpStream::connect("localhost", port);
477 stream.write([144]).unwrap();
480 let mut stream = acceptor.accept();
482 stream.read(buf).unwrap();
483 assert!(buf[0] == 144);
486 iotest!(fn connect_localhost() {
487 let addr = next_test_ip4();
488 let ip_str = addr.ip.to_str();
489 let port = addr.port;
490 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
493 let mut stream = TcpStream::connect("localhost", addr.port);
494 stream.write([64]).unwrap();
497 let mut stream = acceptor.accept();
499 stream.read(buf).unwrap();
500 assert!(buf[0] == 64);
503 iotest!(fn connect_ip4_loopback() {
504 let addr = next_test_ip4();
505 let ip_str = addr.ip.to_str();
506 let port = addr.port;
507 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
510 let mut stream = TcpStream::connect("127.0.0.1", addr.port);
511 stream.write([44]).unwrap();
514 let mut stream = acceptor.accept();
516 stream.read(buf).unwrap();
517 assert!(buf[0] == 44);
520 iotest!(fn connect_ip6_loopback() {
521 let addr = next_test_ip6();
522 let ip_str = addr.ip.to_str();
523 let port = addr.port;
524 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
527 let mut stream = TcpStream::connect("::1", addr.port);
528 stream.write([66]).unwrap();
531 let mut stream = acceptor.accept();
533 stream.read(buf).unwrap();
534 assert!(buf[0] == 66);
537 iotest!(fn smoke_test_ip4() {
538 let addr = next_test_ip4();
539 let ip_str = addr.ip.to_str();
540 let port = addr.port;
541 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
544 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
545 stream.write([99]).unwrap();
548 let mut stream = acceptor.accept();
550 stream.read(buf).unwrap();
551 assert!(buf[0] == 99);
554 iotest!(fn smoke_test_ip6() {
555 let addr = next_test_ip6();
556 let ip_str = addr.ip.to_str();
557 let port = addr.port;
558 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
561 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
562 stream.write([99]).unwrap();
565 let mut stream = acceptor.accept();
567 stream.read(buf).unwrap();
568 assert!(buf[0] == 99);
571 iotest!(fn read_eof_ip4() {
572 let addr = next_test_ip4();
573 let ip_str = addr.ip.to_str();
574 let port = addr.port;
575 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
578 let _stream = TcpStream::connect(ip_str.as_slice(), port);
582 let mut stream = acceptor.accept();
584 let nread = stream.read(buf);
585 assert!(nread.is_err());
588 iotest!(fn read_eof_ip6() {
589 let addr = next_test_ip6();
590 let ip_str = addr.ip.to_str();
591 let port = addr.port;
592 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
595 let _stream = TcpStream::connect(ip_str.as_slice(), port);
599 let mut stream = acceptor.accept();
601 let nread = stream.read(buf);
602 assert!(nread.is_err());
605 iotest!(fn read_eof_twice_ip4() {
606 let addr = next_test_ip4();
607 let ip_str = addr.ip.to_str();
608 let port = addr.port;
609 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
612 let _stream = TcpStream::connect(ip_str.as_slice(), port);
616 let mut stream = acceptor.accept();
618 let nread = stream.read(buf);
619 assert!(nread.is_err());
621 match stream.read(buf) {
624 assert!(e.kind == NotConnected || e.kind == EndOfFile,
625 "unknown kind: {:?}", e.kind);
630 iotest!(fn read_eof_twice_ip6() {
631 let addr = next_test_ip6();
632 let ip_str = addr.ip.to_str();
633 let port = addr.port;
634 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
637 let _stream = TcpStream::connect(ip_str.as_slice(), port);
641 let mut stream = acceptor.accept();
643 let nread = stream.read(buf);
644 assert!(nread.is_err());
646 match stream.read(buf) {
649 assert!(e.kind == NotConnected || e.kind == EndOfFile,
650 "unknown kind: {:?}", e.kind);
655 iotest!(fn write_close_ip4() {
656 let addr = next_test_ip4();
657 let ip_str = addr.ip.to_str();
658 let port = addr.port;
659 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
662 let _stream = TcpStream::connect(ip_str.as_slice(), port);
666 let mut stream = acceptor.accept();
669 match stream.write(buf) {
672 assert!(e.kind == ConnectionReset ||
673 e.kind == BrokenPipe ||
674 e.kind == ConnectionAborted,
675 "unknown error: {:?}", e);
682 iotest!(fn write_close_ip6() {
683 let addr = next_test_ip6();
684 let ip_str = addr.ip.to_str();
685 let port = addr.port;
686 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
689 let _stream = TcpStream::connect(ip_str.as_slice(), port);
693 let mut stream = acceptor.accept();
696 match stream.write(buf) {
699 assert!(e.kind == ConnectionReset ||
700 e.kind == BrokenPipe ||
701 e.kind == ConnectionAborted,
702 "unknown error: {:?}", e);
709 iotest!(fn multiple_connect_serial_ip4() {
710 let addr = next_test_ip4();
711 let ip_str = addr.ip.to_str();
712 let port = addr.port;
714 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
717 for _ in range(0, max) {
718 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
719 stream.write([99]).unwrap();
723 for ref mut stream in acceptor.incoming().take(max) {
725 stream.read(buf).unwrap();
726 assert_eq!(buf[0], 99);
730 iotest!(fn multiple_connect_serial_ip6() {
731 let addr = next_test_ip6();
732 let ip_str = addr.ip.to_str();
733 let port = addr.port;
735 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
738 for _ in range(0, max) {
739 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
740 stream.write([99]).unwrap();
744 for ref mut stream in acceptor.incoming().take(max) {
746 stream.read(buf).unwrap();
747 assert_eq!(buf[0], 99);
751 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
752 let addr = next_test_ip4();
753 let ip_str = addr.ip.to_str();
754 let port = addr.port;
755 static MAX: int = 10;
756 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
759 let mut acceptor = acceptor;
760 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
761 // Start another task to handle the connection
763 let mut stream = stream;
765 stream.read(buf).unwrap();
766 assert!(buf[0] == i as u8);
774 fn connect(i: int, addr: SocketAddr) {
775 let ip_str = addr.ip.to_str();
776 let port = addr.port;
777 if i == MAX { return }
780 debug!("connecting");
781 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
782 // Connect again before writing
783 connect(i + 1, addr);
785 stream.write([i as u8]).unwrap();
790 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
791 let addr = next_test_ip6();
792 let ip_str = addr.ip.to_str();
793 let port = addr.port;
794 static MAX: int = 10;
795 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
798 let mut acceptor = acceptor;
799 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
800 // Start another task to handle the connection
802 let mut stream = stream;
804 stream.read(buf).unwrap();
805 assert!(buf[0] == i as u8);
813 fn connect(i: int, addr: SocketAddr) {
814 let ip_str = addr.ip.to_str();
815 let port = addr.port;
816 if i == MAX { return }
819 debug!("connecting");
820 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
821 // Connect again before writing
822 connect(i + 1, addr);
824 stream.write([i as u8]).unwrap();
829 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
830 static MAX: int = 10;
831 let addr = next_test_ip4();
832 let ip_str = addr.ip.to_str();
833 let port = addr.port;
834 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
837 let mut acceptor = acceptor;
838 for stream in acceptor.incoming().take(MAX as uint) {
839 // Start another task to handle the connection
841 let mut stream = stream;
843 stream.read(buf).unwrap();
844 assert!(buf[0] == 99);
852 fn connect(i: int, addr: SocketAddr) {
853 let ip_str = addr.ip.to_str();
854 let port = addr.port;
855 if i == MAX { return }
858 debug!("connecting");
859 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
860 // Connect again before writing
861 connect(i + 1, addr);
863 stream.write([99]).unwrap();
868 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
869 static MAX: int = 10;
870 let addr = next_test_ip6();
871 let ip_str = addr.ip.to_str();
872 let port = addr.port;
873 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
876 let mut acceptor = acceptor;
877 for stream in acceptor.incoming().take(MAX as uint) {
878 // Start another task to handle the connection
880 let mut stream = stream;
882 stream.read(buf).unwrap();
883 assert!(buf[0] == 99);
891 fn connect(i: int, addr: SocketAddr) {
892 let ip_str = addr.ip.to_str();
893 let port = addr.port;
894 if i == MAX { return }
897 debug!("connecting");
898 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
899 // Connect again before writing
900 connect(i + 1, addr);
902 stream.write([99]).unwrap();
907 pub fn socket_name(addr: SocketAddr) {
908 let ip_str = addr.ip.to_str();
909 let port = addr.port;
910 let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
912 // Make sure socket_name gives
913 // us the socket we bound to.
914 let so_name = listener.socket_name();
915 assert!(so_name.is_ok());
916 assert_eq!(addr, so_name.unwrap());
919 pub fn peer_name(addr: SocketAddr) {
920 let ip_str = addr.ip.to_str();
921 let port = addr.port;
922 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
924 let mut acceptor = acceptor;
925 acceptor.accept().unwrap();
928 let stream = TcpStream::connect(ip_str.as_slice(), port);
930 assert!(stream.is_ok());
931 let mut stream = stream.unwrap();
933 // Make sure peer_name gives us the
934 // address/port of the peer we've
936 let peer_name = stream.peer_name();
937 assert!(peer_name.is_ok());
938 assert_eq!(addr, peer_name.unwrap());
941 iotest!(fn socket_and_peer_name_ip4() {
942 peer_name(next_test_ip4());
943 socket_name(next_test_ip4());
946 iotest!(fn socket_and_peer_name_ip6() {
947 // FIXME: peer name is not consistent
948 //peer_name(next_test_ip6());
949 socket_name(next_test_ip6());
952 iotest!(fn partial_read() {
953 let addr = next_test_ip4();
954 let port = addr.port;
955 let (tx, rx) = channel();
957 let ip_str = addr.ip.to_str();
958 let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
960 let mut cl = srv.accept().unwrap();
961 cl.write([10]).unwrap();
968 let ip_str = addr.ip.to_str();
969 let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
970 let mut b = [0, ..10];
971 assert_eq!(c.read(b), Ok(1));
972 c.write([1]).unwrap();
976 iotest!(fn double_bind() {
977 let addr = next_test_ip4();
978 let ip_str = addr.ip.to_str();
979 let port = addr.port;
980 let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
981 assert!(listener.is_ok());
982 match TcpListener::bind(ip_str.as_slice(), port).listen() {
985 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
986 "unknown error: {} {}", e, e.kind);
991 iotest!(fn fast_rebind() {
992 let addr = next_test_ip4();
993 let port = addr.port;
994 let (tx, rx) = channel();
997 let ip_str = addr.ip.to_str();
999 let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1005 let ip_str = addr.ip.to_str();
1006 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1009 let _stream = acceptor.accept().unwrap();
1015 let _listener = TcpListener::bind(addr.ip.to_str().as_slice(), port);
1018 iotest!(fn tcp_clone_smoke() {
1019 let addr = next_test_ip4();
1020 let ip_str = addr.ip.to_str();
1021 let port = addr.port;
1022 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1025 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1026 let mut buf = [0, 0];
1027 assert_eq!(s.read(buf), Ok(1));
1028 assert_eq!(buf[0], 1);
1029 s.write([2]).unwrap();
1032 let mut s1 = acceptor.accept().unwrap();
1033 let s2 = s1.clone();
1035 let (tx1, rx1) = channel();
1036 let (tx2, rx2) = channel();
1040 s2.write([1]).unwrap();
1044 let mut buf = [0, 0];
1045 assert_eq!(s1.read(buf), Ok(1));
1049 iotest!(fn tcp_clone_two_read() {
1050 let addr = next_test_ip6();
1051 let ip_str = addr.ip.to_str();
1052 let port = addr.port;
1053 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1054 let (tx1, rx) = channel();
1055 let tx2 = tx1.clone();
1058 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1059 s.write([1]).unwrap();
1061 s.write([2]).unwrap();
1065 let mut s1 = acceptor.accept().unwrap();
1066 let s2 = s1.clone();
1068 let (done, rx) = channel();
1071 let mut buf = [0, 0];
1072 s2.read(buf).unwrap();
1076 let mut buf = [0, 0];
1077 s1.read(buf).unwrap();
1083 iotest!(fn tcp_clone_two_write() {
1084 let addr = next_test_ip4();
1085 let ip_str = addr.ip.to_str();
1086 let port = addr.port;
1087 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1090 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1091 let mut buf = [0, 1];
1092 s.read(buf).unwrap();
1093 s.read(buf).unwrap();
1096 let mut s1 = acceptor.accept().unwrap();
1097 let s2 = s1.clone();
1099 let (done, rx) = channel();
1102 s2.write([1]).unwrap();
1105 s1.write([2]).unwrap();
1110 iotest!(fn shutdown_smoke() {
1111 use rt::rtio::RtioTcpStream;
1113 let addr = next_test_ip4();
1114 let ip_str = addr.ip.to_str();
1115 let port = addr.port;
1116 let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1119 let mut c = a.accept().unwrap();
1120 assert_eq!(c.read_to_end(), Ok(vec!()));
1121 c.write([1]).unwrap();
1124 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1125 assert!(s.obj.close_write().is_ok());
1126 assert!(s.write([1]).is_err());
1127 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1130 iotest!(fn accept_timeout() {
1131 let addr = next_test_ip4();
1132 let ip_str = addr.ip.to_str();
1133 let port = addr.port;
1134 let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1136 a.set_timeout(Some(10));
1138 // Make sure we time out once and future invocations also time out
1139 let err = a.accept().err().unwrap();
1140 assert_eq!(err.kind, TimedOut);
1141 let err = a.accept().err().unwrap();
1142 assert_eq!(err.kind, TimedOut);
1144 // Also make sure that even though the timeout is expired that we will
1145 // continue to receive any pending connections.
1147 // FIXME: freebsd apparently never sees the pending connection, but
1148 // testing manually always works. Need to investigate this
1150 if !cfg!(target_os = "freebsd") {
1151 let (tx, rx) = channel();
1153 tx.send(TcpStream::connect(addr.ip.to_str().as_slice(),
1157 for i in range(0, 1001) {
1160 Err(ref e) if e.kind == TimedOut => {}
1161 Err(e) => fail!("error: {}", e),
1163 ::task::deschedule();
1164 if i == 1000 { fail!("should have a pending connection") }
1168 // Unset the timeout and make sure that this always blocks.
1169 a.set_timeout(None);
1171 drop(TcpStream::connect(addr.ip.to_str().as_slice(),
1174 a.accept().unwrap();
1177 iotest!(fn close_readwrite_smoke() {
1178 let addr = next_test_ip4();
1179 let ip_str = addr.ip.to_str();
1180 let port = addr.port;
1181 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1182 let (_tx, rx) = channel::<()>();
1185 let _s = a.accept().unwrap();
1186 let _ = rx.recv_opt();
1190 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1191 let mut s2 = s.clone();
1193 // closing should prevent reads/writes
1194 s.close_write().unwrap();
1195 assert!(s.write([0]).is_err());
1196 s.close_read().unwrap();
1197 assert!(s.read(b).is_err());
1199 // closing should affect previous handles
1200 assert!(s2.write([0]).is_err());
1201 assert!(s2.read(b).is_err());
1203 // closing should affect new handles
1204 let mut s3 = s.clone();
1205 assert!(s3.write([0]).is_err());
1206 assert!(s3.read(b).is_err());
1208 // make sure these don't die
1209 let _ = s2.close_read();
1210 let _ = s2.close_write();
1211 let _ = s3.close_read();
1212 let _ = s3.close_write();
1215 iotest!(fn close_read_wakes_up() {
1216 let addr = next_test_ip4();
1217 let ip_str = addr.ip.to_str();
1218 let port = addr.port;
1219 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1220 let (_tx, rx) = channel::<()>();
1223 let _s = a.accept().unwrap();
1224 let _ = rx.recv_opt();
1227 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1229 let (tx, rx) = channel();
1232 assert!(s2.read([0]).is_err());
1235 // this should wake up the child task
1236 s.close_read().unwrap();
1238 // this test will never finish if the child doesn't wake up
1242 iotest!(fn readwrite_timeouts() {
1243 let addr = next_test_ip6();
1244 let ip_str = addr.ip.to_str();
1245 let port = addr.port;
1246 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1247 let (tx, rx) = channel::<()>();
1249 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1251 assert!(s.write([0]).is_ok());
1252 let _ = rx.recv_opt();
1255 let mut s = a.accept().unwrap();
1256 s.set_timeout(Some(20));
1257 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1258 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1260 s.set_timeout(Some(20));
1261 for i in range(0, 1001) {
1262 match s.write([0, .. 128 * 1024]) {
1263 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1264 Err(IoError { kind: TimedOut, .. }) => break,
1265 Err(e) => fail!("{}", e),
1267 if i == 1000 { fail!("should have filled up?!"); }
1269 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1272 s.set_timeout(None);
1273 assert_eq!(s.read([0, 0]), Ok(1));
1276 iotest!(fn read_timeouts() {
1277 let addr = next_test_ip6();
1278 let ip_str = addr.ip.to_str();
1279 let port = addr.port;
1280 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1281 let (tx, rx) = channel::<()>();
1283 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1286 while amt < 100 * 128 * 1024 {
1287 match s.read([0, ..128 * 1024]) {
1288 Ok(n) => { amt += n; }
1289 Err(e) => fail!("{}", e),
1292 let _ = rx.recv_opt();
1295 let mut s = a.accept().unwrap();
1296 s.set_read_timeout(Some(20));
1297 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1298 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1301 for _ in range(0, 100) {
1302 assert!(s.write([0, ..128 * 1024]).is_ok());
1306 iotest!(fn write_timeouts() {
1307 let addr = next_test_ip6();
1308 let ip_str = addr.ip.to_str();
1309 let port = addr.port;
1310 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1311 let (tx, rx) = channel::<()>();
1313 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1315 assert!(s.write([0]).is_ok());
1316 let _ = rx.recv_opt();
1319 let mut s = a.accept().unwrap();
1320 s.set_write_timeout(Some(20));
1321 for i in range(0, 1001) {
1322 match s.write([0, .. 128 * 1024]) {
1323 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1324 Err(IoError { kind: TimedOut, .. }) => break,
1325 Err(e) => fail!("{}", e),
1327 if i == 1000 { fail!("should have filled up?!"); }
1329 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1332 assert!(s.read([0]).is_ok());
1335 iotest!(fn timeout_concurrent_read() {
1336 let addr = next_test_ip6();
1337 let ip_str = addr.ip.to_str();
1338 let port = addr.port;
1339 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1340 let (tx, rx) = channel::<()>();
1342 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1344 assert_eq!(s.write([0]), Ok(()));
1345 let _ = rx.recv_opt();
1348 let mut s = a.accept().unwrap();
1350 let (tx2, rx2) = channel();
1353 assert_eq!(s2.read([0]), Ok(1));
1357 s.set_read_timeout(Some(20));
1358 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);