1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
21 use collections::MutableSeq;
24 use slice::ImmutableVector;
26 use io::net::addrinfo::get_host_addresses;
27 use io::net::ip::SocketAddr;
28 use io::{IoError, ConnectionFailed, InvalidInput};
29 use io::{Reader, Writer, Listener, Acceptor};
30 use from_str::FromStr;
32 use option::{None, Some, Option};
34 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
35 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
38 /// A structure which represents a TCP stream between a local socket and a
44 /// # #![allow(unused_must_use)]
45 /// use std::io::TcpStream;
47 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
49 /// stream.write([1]);
50 /// let mut buf = [0];
52 /// drop(stream); // close the connection
54 pub struct TcpStream {
55 obj: Box<RtioTcpStream + Send>,
59 fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
63 /// Open a TCP connection to a remote host by hostname or IP address.
65 /// `host` can be a hostname or IP address string. If no error is
66 /// encountered, then `Ok(stream)` is returned.
67 pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
68 let addresses = match FromStr::from_str(host) {
69 Some(addr) => vec!(addr),
70 None => try!(get_host_addresses(host))
72 let mut err = IoError {
73 kind: ConnectionFailed,
74 desc: "no addresses found for hostname",
77 for addr in addresses.iter() {
78 let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
79 let result = LocalIo::maybe_raise(|io| {
80 io.tcp_connect(addr, None).map(TcpStream::new)
87 err = IoError::from_rtio_error(connect_err)
94 /// Creates a TCP connection to a remote socket address, timing out after
95 /// the specified number of milliseconds.
97 /// This is the same as the `connect` method, except that if the timeout
98 /// specified (in milliseconds) elapses before a connection is made an error
99 /// will be returned. The error's kind will be `TimedOut`.
101 /// Note that the `addr` argument may one day be split into a separate host
102 /// and port, similar to the API seen in `connect`.
103 #[experimental = "the timeout argument may eventually change types"]
104 pub fn connect_timeout(addr: SocketAddr,
105 timeout_ms: u64) -> IoResult<TcpStream> {
106 let SocketAddr { ip, port } = addr;
107 let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
108 LocalIo::maybe_raise(|io| {
109 io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
110 }).map_err(IoError::from_rtio_error)
113 /// Returns the socket address of the remote peer of this TCP connection.
114 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
115 match self.obj.peer_name() {
116 Ok(rtio::SocketAddr { ip, port }) => {
117 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
119 Err(e) => Err(IoError::from_rtio_error(e)),
123 /// Returns the socket address of the local half of this TCP connection.
124 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
125 match self.obj.socket_name() {
126 Ok(rtio::SocketAddr { ip, port }) => {
127 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
129 Err(e) => Err(IoError::from_rtio_error(e)),
133 /// Sets the nodelay flag on this connection to the boolean specified
135 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
139 self.obj.control_congestion()
140 }.map_err(IoError::from_rtio_error)
143 /// Sets the keepalive timeout to the timeout specified.
145 /// If the value specified is `None`, then the keepalive flag is cleared on
146 /// this connection. Otherwise, the keepalive timeout will be set to the
147 /// specified time, in seconds.
149 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
150 match delay_in_seconds {
151 Some(i) => self.obj.keepalive(i),
152 None => self.obj.letdie(),
153 }.map_err(IoError::from_rtio_error)
156 /// Closes the reading half of this connection.
158 /// This method will close the reading portion of this connection, causing
159 /// all pending and future reads to immediately return with an error.
164 /// # #![allow(unused_must_use)]
165 /// use std::io::timer;
166 /// use std::io::TcpStream;
168 /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
169 /// let stream2 = stream.clone();
172 /// // close this stream after one second
173 /// timer::sleep(1000);
174 /// let mut stream = stream2;
175 /// stream.close_read();
178 /// // wait for some data, will get canceled after one second
179 /// let mut buf = [0];
180 /// stream.read(buf);
183 /// Note that this method affects all cloned handles associated with this
184 /// stream, not just this one handle.
185 pub fn close_read(&mut self) -> IoResult<()> {
186 self.obj.close_read().map_err(IoError::from_rtio_error)
189 /// Closes the writing half of this connection.
191 /// This method will close the writing portion of this connection, causing
192 /// all future writes to immediately return with an error.
194 /// Note that this method affects all cloned handles associated with this
195 /// stream, not just this one handle.
196 pub fn close_write(&mut self) -> IoResult<()> {
197 self.obj.close_write().map_err(IoError::from_rtio_error)
200 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
202 /// This function will set a timeout for all blocking operations (including
203 /// reads and writes) on this stream. The timeout specified is a relative
204 /// time, in milliseconds, into the future after which point operations will
205 /// time out. This means that the timeout must be reset periodically to keep
206 /// it from expiring. Specifying a value of `None` will clear the timeout
209 /// The timeout on this stream is local to this stream only. Setting a
210 /// timeout does not affect any other cloned instances of this stream, nor
211 /// does the timeout propagate to cloned handles of this stream. Setting
212 /// this timeout will override any specific read or write timeouts
213 /// previously set for this stream.
215 /// For clarification on the semantics of interrupting a read and a write,
216 /// take a look at `set_read_timeout` and `set_write_timeout`.
217 #[experimental = "the timeout argument may change in type and value"]
218 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
219 self.obj.set_timeout(timeout_ms)
222 /// Sets the timeout for read operations on this stream.
224 /// See documentation in `set_timeout` for the semantics of this read time.
225 /// This will overwrite any previous read timeout set through either this
226 /// function or `set_timeout`.
230 /// When this timeout expires, if there is no pending read operation, no
231 /// action is taken. Otherwise, the read operation will be scheduled to
232 /// promptly return. If a timeout error is returned, then no data was read
233 /// during the timeout period.
234 #[experimental = "the timeout argument may change in type and value"]
235 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
236 self.obj.set_read_timeout(timeout_ms)
239 /// Sets the timeout for write operations on this stream.
241 /// See documentation in `set_timeout` for the semantics of this write time.
242 /// This will overwrite any previous write timeout set through either this
243 /// function or `set_timeout`.
247 /// When this timeout expires, if there is no pending write operation, no
248 /// action is taken. Otherwise, the pending write operation will be
249 /// scheduled to promptly return. The actual state of the underlying stream
250 /// is not specified.
252 /// The write operation may return an error of type `ShortWrite` which
253 /// indicates that the object is known to have written an exact number of
254 /// bytes successfully during the timeout period, and the remaining bytes
255 /// were never written.
257 /// If the write operation returns `TimedOut`, then the timeout primitive
258 /// does not know how many bytes were written as part of the timeout
259 /// operation. It may be the case that bytes continue to be written in an
260 /// asynchronous fashion after the call to write returns.
261 #[experimental = "the timeout argument may change in type and value"]
262 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
263 self.obj.set_write_timeout(timeout_ms)
267 impl Clone for TcpStream {
268 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
269 /// and writes of this connection.
271 /// The underlying TCP stream will not be closed until all handles to the
272 /// stream have been deallocated. All handles will also follow the same
273 /// stream, but two concurrent reads will not receive the same data.
274 /// Instead, the first read will receive the first packet received, and the
275 /// second read will receive the second packet.
276 fn clone(&self) -> TcpStream {
277 TcpStream { obj: self.obj.clone() }
281 impl Reader for TcpStream {
282 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
283 self.obj.read(buf).map_err(IoError::from_rtio_error)
287 impl Writer for TcpStream {
288 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
289 self.obj.write(buf).map_err(IoError::from_rtio_error)
293 /// A structure representing a socket server. This listener is used to create a
294 /// `TcpAcceptor` which can be used to accept sockets on a local port.
301 /// # #![allow(dead_code)]
302 /// use std::io::{TcpListener, TcpStream};
303 /// use std::io::{Acceptor, Listener};
305 /// let listener = TcpListener::bind("127.0.0.1", 80);
307 /// // start listening for connections on the bound address
308 /// let mut acceptor = listener.listen();
310 /// fn handle_client(mut stream: TcpStream) {
312 /// # &mut stream; // silence unused mutability/variable warning
314 /// // accept connections and process them, spawning a new task for each one
315 /// for stream in acceptor.incoming() {
317 /// Err(e) => { /* connection failed */ }
318 /// Ok(stream) => spawn(proc() {
319 /// // connection succeeded
320 /// handle_client(stream)
325 /// // close the socket server
329 pub struct TcpListener {
330 obj: Box<RtioTcpListener + Send>,
334 /// Creates a new `TcpListener` which will be bound to the specified IP
335 /// and port. This listener is not ready for accepting connections;
336 /// `listen` must be called on it before that's possible.
338 /// Binding with a port number of 0 will request that the OS assigns a port
339 /// to this listener. The port allocated can be queried via the
340 /// `socket_name` function.
341 pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
342 match FromStr::from_str(addr) {
344 let addr = rtio::SocketAddr{
345 ip: super::to_rtio(ip),
348 LocalIo::maybe_raise(|io| {
349 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
350 }).map_err(IoError::from_rtio_error)
355 desc: "invalid IP address specified",
362 /// Returns the local socket address of this listener.
363 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
364 match self.obj.socket_name() {
365 Ok(rtio::SocketAddr { ip, port }) => {
366 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
368 Err(e) => Err(IoError::from_rtio_error(e)),
373 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
374 fn listen(self) -> IoResult<TcpAcceptor> {
375 match self.obj.listen() {
376 Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
377 Err(e) => Err(IoError::from_rtio_error(e)),
382 /// The accepting half of a TCP socket server. This structure is created through
383 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
384 /// `TcpStream` instances.
385 pub struct TcpAcceptor {
386 obj: Box<RtioTcpAcceptor + Send>,
390 /// Prevents blocking on all future accepts after `ms` milliseconds have
393 /// This function is used to set a deadline after which this acceptor will
394 /// time out accepting any connections. The argument is the relative
395 /// distance, in milliseconds, to a point in the future after which all
396 /// accepts will fail.
398 /// If the argument specified is `None`, then any previously registered
399 /// timeout is cleared.
401 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
402 /// any pending connections. All pending connections will be accepted,
403 /// regardless of whether the timeout has expired or not (the accept will
404 /// not block in this case).
409 /// # #![allow(experimental)]
410 /// use std::io::TcpListener;
411 /// use std::io::{Listener, Acceptor, TimedOut};
413 /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
415 /// // After 100ms have passed, all accepts will fail
416 /// a.set_timeout(Some(100));
418 /// match a.accept() {
419 /// Ok(..) => println!("accepted a socket"),
420 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
421 /// Err(e) => println!("err: {}", e),
424 /// // Reset the timeout and try again
425 /// a.set_timeout(Some(100));
426 /// let socket = a.accept();
428 /// // Clear the timeout and block indefinitely waiting for a connection
429 /// a.set_timeout(None);
430 /// let socket = a.accept();
432 #[experimental = "the type of the argument and name of this function are \
434 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
437 impl Acceptor<TcpStream> for TcpAcceptor {
438 fn accept(&mut self) -> IoResult<TcpStream> {
439 match self.obj.accept(){
440 Ok(s) => Ok(TcpStream::new(s)),
441 Err(e) => Err(IoError::from_rtio_error(e)),
447 #[allow(experimental)]
450 use io::net::ip::SocketAddr;
454 // FIXME #11530 this fails on android because tests are run as root
455 iotest!(fn bind_error() {
456 match TcpListener::bind("0.0.0.0", 1) {
458 Err(e) => assert_eq!(e.kind, PermissionDenied),
460 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
462 iotest!(fn connect_error() {
463 match TcpStream::connect("0.0.0.0", 1) {
465 Err(e) => assert_eq!(e.kind, ConnectionRefused),
469 iotest!(fn listen_ip4_localhost() {
470 let socket_addr = next_test_ip4();
471 let ip_str = socket_addr.ip.to_string();
472 let port = socket_addr.port;
473 let listener = TcpListener::bind(ip_str.as_slice(), port);
474 let mut acceptor = listener.listen();
477 let mut stream = TcpStream::connect("localhost", port);
478 stream.write([144]).unwrap();
481 let mut stream = acceptor.accept();
483 stream.read(buf).unwrap();
484 assert!(buf[0] == 144);
487 iotest!(fn connect_localhost() {
488 let addr = next_test_ip4();
489 let ip_str = addr.ip.to_string();
490 let port = addr.port;
491 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
494 let mut stream = TcpStream::connect("localhost", addr.port);
495 stream.write([64]).unwrap();
498 let mut stream = acceptor.accept();
500 stream.read(buf).unwrap();
501 assert!(buf[0] == 64);
504 iotest!(fn connect_ip4_loopback() {
505 let addr = next_test_ip4();
506 let ip_str = addr.ip.to_string();
507 let port = addr.port;
508 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
511 let mut stream = TcpStream::connect("127.0.0.1", addr.port);
512 stream.write([44]).unwrap();
515 let mut stream = acceptor.accept();
517 stream.read(buf).unwrap();
518 assert!(buf[0] == 44);
521 iotest!(fn connect_ip6_loopback() {
522 let addr = next_test_ip6();
523 let ip_str = addr.ip.to_string();
524 let port = addr.port;
525 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
528 let mut stream = TcpStream::connect("::1", addr.port);
529 stream.write([66]).unwrap();
532 let mut stream = acceptor.accept();
534 stream.read(buf).unwrap();
535 assert!(buf[0] == 66);
538 iotest!(fn smoke_test_ip4() {
539 let addr = next_test_ip4();
540 let ip_str = addr.ip.to_string();
541 let port = addr.port;
542 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
545 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
546 stream.write([99]).unwrap();
549 let mut stream = acceptor.accept();
551 stream.read(buf).unwrap();
552 assert!(buf[0] == 99);
555 iotest!(fn smoke_test_ip6() {
556 let addr = next_test_ip6();
557 let ip_str = addr.ip.to_string();
558 let port = addr.port;
559 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
562 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
563 stream.write([99]).unwrap();
566 let mut stream = acceptor.accept();
568 stream.read(buf).unwrap();
569 assert!(buf[0] == 99);
572 iotest!(fn read_eof_ip4() {
573 let addr = next_test_ip4();
574 let ip_str = addr.ip.to_string();
575 let port = addr.port;
576 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
579 let _stream = TcpStream::connect(ip_str.as_slice(), port);
583 let mut stream = acceptor.accept();
585 let nread = stream.read(buf);
586 assert!(nread.is_err());
589 iotest!(fn read_eof_ip6() {
590 let addr = next_test_ip6();
591 let ip_str = addr.ip.to_string();
592 let port = addr.port;
593 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
596 let _stream = TcpStream::connect(ip_str.as_slice(), port);
600 let mut stream = acceptor.accept();
602 let nread = stream.read(buf);
603 assert!(nread.is_err());
606 iotest!(fn read_eof_twice_ip4() {
607 let addr = next_test_ip4();
608 let ip_str = addr.ip.to_string();
609 let port = addr.port;
610 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
613 let _stream = TcpStream::connect(ip_str.as_slice(), port);
617 let mut stream = acceptor.accept();
619 let nread = stream.read(buf);
620 assert!(nread.is_err());
622 match stream.read(buf) {
625 assert!(e.kind == NotConnected || e.kind == EndOfFile,
626 "unknown kind: {}", e.kind);
631 iotest!(fn read_eof_twice_ip6() {
632 let addr = next_test_ip6();
633 let ip_str = addr.ip.to_string();
634 let port = addr.port;
635 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
638 let _stream = TcpStream::connect(ip_str.as_slice(), port);
642 let mut stream = acceptor.accept();
644 let nread = stream.read(buf);
645 assert!(nread.is_err());
647 match stream.read(buf) {
650 assert!(e.kind == NotConnected || e.kind == EndOfFile,
651 "unknown kind: {}", e.kind);
656 iotest!(fn write_close_ip4() {
657 let addr = next_test_ip4();
658 let ip_str = addr.ip.to_string();
659 let port = addr.port;
660 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
663 let _stream = TcpStream::connect(ip_str.as_slice(), port);
667 let mut stream = acceptor.accept();
670 match stream.write(buf) {
673 assert!(e.kind == ConnectionReset ||
674 e.kind == BrokenPipe ||
675 e.kind == ConnectionAborted,
676 "unknown error: {}", e);
683 iotest!(fn write_close_ip6() {
684 let addr = next_test_ip6();
685 let ip_str = addr.ip.to_string();
686 let port = addr.port;
687 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
690 let _stream = TcpStream::connect(ip_str.as_slice(), port);
694 let mut stream = acceptor.accept();
697 match stream.write(buf) {
700 assert!(e.kind == ConnectionReset ||
701 e.kind == BrokenPipe ||
702 e.kind == ConnectionAborted,
703 "unknown error: {}", e);
710 iotest!(fn multiple_connect_serial_ip4() {
711 let addr = next_test_ip4();
712 let ip_str = addr.ip.to_string();
713 let port = addr.port;
715 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
718 for _ in range(0, max) {
719 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
720 stream.write([99]).unwrap();
724 for ref mut stream in acceptor.incoming().take(max) {
726 stream.read(buf).unwrap();
727 assert_eq!(buf[0], 99);
731 iotest!(fn multiple_connect_serial_ip6() {
732 let addr = next_test_ip6();
733 let ip_str = addr.ip.to_string();
734 let port = addr.port;
736 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
739 for _ in range(0, max) {
740 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
741 stream.write([99]).unwrap();
745 for ref mut stream in acceptor.incoming().take(max) {
747 stream.read(buf).unwrap();
748 assert_eq!(buf[0], 99);
752 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
753 let addr = next_test_ip4();
754 let ip_str = addr.ip.to_string();
755 let port = addr.port;
756 static MAX: int = 10;
757 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
760 let mut acceptor = acceptor;
761 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
762 // Start another task to handle the connection
764 let mut stream = stream;
766 stream.read(buf).unwrap();
767 assert!(buf[0] == i as u8);
775 fn connect(i: int, addr: SocketAddr) {
776 let ip_str = addr.ip.to_string();
777 let port = addr.port;
778 if i == MAX { return }
781 debug!("connecting");
782 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
783 // Connect again before writing
784 connect(i + 1, addr);
786 stream.write([i as u8]).unwrap();
791 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
792 let addr = next_test_ip6();
793 let ip_str = addr.ip.to_string();
794 let port = addr.port;
795 static MAX: int = 10;
796 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
799 let mut acceptor = acceptor;
800 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
801 // Start another task to handle the connection
803 let mut stream = stream;
805 stream.read(buf).unwrap();
806 assert!(buf[0] == i as u8);
814 fn connect(i: int, addr: SocketAddr) {
815 let ip_str = addr.ip.to_string();
816 let port = addr.port;
817 if i == MAX { return }
820 debug!("connecting");
821 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
822 // Connect again before writing
823 connect(i + 1, addr);
825 stream.write([i as u8]).unwrap();
830 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
831 static MAX: int = 10;
832 let addr = next_test_ip4();
833 let ip_str = addr.ip.to_string();
834 let port = addr.port;
835 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
838 let mut acceptor = acceptor;
839 for stream in acceptor.incoming().take(MAX as uint) {
840 // Start another task to handle the connection
842 let mut stream = stream;
844 stream.read(buf).unwrap();
845 assert!(buf[0] == 99);
853 fn connect(i: int, addr: SocketAddr) {
854 let ip_str = addr.ip.to_string();
855 let port = addr.port;
856 if i == MAX { return }
859 debug!("connecting");
860 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
861 // Connect again before writing
862 connect(i + 1, addr);
864 stream.write([99]).unwrap();
869 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
870 static MAX: int = 10;
871 let addr = next_test_ip6();
872 let ip_str = addr.ip.to_string();
873 let port = addr.port;
874 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
877 let mut acceptor = acceptor;
878 for stream in acceptor.incoming().take(MAX as uint) {
879 // Start another task to handle the connection
881 let mut stream = stream;
883 stream.read(buf).unwrap();
884 assert!(buf[0] == 99);
892 fn connect(i: int, addr: SocketAddr) {
893 let ip_str = addr.ip.to_string();
894 let port = addr.port;
895 if i == MAX { return }
898 debug!("connecting");
899 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
900 // Connect again before writing
901 connect(i + 1, addr);
903 stream.write([99]).unwrap();
908 pub fn socket_name(addr: SocketAddr) {
909 let ip_str = addr.ip.to_string();
910 let port = addr.port;
911 let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
913 // Make sure socket_name gives
914 // us the socket we bound to.
915 let so_name = listener.socket_name();
916 assert!(so_name.is_ok());
917 assert_eq!(addr, so_name.unwrap());
920 pub fn peer_name(addr: SocketAddr) {
921 let ip_str = addr.ip.to_string();
922 let port = addr.port;
923 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
925 let mut acceptor = acceptor;
926 acceptor.accept().unwrap();
929 let stream = TcpStream::connect(ip_str.as_slice(), port);
931 assert!(stream.is_ok());
932 let mut stream = stream.unwrap();
934 // Make sure peer_name gives us the
935 // address/port of the peer we've
937 let peer_name = stream.peer_name();
938 assert!(peer_name.is_ok());
939 assert_eq!(addr, peer_name.unwrap());
942 iotest!(fn socket_and_peer_name_ip4() {
943 peer_name(next_test_ip4());
944 socket_name(next_test_ip4());
947 iotest!(fn socket_and_peer_name_ip6() {
948 // FIXME: peer name is not consistent
949 //peer_name(next_test_ip6());
950 socket_name(next_test_ip6());
953 iotest!(fn partial_read() {
954 let addr = next_test_ip4();
955 let port = addr.port;
956 let (tx, rx) = channel();
958 let ip_str = addr.ip.to_string();
959 let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
961 let mut cl = srv.accept().unwrap();
962 cl.write([10]).unwrap();
969 let ip_str = addr.ip.to_string();
970 let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
971 let mut b = [0, ..10];
972 assert_eq!(c.read(b), Ok(1));
973 c.write([1]).unwrap();
977 iotest!(fn double_bind() {
978 let addr = next_test_ip4();
979 let ip_str = addr.ip.to_string();
980 let port = addr.port;
981 let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
982 assert!(listener.is_ok());
983 match TcpListener::bind(ip_str.as_slice(), port).listen() {
986 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
987 "unknown error: {} {}", e, e.kind);
992 iotest!(fn fast_rebind() {
993 let addr = next_test_ip4();
994 let port = addr.port;
995 let (tx, rx) = channel();
998 let ip_str = addr.ip.to_string();
1000 let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1006 let ip_str = addr.ip.to_string();
1007 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1010 let _stream = acceptor.accept().unwrap();
1016 let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1019 iotest!(fn tcp_clone_smoke() {
1020 let addr = next_test_ip4();
1021 let ip_str = addr.ip.to_string();
1022 let port = addr.port;
1023 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1026 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1027 let mut buf = [0, 0];
1028 assert_eq!(s.read(buf), Ok(1));
1029 assert_eq!(buf[0], 1);
1030 s.write([2]).unwrap();
1033 let mut s1 = acceptor.accept().unwrap();
1034 let s2 = s1.clone();
1036 let (tx1, rx1) = channel();
1037 let (tx2, rx2) = channel();
1041 s2.write([1]).unwrap();
1045 let mut buf = [0, 0];
1046 assert_eq!(s1.read(buf), Ok(1));
1050 iotest!(fn tcp_clone_two_read() {
1051 let addr = next_test_ip6();
1052 let ip_str = addr.ip.to_string();
1053 let port = addr.port;
1054 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1055 let (tx1, rx) = channel();
1056 let tx2 = tx1.clone();
1059 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1060 s.write([1]).unwrap();
1062 s.write([2]).unwrap();
1066 let mut s1 = acceptor.accept().unwrap();
1067 let s2 = s1.clone();
1069 let (done, rx) = channel();
1072 let mut buf = [0, 0];
1073 s2.read(buf).unwrap();
1077 let mut buf = [0, 0];
1078 s1.read(buf).unwrap();
1084 iotest!(fn tcp_clone_two_write() {
1085 let addr = next_test_ip4();
1086 let ip_str = addr.ip.to_string();
1087 let port = addr.port;
1088 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1091 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1092 let mut buf = [0, 1];
1093 s.read(buf).unwrap();
1094 s.read(buf).unwrap();
1097 let mut s1 = acceptor.accept().unwrap();
1098 let s2 = s1.clone();
1100 let (done, rx) = channel();
1103 s2.write([1]).unwrap();
1106 s1.write([2]).unwrap();
1111 iotest!(fn shutdown_smoke() {
1112 use rt::rtio::RtioTcpStream;
1114 let addr = next_test_ip4();
1115 let ip_str = addr.ip.to_string();
1116 let port = addr.port;
1117 let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1120 let mut c = a.accept().unwrap();
1121 assert_eq!(c.read_to_end(), Ok(vec!()));
1122 c.write([1]).unwrap();
1125 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1126 assert!(s.obj.close_write().is_ok());
1127 assert!(s.write([1]).is_err());
1128 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1131 iotest!(fn accept_timeout() {
1132 let addr = next_test_ip4();
1133 let ip_str = addr.ip.to_string();
1134 let port = addr.port;
1135 let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1137 a.set_timeout(Some(10));
1139 // Make sure we time out once and future invocations also time out
1140 let err = a.accept().err().unwrap();
1141 assert_eq!(err.kind, TimedOut);
1142 let err = a.accept().err().unwrap();
1143 assert_eq!(err.kind, TimedOut);
1145 // Also make sure that even though the timeout is expired that we will
1146 // continue to receive any pending connections.
1148 // FIXME: freebsd apparently never sees the pending connection, but
1149 // testing manually always works. Need to investigate this
1151 if !cfg!(target_os = "freebsd") {
1152 let (tx, rx) = channel();
1154 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
1158 for i in range(0i, 1001) {
1161 Err(ref e) if e.kind == TimedOut => {}
1162 Err(e) => fail!("error: {}", e),
1164 ::task::deschedule();
1165 if i == 1000 { fail!("should have a pending connection") }
1169 // Unset the timeout and make sure that this always blocks.
1170 a.set_timeout(None);
1172 drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1175 a.accept().unwrap();
1178 iotest!(fn close_readwrite_smoke() {
1179 let addr = next_test_ip4();
1180 let ip_str = addr.ip.to_string();
1181 let port = addr.port;
1182 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1183 let (_tx, rx) = channel::<()>();
1186 let _s = a.accept().unwrap();
1187 let _ = rx.recv_opt();
1191 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1192 let mut s2 = s.clone();
1194 // closing should prevent reads/writes
1195 s.close_write().unwrap();
1196 assert!(s.write([0]).is_err());
1197 s.close_read().unwrap();
1198 assert!(s.read(b).is_err());
1200 // closing should affect previous handles
1201 assert!(s2.write([0]).is_err());
1202 assert!(s2.read(b).is_err());
1204 // closing should affect new handles
1205 let mut s3 = s.clone();
1206 assert!(s3.write([0]).is_err());
1207 assert!(s3.read(b).is_err());
1209 // make sure these don't die
1210 let _ = s2.close_read();
1211 let _ = s2.close_write();
1212 let _ = s3.close_read();
1213 let _ = s3.close_write();
1216 iotest!(fn close_read_wakes_up() {
1217 let addr = next_test_ip4();
1218 let ip_str = addr.ip.to_string();
1219 let port = addr.port;
1220 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1221 let (_tx, rx) = channel::<()>();
1224 let _s = a.accept().unwrap();
1225 let _ = rx.recv_opt();
1228 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1230 let (tx, rx) = channel();
1233 assert!(s2.read([0]).is_err());
1236 // this should wake up the child task
1237 s.close_read().unwrap();
1239 // this test will never finish if the child doesn't wake up
1243 iotest!(fn readwrite_timeouts() {
1244 let addr = next_test_ip6();
1245 let ip_str = addr.ip.to_string();
1246 let port = addr.port;
1247 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1248 let (tx, rx) = channel::<()>();
1250 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1252 assert!(s.write([0]).is_ok());
1253 let _ = rx.recv_opt();
1256 let mut s = a.accept().unwrap();
1257 s.set_timeout(Some(20));
1258 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1259 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1261 s.set_timeout(Some(20));
1262 for i in range(0i, 1001) {
1263 match s.write([0, .. 128 * 1024]) {
1264 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1265 Err(IoError { kind: TimedOut, .. }) => break,
1266 Err(e) => fail!("{}", e),
1268 if i == 1000 { fail!("should have filled up?!"); }
1270 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1273 s.set_timeout(None);
1274 assert_eq!(s.read([0, 0]), Ok(1));
1277 iotest!(fn read_timeouts() {
1278 let addr = next_test_ip6();
1279 let ip_str = addr.ip.to_string();
1280 let port = addr.port;
1281 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1282 let (tx, rx) = channel::<()>();
1284 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1287 while amt < 100 * 128 * 1024 {
1288 match s.read([0, ..128 * 1024]) {
1289 Ok(n) => { amt += n; }
1290 Err(e) => fail!("{}", e),
1293 let _ = rx.recv_opt();
1296 let mut s = a.accept().unwrap();
1297 s.set_read_timeout(Some(20));
1298 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1299 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1302 for _ in range(0i, 100) {
1303 assert!(s.write([0, ..128 * 1024]).is_ok());
1307 iotest!(fn write_timeouts() {
1308 let addr = next_test_ip6();
1309 let ip_str = addr.ip.to_string();
1310 let port = addr.port;
1311 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1312 let (tx, rx) = channel::<()>();
1314 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1316 assert!(s.write([0]).is_ok());
1317 let _ = rx.recv_opt();
1320 let mut s = a.accept().unwrap();
1321 s.set_write_timeout(Some(20));
1322 for i in range(0i, 1001) {
1323 match s.write([0, .. 128 * 1024]) {
1324 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1325 Err(IoError { kind: TimedOut, .. }) => break,
1326 Err(e) => fail!("{}", e),
1328 if i == 1000 { fail!("should have filled up?!"); }
1330 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1333 assert!(s.read([0]).is_ok());
1336 iotest!(fn timeout_concurrent_read() {
1337 let addr = next_test_ip6();
1338 let ip_str = addr.ip.to_string();
1339 let port = addr.port;
1340 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1341 let (tx, rx) = channel::<()>();
1343 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1345 assert_eq!(s.write([0]), Ok(()));
1346 let _ = rx.recv_opt();
1349 let mut s = a.accept().unwrap();
1351 let (tx2, rx2) = channel();
1354 assert_eq!(s2.read([0]), Ok(1));
1358 s.set_read_timeout(Some(20));
1359 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1365 iotest!(fn clone_while_reading() {
1366 let addr = next_test_ip6();
1367 let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1368 let mut accept = listen.listen().unwrap();
1370 // Enqueue a task to write to a socket
1371 let (tx, rx) = channel();
1372 let (txdone, rxdone) = channel();
1373 let txdone2 = txdone.clone();
1375 let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1376 addr.port).unwrap();
1378 tcp.write_u8(0).unwrap();
1382 // Spawn off a reading clone
1383 let tcp = accept.accept().unwrap();
1384 let tcp2 = tcp.clone();
1385 let txdone3 = txdone.clone();
1387 let mut tcp2 = tcp2;
1388 tcp2.read_u8().unwrap();
1392 // Try to ensure that the reading clone is indeed reading
1393 for _ in range(0i, 50) {
1394 ::task::deschedule();
1397 // clone the handle again while it's reading, then let it finish the
1399 let _ = tcp.clone();