1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
21 use collections::MutableSeq;
24 use slice::ImmutableSlice;
26 use io::net::addrinfo::get_host_addresses;
27 use io::net::ip::SocketAddr;
28 use io::{IoError, ConnectionFailed, InvalidInput};
29 use io::{Reader, Writer, Listener, Acceptor};
30 use io::{standard_error, TimedOut};
31 use from_str::FromStr;
33 use option::{None, Some, Option};
35 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
36 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
40 /// A structure which represents a TCP stream between a local socket and a
46 /// # #![allow(unused_must_use)]
47 /// use std::io::TcpStream;
49 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
51 /// stream.write([1]);
52 /// let mut buf = [0];
54 /// drop(stream); // close the connection
56 pub struct TcpStream {
57 obj: Box<RtioTcpStream + Send>,
61 fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
65 /// Open a TCP connection to a remote host by hostname or IP address.
67 /// `host` can be a hostname or IP address string. If no error is
68 /// encountered, then `Ok(stream)` is returned.
69 pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
70 let addresses = match FromStr::from_str(host) {
71 Some(addr) => vec!(addr),
72 None => try!(get_host_addresses(host))
74 let mut err = IoError {
75 kind: ConnectionFailed,
76 desc: "no addresses found for hostname",
79 for addr in addresses.iter() {
80 let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
81 let result = LocalIo::maybe_raise(|io| {
82 io.tcp_connect(addr, None).map(TcpStream::new)
89 err = IoError::from_rtio_error(connect_err)
96 /// Creates a TCP connection to a remote socket address, timing out after
97 /// the specified duration.
99 /// This is the same as the `connect` method, except that if the timeout
100 /// specified (in milliseconds) elapses before a connection is made an error
101 /// will be returned. The error's kind will be `TimedOut`.
103 /// Note that the `addr` argument may one day be split into a separate host
104 /// and port, similar to the API seen in `connect`.
106 /// If a `timeout` with zero or negative duration is specified then
107 /// the function returns `Err`, with the error kind set to `TimedOut`.
108 #[experimental = "the timeout argument may eventually change types"]
109 pub fn connect_timeout(addr: SocketAddr,
110 timeout: Duration) -> IoResult<TcpStream> {
111 if timeout <= Duration::milliseconds(0) {
112 return Err(standard_error(TimedOut));
115 let SocketAddr { ip, port } = addr;
116 let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
117 LocalIo::maybe_raise(|io| {
118 io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
119 }).map_err(IoError::from_rtio_error)
122 /// Returns the socket address of the remote peer of this TCP connection.
123 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
124 match self.obj.peer_name() {
125 Ok(rtio::SocketAddr { ip, port }) => {
126 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
128 Err(e) => Err(IoError::from_rtio_error(e)),
132 /// Returns the socket address of the local half of this TCP connection.
133 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
134 match self.obj.socket_name() {
135 Ok(rtio::SocketAddr { ip, port }) => {
136 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
138 Err(e) => Err(IoError::from_rtio_error(e)),
142 /// Sets the nodelay flag on this connection to the boolean specified
144 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
148 self.obj.control_congestion()
149 }.map_err(IoError::from_rtio_error)
152 /// Sets the keepalive timeout to the timeout specified.
154 /// If the value specified is `None`, then the keepalive flag is cleared on
155 /// this connection. Otherwise, the keepalive timeout will be set to the
156 /// specified time, in seconds.
158 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
159 match delay_in_seconds {
160 Some(i) => self.obj.keepalive(i),
161 None => self.obj.letdie(),
162 }.map_err(IoError::from_rtio_error)
165 /// Closes the reading half of this connection.
167 /// This method will close the reading portion of this connection, causing
168 /// all pending and future reads to immediately return with an error.
173 /// # #![allow(unused_must_use)]
174 /// use std::io::timer;
175 /// use std::io::TcpStream;
176 /// use std::time::Duration;
178 /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
179 /// let stream2 = stream.clone();
182 /// // close this stream after one second
183 /// timer::sleep(Duration::seconds(1));
184 /// let mut stream = stream2;
185 /// stream.close_read();
188 /// // wait for some data, will get canceled after one second
189 /// let mut buf = [0];
190 /// stream.read(buf);
193 /// Note that this method affects all cloned handles associated with this
194 /// stream, not just this one handle.
195 pub fn close_read(&mut self) -> IoResult<()> {
196 self.obj.close_read().map_err(IoError::from_rtio_error)
199 /// Closes the writing half of this connection.
201 /// This method will close the writing portion of this connection, causing
202 /// all future writes to immediately return with an error.
204 /// Note that this method affects all cloned handles associated with this
205 /// stream, not just this one handle.
206 pub fn close_write(&mut self) -> IoResult<()> {
207 self.obj.close_write().map_err(IoError::from_rtio_error)
210 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
212 /// This function will set a timeout for all blocking operations (including
213 /// reads and writes) on this stream. The timeout specified is a relative
214 /// time, in milliseconds, into the future after which point operations will
215 /// time out. This means that the timeout must be reset periodically to keep
216 /// it from expiring. Specifying a value of `None` will clear the timeout
219 /// The timeout on this stream is local to this stream only. Setting a
220 /// timeout does not affect any other cloned instances of this stream, nor
221 /// does the timeout propagated to cloned handles of this stream. Setting
222 /// this timeout will override any specific read or write timeouts
223 /// previously set for this stream.
225 /// For clarification on the semantics of interrupting a read and a write,
226 /// take a look at `set_read_timeout` and `set_write_timeout`.
227 #[experimental = "the timeout argument may change in type and value"]
228 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
229 self.obj.set_timeout(timeout_ms)
232 /// Sets the timeout for read operations on this stream.
234 /// See documentation in `set_timeout` for the semantics of this read time.
235 /// This will overwrite any previous read timeout set through either this
236 /// function or `set_timeout`.
240 /// When this timeout expires, if there is no pending read operation, no
241 /// action is taken. Otherwise, the read operation will be scheduled to
242 /// promptly return. If a timeout error is returned, then no data was read
243 /// during the timeout period.
244 #[experimental = "the timeout argument may change in type and value"]
245 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
246 self.obj.set_read_timeout(timeout_ms)
249 /// Sets the timeout for write operations on this stream.
251 /// See documentation in `set_timeout` for the semantics of this write time.
252 /// This will overwrite any previous write timeout set through either this
253 /// function or `set_timeout`.
257 /// When this timeout expires, if there is no pending write operation, no
258 /// action is taken. Otherwise, the pending write operation will be
259 /// scheduled to promptly return. The actual state of the underlying stream
260 /// is not specified.
262 /// The write operation may return an error of type `ShortWrite` which
263 /// indicates that the object is known to have written an exact number of
264 /// bytes successfully during the timeout period, and the remaining bytes
265 /// were never written.
267 /// If the write operation returns `TimedOut`, then it the timeout primitive
268 /// does not know how many bytes were written as part of the timeout
269 /// operation. It may be the case that bytes continue to be written in an
270 /// asynchronous fashion after the call to write returns.
271 #[experimental = "the timeout argument may change in type and value"]
272 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
273 self.obj.set_write_timeout(timeout_ms)
277 impl Clone for TcpStream {
278 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
279 /// and writes of this connection.
281 /// The underlying TCP stream will not be closed until all handles to the
282 /// stream have been deallocated. All handles will also follow the same
283 /// stream, but two concurrent reads will not receive the same data.
284 /// Instead, the first read will receive the first packet received, and the
285 /// second read will receive the second packet.
286 fn clone(&self) -> TcpStream {
287 TcpStream { obj: self.obj.clone() }
291 impl Reader for TcpStream {
292 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
293 self.obj.read(buf).map_err(IoError::from_rtio_error)
297 impl Writer for TcpStream {
298 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
299 self.obj.write(buf).map_err(IoError::from_rtio_error)
303 /// A structure representing a socket server. This listener is used to create a
304 /// `TcpAcceptor` which can be used to accept sockets on a local port.
311 /// # #![allow(dead_code)]
312 /// use std::io::{TcpListener, TcpStream};
313 /// use std::io::{Acceptor, Listener};
315 /// let listener = TcpListener::bind("127.0.0.1", 80);
317 /// // bind the listener to the specified address
318 /// let mut acceptor = listener.listen();
320 /// fn handle_client(mut stream: TcpStream) {
322 /// # &mut stream; // silence unused mutability/variable warning
324 /// // accept connections and process them, spawning a new tasks for each one
325 /// for stream in acceptor.incoming() {
327 /// Err(e) => { /* connection failed */ }
328 /// Ok(stream) => spawn(proc() {
329 /// // connection succeeded
330 /// handle_client(stream)
335 /// // close the socket server
339 pub struct TcpListener {
340 obj: Box<RtioTcpListener + Send>,
344 /// Creates a new `TcpListener` which will be bound to the specified IP
345 /// and port. This listener is not ready for accepting connections,
346 /// `listen` must be called on it before that's possible.
348 /// Binding with a port number of 0 will request that the OS assigns a port
349 /// to this listener. The port allocated can be queried via the
350 /// `socket_name` function.
351 pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
352 match FromStr::from_str(addr) {
354 let addr = rtio::SocketAddr{
355 ip: super::to_rtio(ip),
358 LocalIo::maybe_raise(|io| {
359 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
360 }).map_err(IoError::from_rtio_error)
365 desc: "invalid IP address specified",
372 /// Returns the local socket address of this listener.
373 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
374 match self.obj.socket_name() {
375 Ok(rtio::SocketAddr { ip, port }) => {
376 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
378 Err(e) => Err(IoError::from_rtio_error(e)),
383 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
384 fn listen(self) -> IoResult<TcpAcceptor> {
385 match self.obj.listen() {
386 Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
387 Err(e) => Err(IoError::from_rtio_error(e)),
392 /// The accepting half of a TCP socket server. This structure is created through
393 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
394 /// `TcpStream` instances.
395 pub struct TcpAcceptor {
396 obj: Box<RtioTcpAcceptor + Send>,
400 /// Prevents blocking on all future accepts after `ms` milliseconds have
403 /// This function is used to set a deadline after which this acceptor will
404 /// time out accepting any connections. The argument is the relative
405 /// distance, in milliseconds, to a point in the future after which all
406 /// accepts will fail.
408 /// If the argument specified is `None`, then any previously registered
409 /// timeout is cleared.
411 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
412 /// any pending connections. All pending connections will be accepted,
413 /// regardless of whether the timeout has expired or not (the accept will
414 /// not block in this case).
419 /// # #![allow(experimental)]
420 /// use std::io::TcpListener;
421 /// use std::io::{Listener, Acceptor, TimedOut};
423 /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
425 /// // After 100ms have passed, all accepts will fail
426 /// a.set_timeout(Some(100));
428 /// match a.accept() {
429 /// Ok(..) => println!("accepted a socket"),
430 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
431 /// Err(e) => println!("err: {}", e),
434 /// // Reset the timeout and try again
435 /// a.set_timeout(Some(100));
436 /// let socket = a.accept();
438 /// // Clear the timeout and block indefinitely waiting for a connection
439 /// a.set_timeout(None);
440 /// let socket = a.accept();
442 #[experimental = "the type of the argument and name of this function are \
444 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
447 impl Acceptor<TcpStream> for TcpAcceptor {
448 fn accept(&mut self) -> IoResult<TcpStream> {
449 match self.obj.accept(){
450 Ok(s) => Ok(TcpStream::new(s)),
451 Err(e) => Err(IoError::from_rtio_error(e)),
457 #[allow(experimental)]
460 use io::net::ip::SocketAddr;
464 // FIXME #11530 this fails on android because tests are run as root
465 iotest!(fn bind_error() {
466 match TcpListener::bind("0.0.0.0", 1) {
468 Err(e) => assert_eq!(e.kind, PermissionDenied),
470 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
472 iotest!(fn connect_error() {
473 match TcpStream::connect("0.0.0.0", 1) {
475 Err(e) => assert_eq!(e.kind, ConnectionRefused),
479 iotest!(fn listen_ip4_localhost() {
480 let socket_addr = next_test_ip4();
481 let ip_str = socket_addr.ip.to_string();
482 let port = socket_addr.port;
483 let listener = TcpListener::bind(ip_str.as_slice(), port);
484 let mut acceptor = listener.listen();
487 let mut stream = TcpStream::connect("localhost", port);
488 stream.write([144]).unwrap();
491 let mut stream = acceptor.accept();
493 stream.read(buf).unwrap();
494 assert!(buf[0] == 144);
497 iotest!(fn connect_localhost() {
498 let addr = next_test_ip4();
499 let ip_str = addr.ip.to_string();
500 let port = addr.port;
501 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
504 let mut stream = TcpStream::connect("localhost", addr.port);
505 stream.write([64]).unwrap();
508 let mut stream = acceptor.accept();
510 stream.read(buf).unwrap();
511 assert!(buf[0] == 64);
514 iotest!(fn connect_ip4_loopback() {
515 let addr = next_test_ip4();
516 let ip_str = addr.ip.to_string();
517 let port = addr.port;
518 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
521 let mut stream = TcpStream::connect("127.0.0.1", addr.port);
522 stream.write([44]).unwrap();
525 let mut stream = acceptor.accept();
527 stream.read(buf).unwrap();
528 assert!(buf[0] == 44);
531 iotest!(fn connect_ip6_loopback() {
532 let addr = next_test_ip6();
533 let ip_str = addr.ip.to_string();
534 let port = addr.port;
535 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
538 let mut stream = TcpStream::connect("::1", addr.port);
539 stream.write([66]).unwrap();
542 let mut stream = acceptor.accept();
544 stream.read(buf).unwrap();
545 assert!(buf[0] == 66);
548 iotest!(fn smoke_test_ip4() {
549 let addr = next_test_ip4();
550 let ip_str = addr.ip.to_string();
551 let port = addr.port;
552 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
555 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
556 stream.write([99]).unwrap();
559 let mut stream = acceptor.accept();
561 stream.read(buf).unwrap();
562 assert!(buf[0] == 99);
565 iotest!(fn smoke_test_ip6() {
566 let addr = next_test_ip6();
567 let ip_str = addr.ip.to_string();
568 let port = addr.port;
569 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
572 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
573 stream.write([99]).unwrap();
576 let mut stream = acceptor.accept();
578 stream.read(buf).unwrap();
579 assert!(buf[0] == 99);
582 iotest!(fn read_eof_ip4() {
583 let addr = next_test_ip4();
584 let ip_str = addr.ip.to_string();
585 let port = addr.port;
586 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
589 let _stream = TcpStream::connect(ip_str.as_slice(), port);
593 let mut stream = acceptor.accept();
595 let nread = stream.read(buf);
596 assert!(nread.is_err());
599 iotest!(fn read_eof_ip6() {
600 let addr = next_test_ip6();
601 let ip_str = addr.ip.to_string();
602 let port = addr.port;
603 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
606 let _stream = TcpStream::connect(ip_str.as_slice(), port);
610 let mut stream = acceptor.accept();
612 let nread = stream.read(buf);
613 assert!(nread.is_err());
616 iotest!(fn read_eof_twice_ip4() {
617 let addr = next_test_ip4();
618 let ip_str = addr.ip.to_string();
619 let port = addr.port;
620 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
623 let _stream = TcpStream::connect(ip_str.as_slice(), port);
627 let mut stream = acceptor.accept();
629 let nread = stream.read(buf);
630 assert!(nread.is_err());
632 match stream.read(buf) {
635 assert!(e.kind == NotConnected || e.kind == EndOfFile,
636 "unknown kind: {}", e.kind);
641 iotest!(fn read_eof_twice_ip6() {
642 let addr = next_test_ip6();
643 let ip_str = addr.ip.to_string();
644 let port = addr.port;
645 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
648 let _stream = TcpStream::connect(ip_str.as_slice(), port);
652 let mut stream = acceptor.accept();
654 let nread = stream.read(buf);
655 assert!(nread.is_err());
657 match stream.read(buf) {
660 assert!(e.kind == NotConnected || e.kind == EndOfFile,
661 "unknown kind: {}", e.kind);
666 iotest!(fn write_close_ip4() {
667 let addr = next_test_ip4();
668 let ip_str = addr.ip.to_string();
669 let port = addr.port;
670 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
673 let _stream = TcpStream::connect(ip_str.as_slice(), port);
677 let mut stream = acceptor.accept();
680 match stream.write(buf) {
683 assert!(e.kind == ConnectionReset ||
684 e.kind == BrokenPipe ||
685 e.kind == ConnectionAborted,
686 "unknown error: {}", e);
693 iotest!(fn write_close_ip6() {
694 let addr = next_test_ip6();
695 let ip_str = addr.ip.to_string();
696 let port = addr.port;
697 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
700 let _stream = TcpStream::connect(ip_str.as_slice(), port);
704 let mut stream = acceptor.accept();
707 match stream.write(buf) {
710 assert!(e.kind == ConnectionReset ||
711 e.kind == BrokenPipe ||
712 e.kind == ConnectionAborted,
713 "unknown error: {}", e);
720 iotest!(fn multiple_connect_serial_ip4() {
721 let addr = next_test_ip4();
722 let ip_str = addr.ip.to_string();
723 let port = addr.port;
725 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
728 for _ in range(0, max) {
729 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
730 stream.write([99]).unwrap();
734 for ref mut stream in acceptor.incoming().take(max) {
736 stream.read(buf).unwrap();
737 assert_eq!(buf[0], 99);
741 iotest!(fn multiple_connect_serial_ip6() {
742 let addr = next_test_ip6();
743 let ip_str = addr.ip.to_string();
744 let port = addr.port;
746 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
749 for _ in range(0, max) {
750 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
751 stream.write([99]).unwrap();
755 for ref mut stream in acceptor.incoming().take(max) {
757 stream.read(buf).unwrap();
758 assert_eq!(buf[0], 99);
762 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
763 let addr = next_test_ip4();
764 let ip_str = addr.ip.to_string();
765 let port = addr.port;
766 static MAX: int = 10;
767 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
770 let mut acceptor = acceptor;
771 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
772 // Start another task to handle the connection
774 let mut stream = stream;
776 stream.read(buf).unwrap();
777 assert!(buf[0] == i as u8);
785 fn connect(i: int, addr: SocketAddr) {
786 let ip_str = addr.ip.to_string();
787 let port = addr.port;
788 if i == MAX { return }
791 debug!("connecting");
792 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
793 // Connect again before writing
794 connect(i + 1, addr);
796 stream.write([i as u8]).unwrap();
801 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
802 let addr = next_test_ip6();
803 let ip_str = addr.ip.to_string();
804 let port = addr.port;
805 static MAX: int = 10;
806 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
809 let mut acceptor = acceptor;
810 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
811 // Start another task to handle the connection
813 let mut stream = stream;
815 stream.read(buf).unwrap();
816 assert!(buf[0] == i as u8);
824 fn connect(i: int, addr: SocketAddr) {
825 let ip_str = addr.ip.to_string();
826 let port = addr.port;
827 if i == MAX { return }
830 debug!("connecting");
831 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
832 // Connect again before writing
833 connect(i + 1, addr);
835 stream.write([i as u8]).unwrap();
840 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
841 static MAX: int = 10;
842 let addr = next_test_ip4();
843 let ip_str = addr.ip.to_string();
844 let port = addr.port;
845 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
848 let mut acceptor = acceptor;
849 for stream in acceptor.incoming().take(MAX as uint) {
850 // Start another task to handle the connection
852 let mut stream = stream;
854 stream.read(buf).unwrap();
855 assert!(buf[0] == 99);
863 fn connect(i: int, addr: SocketAddr) {
864 let ip_str = addr.ip.to_string();
865 let port = addr.port;
866 if i == MAX { return }
869 debug!("connecting");
870 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
871 // Connect again before writing
872 connect(i + 1, addr);
874 stream.write([99]).unwrap();
879 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
880 static MAX: int = 10;
881 let addr = next_test_ip6();
882 let ip_str = addr.ip.to_string();
883 let port = addr.port;
884 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
887 let mut acceptor = acceptor;
888 for stream in acceptor.incoming().take(MAX as uint) {
889 // Start another task to handle the connection
891 let mut stream = stream;
893 stream.read(buf).unwrap();
894 assert!(buf[0] == 99);
902 fn connect(i: int, addr: SocketAddr) {
903 let ip_str = addr.ip.to_string();
904 let port = addr.port;
905 if i == MAX { return }
908 debug!("connecting");
909 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
910 // Connect again before writing
911 connect(i + 1, addr);
913 stream.write([99]).unwrap();
918 pub fn socket_name(addr: SocketAddr) {
919 let ip_str = addr.ip.to_string();
920 let port = addr.port;
921 let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
923 // Make sure socket_name gives
924 // us the socket we binded to.
925 let so_name = listener.socket_name();
926 assert!(so_name.is_ok());
927 assert_eq!(addr, so_name.unwrap());
930 pub fn peer_name(addr: SocketAddr) {
931 let ip_str = addr.ip.to_string();
932 let port = addr.port;
933 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
935 let mut acceptor = acceptor;
936 acceptor.accept().unwrap();
939 let stream = TcpStream::connect(ip_str.as_slice(), port);
941 assert!(stream.is_ok());
942 let mut stream = stream.unwrap();
944 // Make sure peer_name gives us the
945 // address/port of the peer we've
947 let peer_name = stream.peer_name();
948 assert!(peer_name.is_ok());
949 assert_eq!(addr, peer_name.unwrap());
952 iotest!(fn socket_and_peer_name_ip4() {
953 peer_name(next_test_ip4());
954 socket_name(next_test_ip4());
957 iotest!(fn socket_and_peer_name_ip6() {
958 // FIXME: peer name is not consistent
959 //peer_name(next_test_ip6());
960 socket_name(next_test_ip6());
963 iotest!(fn partial_read() {
964 let addr = next_test_ip4();
965 let port = addr.port;
966 let (tx, rx) = channel();
968 let ip_str = addr.ip.to_string();
969 let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
971 let mut cl = srv.accept().unwrap();
972 cl.write([10]).unwrap();
979 let ip_str = addr.ip.to_string();
980 let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
981 let mut b = [0, ..10];
982 assert_eq!(c.read(b), Ok(1));
983 c.write([1]).unwrap();
987 iotest!(fn double_bind() {
988 let addr = next_test_ip4();
989 let ip_str = addr.ip.to_string();
990 let port = addr.port;
991 let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
992 assert!(listener.is_ok());
993 match TcpListener::bind(ip_str.as_slice(), port).listen() {
996 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
997 "unknown error: {} {}", e, e.kind);
1002 iotest!(fn fast_rebind() {
1003 let addr = next_test_ip4();
1004 let port = addr.port;
1005 let (tx, rx) = channel();
1008 let ip_str = addr.ip.to_string();
1010 let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1016 let ip_str = addr.ip.to_string();
1017 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1020 let _stream = acceptor.accept().unwrap();
1026 let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1029 iotest!(fn tcp_clone_smoke() {
1030 let addr = next_test_ip4();
1031 let ip_str = addr.ip.to_string();
1032 let port = addr.port;
1033 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1036 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1037 let mut buf = [0, 0];
1038 assert_eq!(s.read(buf), Ok(1));
1039 assert_eq!(buf[0], 1);
1040 s.write([2]).unwrap();
1043 let mut s1 = acceptor.accept().unwrap();
1044 let s2 = s1.clone();
1046 let (tx1, rx1) = channel();
1047 let (tx2, rx2) = channel();
1051 s2.write([1]).unwrap();
1055 let mut buf = [0, 0];
1056 assert_eq!(s1.read(buf), Ok(1));
1060 iotest!(fn tcp_clone_two_read() {
1061 let addr = next_test_ip6();
1062 let ip_str = addr.ip.to_string();
1063 let port = addr.port;
1064 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1065 let (tx1, rx) = channel();
1066 let tx2 = tx1.clone();
1069 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1070 s.write([1]).unwrap();
1072 s.write([2]).unwrap();
1076 let mut s1 = acceptor.accept().unwrap();
1077 let s2 = s1.clone();
1079 let (done, rx) = channel();
1082 let mut buf = [0, 0];
1083 s2.read(buf).unwrap();
1087 let mut buf = [0, 0];
1088 s1.read(buf).unwrap();
1094 iotest!(fn tcp_clone_two_write() {
1095 let addr = next_test_ip4();
1096 let ip_str = addr.ip.to_string();
1097 let port = addr.port;
1098 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1101 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1102 let mut buf = [0, 1];
1103 s.read(buf).unwrap();
1104 s.read(buf).unwrap();
1107 let mut s1 = acceptor.accept().unwrap();
1108 let s2 = s1.clone();
1110 let (done, rx) = channel();
1113 s2.write([1]).unwrap();
1116 s1.write([2]).unwrap();
1121 iotest!(fn shutdown_smoke() {
1122 use rt::rtio::RtioTcpStream;
1124 let addr = next_test_ip4();
1125 let ip_str = addr.ip.to_string();
1126 let port = addr.port;
1127 let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1130 let mut c = a.accept().unwrap();
1131 assert_eq!(c.read_to_end(), Ok(vec!()));
1132 c.write([1]).unwrap();
1135 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1136 assert!(s.obj.close_write().is_ok());
1137 assert!(s.write([1]).is_err());
1138 assert_eq!(s.read_to_end(), Ok(vec!(1)));
1141 iotest!(fn accept_timeout() {
1142 let addr = next_test_ip4();
1143 let ip_str = addr.ip.to_string();
1144 let port = addr.port;
1145 let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
1147 a.set_timeout(Some(10));
1149 // Make sure we time out once and future invocations also time out
1150 let err = a.accept().err().unwrap();
1151 assert_eq!(err.kind, TimedOut);
1152 let err = a.accept().err().unwrap();
1153 assert_eq!(err.kind, TimedOut);
1155 // Also make sure that even though the timeout is expired that we will
1156 // continue to receive any pending connections.
1158 // FIXME: freebsd apparently never sees the pending connection, but
1159 // testing manually always works. Need to investigate this
1161 if !cfg!(target_os = "freebsd") {
1162 let (tx, rx) = channel();
1164 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
1168 for i in range(0i, 1001) {
1171 Err(ref e) if e.kind == TimedOut => {}
1172 Err(e) => fail!("error: {}", e),
1174 ::task::deschedule();
1175 if i == 1000 { fail!("should have a pending connection") }
1179 // Unset the timeout and make sure that this always blocks.
1180 a.set_timeout(None);
1182 drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1185 a.accept().unwrap();
1188 iotest!(fn close_readwrite_smoke() {
1189 let addr = next_test_ip4();
1190 let ip_str = addr.ip.to_string();
1191 let port = addr.port;
1192 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1193 let (_tx, rx) = channel::<()>();
1196 let _s = a.accept().unwrap();
1197 let _ = rx.recv_opt();
1201 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1202 let mut s2 = s.clone();
1204 // closing should prevent reads/writes
1205 s.close_write().unwrap();
1206 assert!(s.write([0]).is_err());
1207 s.close_read().unwrap();
1208 assert!(s.read(b).is_err());
1210 // closing should affect previous handles
1211 assert!(s2.write([0]).is_err());
1212 assert!(s2.read(b).is_err());
1214 // closing should affect new handles
1215 let mut s3 = s.clone();
1216 assert!(s3.write([0]).is_err());
1217 assert!(s3.read(b).is_err());
1219 // make sure these don't die
1220 let _ = s2.close_read();
1221 let _ = s2.close_write();
1222 let _ = s3.close_read();
1223 let _ = s3.close_write();
1226 iotest!(fn close_read_wakes_up() {
1227 let addr = next_test_ip4();
1228 let ip_str = addr.ip.to_string();
1229 let port = addr.port;
1230 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1231 let (_tx, rx) = channel::<()>();
1234 let _s = a.accept().unwrap();
1235 let _ = rx.recv_opt();
1238 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1240 let (tx, rx) = channel();
1243 assert!(s2.read([0]).is_err());
1246 // this should wake up the child task
1247 s.close_read().unwrap();
1249 // this test will never finish if the child doesn't wake up
1253 iotest!(fn readwrite_timeouts() {
1254 let addr = next_test_ip6();
1255 let ip_str = addr.ip.to_string();
1256 let port = addr.port;
1257 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1258 let (tx, rx) = channel::<()>();
1260 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1262 assert!(s.write([0]).is_ok());
1263 let _ = rx.recv_opt();
1266 let mut s = a.accept().unwrap();
1267 s.set_timeout(Some(20));
1268 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1269 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1271 s.set_timeout(Some(20));
1272 for i in range(0i, 1001) {
1273 match s.write([0, .. 128 * 1024]) {
1274 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1275 Err(IoError { kind: TimedOut, .. }) => break,
1276 Err(e) => fail!("{}", e),
1278 if i == 1000 { fail!("should have filled up?!"); }
1280 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1283 s.set_timeout(None);
1284 assert_eq!(s.read([0, 0]), Ok(1));
1287 iotest!(fn read_timeouts() {
1288 let addr = next_test_ip6();
1289 let ip_str = addr.ip.to_string();
1290 let port = addr.port;
1291 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1292 let (tx, rx) = channel::<()>();
1294 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1297 while amt < 100 * 128 * 1024 {
1298 match s.read([0, ..128 * 1024]) {
1299 Ok(n) => { amt += n; }
1300 Err(e) => fail!("{}", e),
1303 let _ = rx.recv_opt();
1306 let mut s = a.accept().unwrap();
1307 s.set_read_timeout(Some(20));
1308 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1309 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1312 for _ in range(0i, 100) {
1313 assert!(s.write([0, ..128 * 1024]).is_ok());
1317 iotest!(fn write_timeouts() {
1318 let addr = next_test_ip6();
1319 let ip_str = addr.ip.to_string();
1320 let port = addr.port;
1321 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1322 let (tx, rx) = channel::<()>();
1324 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1326 assert!(s.write([0]).is_ok());
1327 let _ = rx.recv_opt();
1330 let mut s = a.accept().unwrap();
1331 s.set_write_timeout(Some(20));
1332 for i in range(0i, 1001) {
1333 match s.write([0, .. 128 * 1024]) {
1334 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1335 Err(IoError { kind: TimedOut, .. }) => break,
1336 Err(e) => fail!("{}", e),
1338 if i == 1000 { fail!("should have filled up?!"); }
1340 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1343 assert!(s.read([0]).is_ok());
1346 iotest!(fn timeout_concurrent_read() {
1347 let addr = next_test_ip6();
1348 let ip_str = addr.ip.to_string();
1349 let port = addr.port;
1350 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1351 let (tx, rx) = channel::<()>();
1353 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1355 assert_eq!(s.write([0]), Ok(()));
1356 let _ = rx.recv_opt();
1359 let mut s = a.accept().unwrap();
1361 let (tx2, rx2) = channel();
1364 assert_eq!(s2.read([0]), Ok(1));
1368 s.set_read_timeout(Some(20));
1369 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1375 iotest!(fn clone_while_reading() {
1376 let addr = next_test_ip6();
1377 let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1378 let mut accept = listen.listen().unwrap();
1380 // Enqueue a task to write to a socket
1381 let (tx, rx) = channel();
1382 let (txdone, rxdone) = channel();
1383 let txdone2 = txdone.clone();
1385 let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1386 addr.port).unwrap();
1388 tcp.write_u8(0).unwrap();
1392 // Spawn off a reading clone
1393 let tcp = accept.accept().unwrap();
1394 let tcp2 = tcp.clone();
1395 let txdone3 = txdone.clone();
1397 let mut tcp2 = tcp2;
1398 tcp2.read_u8().unwrap();
1402 // Try to ensure that the reading clone is indeed reading
1403 for _ in range(0i, 50) {
1404 ::task::deschedule();
1407 // clone the handle again while it's reading, then let it finish the
1409 let _ = tcp.clone();