1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
23 use slice::ImmutableSlice;
25 use io::net::addrinfo::get_host_addresses;
26 use io::net::ip::SocketAddr;
27 use io::{IoError, ConnectionFailed, InvalidInput};
28 use io::{Reader, Writer, Listener, Acceptor};
29 use io::{standard_error, TimedOut};
30 use from_str::FromStr;
32 use option::{None, Some, Option};
34 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
35 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
39 /// A structure which represents a TCP stream between a local socket and a
45 /// # #![allow(unused_must_use)]
46 /// use std::io::TcpStream;
48 /// let mut stream = TcpStream::connect("127.0.0.1", 34254);
50 /// stream.write([1]);
51 /// let mut buf = [0];
53 /// drop(stream); // close the connection
55 pub struct TcpStream {
56 obj: Box<RtioTcpStream + Send>,
60 fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
64 /// Open a TCP connection to a remote host by hostname or IP address.
66 /// `host` can be a hostname or IP address string. If no error is
67 /// encountered, then `Ok(stream)` is returned.
68 pub fn connect(host: &str, port: u16) -> IoResult<TcpStream> {
69 let addresses = match FromStr::from_str(host) {
70 Some(addr) => vec!(addr),
71 None => try!(get_host_addresses(host))
73 let mut err = IoError {
74 kind: ConnectionFailed,
75 desc: "no addresses found for hostname",
78 for addr in addresses.iter() {
79 let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port };
80 let result = LocalIo::maybe_raise(|io| {
81 io.tcp_connect(addr, None).map(TcpStream::new)
88 err = IoError::from_rtio_error(connect_err)
95 /// Creates a TCP connection to a remote socket address, timing out after
96 /// the specified duration.
98 /// This is the same as the `connect` method, except that if the timeout
99 /// specified elapses before a connection is made an error will be
100 /// returned. The error's kind will be `TimedOut`.
102 /// Note that the `addr` argument may one day be split into a separate host
103 /// and port, similar to the API seen in `connect`.
105 /// If a `timeout` with zero or negative duration is specified then
106 /// the function returns `Err`, with the error kind set to `TimedOut`.
107 #[experimental = "the timeout argument may eventually change types"]
108 pub fn connect_timeout(addr: SocketAddr,
109 timeout: Duration) -> IoResult<TcpStream> {
110 if timeout <= Duration::milliseconds(0) {
111 return Err(standard_error(TimedOut));
114 let SocketAddr { ip, port } = addr;
115 let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port };
116 LocalIo::maybe_raise(|io| {
117 io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
118 }).map_err(IoError::from_rtio_error)
121 /// Returns the socket address of the remote peer of this TCP connection.
122 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
123 match self.obj.peer_name() {
124 Ok(rtio::SocketAddr { ip, port }) => {
125 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
127 Err(e) => Err(IoError::from_rtio_error(e)),
131 /// Returns the socket address of the local half of this TCP connection.
132 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
133 match self.obj.socket_name() {
134 Ok(rtio::SocketAddr { ip, port }) => {
135 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
137 Err(e) => Err(IoError::from_rtio_error(e)),
141 /// Sets the nodelay flag on this connection to the boolean specified
143 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
147 self.obj.control_congestion()
148 }.map_err(IoError::from_rtio_error)
151 /// Sets the keepalive timeout to the timeout specified.
153 /// If the value specified is `None`, then the keepalive flag is cleared on
154 /// this connection. Otherwise, the keepalive timeout will be set to the
155 /// specified time, in seconds.
157 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
158 match delay_in_seconds {
159 Some(i) => self.obj.keepalive(i),
160 None => self.obj.letdie(),
161 }.map_err(IoError::from_rtio_error)
164 /// Closes the reading half of this connection.
166 /// This method will close the reading portion of this connection, causing
167 /// all pending and future reads to immediately return with an error.
172 /// # #![allow(unused_must_use)]
173 /// use std::io::timer;
174 /// use std::io::TcpStream;
175 /// use std::time::Duration;
177 /// let mut stream = TcpStream::connect("127.0.0.1", 34254).unwrap();
178 /// let stream2 = stream.clone();
181 /// // close this stream after one second
182 /// timer::sleep(Duration::seconds(1));
183 /// let mut stream = stream2;
184 /// stream.close_read();
187 /// // wait for some data, will get canceled after one second
188 /// let mut buf = [0];
189 /// stream.read(buf);
192 /// Note that this method affects all cloned handles associated with this
193 /// stream, not just this one handle.
194 pub fn close_read(&mut self) -> IoResult<()> {
195 self.obj.close_read().map_err(IoError::from_rtio_error)
198 /// Closes the writing half of this connection.
200 /// This method will close the writing portion of this connection, causing
201 /// all future writes to immediately return with an error.
203 /// Note that this method affects all cloned handles associated with this
204 /// stream, not just this one handle.
205 pub fn close_write(&mut self) -> IoResult<()> {
206 self.obj.close_write().map_err(IoError::from_rtio_error)
209 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
211 /// This function will set a timeout for all blocking operations (including
212 /// reads and writes) on this stream. The timeout specified is a relative
213 /// time, in milliseconds, into the future after which point operations will
214 /// time out. This means that the timeout must be reset periodically to keep
215 /// it from expiring. Specifying a value of `None` will clear the timeout
218 /// The timeout on this stream is local to this stream only. Setting a
219 /// timeout does not affect any other cloned instances of this stream, nor
220 /// is the timeout propagated to cloned handles of this stream. Setting
221 /// this timeout will override any specific read or write timeouts
222 /// previously set for this stream.
224 /// For clarification on the semantics of interrupting a read and a write,
225 /// take a look at `set_read_timeout` and `set_write_timeout`.
226 #[experimental = "the timeout argument may change in type and value"]
227 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
228 self.obj.set_timeout(timeout_ms)
231 /// Sets the timeout for read operations on this stream.
233 /// See documentation in `set_timeout` for the semantics of this read time.
234 /// This will overwrite any previous read timeout set through either this
235 /// function or `set_timeout`.
239 /// When this timeout expires, if there is no pending read operation, no
240 /// action is taken. Otherwise, the read operation will be scheduled to
241 /// promptly return. If a timeout error is returned, then no data was read
242 /// during the timeout period.
243 #[experimental = "the timeout argument may change in type and value"]
244 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
245 self.obj.set_read_timeout(timeout_ms)
248 /// Sets the timeout for write operations on this stream.
250 /// See documentation in `set_timeout` for the semantics of this write time.
251 /// This will overwrite any previous write timeout set through either this
252 /// function or `set_timeout`.
256 /// When this timeout expires, if there is no pending write operation, no
257 /// action is taken. Otherwise, the pending write operation will be
258 /// scheduled to promptly return. The actual state of the underlying stream
259 /// is not specified.
261 /// The write operation may return an error of type `ShortWrite` which
262 /// indicates that the object is known to have written an exact number of
263 /// bytes successfully during the timeout period, and the remaining bytes
264 /// were never written.
266 /// If the write operation returns `TimedOut`, then the timeout primitive
267 /// does not know how many bytes were written as part of the timeout
268 /// operation. It may be the case that bytes continue to be written in an
269 /// asynchronous fashion after the call to write returns.
270 #[experimental = "the timeout argument may change in type and value"]
271 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
272 self.obj.set_write_timeout(timeout_ms)
276 impl Clone for TcpStream {
277 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
278 /// and writes of this connection.
280 /// The underlying TCP stream will not be closed until all handles to the
281 /// stream have been deallocated. All handles will also follow the same
282 /// stream, but two concurrent reads will not receive the same data.
283 /// Instead, the first read will receive the first packet received, and the
284 /// second read will receive the second packet.
285 fn clone(&self) -> TcpStream {
286 TcpStream { obj: self.obj.clone() }
290 impl Reader for TcpStream {
291 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
292 self.obj.read(buf).map_err(IoError::from_rtio_error)
296 impl Writer for TcpStream {
297 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
298 self.obj.write(buf).map_err(IoError::from_rtio_error)
302 /// A structure representing a socket server. This listener is used to create a
303 /// `TcpAcceptor` which can be used to accept sockets on a local port.
310 /// # #![allow(dead_code)]
311 /// use std::io::{TcpListener, TcpStream};
312 /// use std::io::{Acceptor, Listener};
314 /// let listener = TcpListener::bind("127.0.0.1", 80);
316 /// // bind the listener to the specified address
317 /// let mut acceptor = listener.listen();
319 /// fn handle_client(mut stream: TcpStream) {
321 /// # &mut stream; // silence unused mutability/variable warning
323 /// // accept connections and process them, spawning a new task for each one
324 /// for stream in acceptor.incoming() {
326 /// Err(e) => { /* connection failed */ }
327 /// Ok(stream) => spawn(proc() {
328 /// // connection succeeded
329 /// handle_client(stream)
334 /// // close the socket server
338 pub struct TcpListener {
339 obj: Box<RtioTcpListener + Send>,
343 /// Creates a new `TcpListener` which will be bound to the specified IP
344 /// and port. This listener is not ready for accepting connections,
345 /// `listen` must be called on it before that's possible.
347 /// Binding with a port number of 0 will request that the OS assigns a port
348 /// to this listener. The port allocated can be queried via the
349 /// `socket_name` function.
350 pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> {
351 match FromStr::from_str(addr) {
353 let addr = rtio::SocketAddr{
354 ip: super::to_rtio(ip),
357 LocalIo::maybe_raise(|io| {
358 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
359 }).map_err(IoError::from_rtio_error)
364 desc: "invalid IP address specified",
371 /// Returns the local socket address of this listener.
372 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
373 match self.obj.socket_name() {
374 Ok(rtio::SocketAddr { ip, port }) => {
375 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
377 Err(e) => Err(IoError::from_rtio_error(e)),
382 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
383 fn listen(self) -> IoResult<TcpAcceptor> {
384 match self.obj.listen() {
385 Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
386 Err(e) => Err(IoError::from_rtio_error(e)),
391 /// The accepting half of a TCP socket server. This structure is created through
392 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
393 /// `TcpStream` instances.
394 pub struct TcpAcceptor {
395 obj: Box<RtioTcpAcceptor + Send>,
399 /// Prevents blocking on all future accepts after `ms` milliseconds have
402 /// This function is used to set a deadline after which this acceptor will
403 /// time out accepting any connections. The argument is the relative
404 /// distance, in milliseconds, to a point in the future after which all
405 /// accepts will fail.
407 /// If the argument specified is `None`, then any previously registered
408 /// timeout is cleared.
410 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
411 /// any pending connections. All pending connections will be accepted,
412 /// regardless of whether the timeout has expired or not (the accept will
413 /// not block in this case).
418 /// # #![allow(experimental)]
419 /// use std::io::TcpListener;
420 /// use std::io::{Listener, Acceptor, TimedOut};
422 /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
424 /// // After 100ms have passed, all accepts will fail
425 /// a.set_timeout(Some(100));
427 /// match a.accept() {
428 /// Ok(..) => println!("accepted a socket"),
429 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
430 /// Err(e) => println!("err: {}", e),
433 /// // Reset the timeout and try again
434 /// a.set_timeout(Some(100));
435 /// let socket = a.accept();
437 /// // Clear the timeout and block indefinitely waiting for a connection
438 /// a.set_timeout(None);
439 /// let socket = a.accept();
441 #[experimental = "the type of the argument and name of this function are \
443 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
445 /// Closes the accepting capabilities of this acceptor.
447 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
448 /// in that it will affect *all* cloned handles of this acceptor's original
451 /// Once this function succeeds, all future calls to `accept` will return
452 /// immediately with an error, preventing all future calls to accept. The
453 /// underlying socket will not be relinquished back to the OS until all
454 /// acceptors have been deallocated.
456 /// This is useful for waking up a thread in an accept loop to indicate that
462 /// # #![allow(experimental)]
463 /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
465 /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
466 /// let a2 = a.clone();
470 /// for socket in a2.incoming() {
472 /// Ok(s) => { /* handle s */ }
473 /// Err(ref e) if e.kind == EndOfFile => break, // closed
474 /// Err(e) => panic!("unexpected error: {}", e),
479 /// # fn wait_for_sigint() {}
480 /// // Now that our accept loop is running, wait for the program to be
481 /// // requested to exit.
482 /// wait_for_sigint();
484 /// // Signal our accept loop to exit
485 /// assert!(a.close_accept().is_ok());
488 pub fn close_accept(&mut self) -> IoResult<()> {
489 self.obj.close_accept().map_err(IoError::from_rtio_error)
493 impl Acceptor<TcpStream> for TcpAcceptor {
494 fn accept(&mut self) -> IoResult<TcpStream> {
495 match self.obj.accept(){
496 Ok(s) => Ok(TcpStream::new(s)),
497 Err(e) => Err(IoError::from_rtio_error(e)),
502 impl Clone for TcpAcceptor {
503 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
506 /// The underlying TCP acceptor will not be closed until all handles to the
507 /// acceptor have been deallocated. Incoming connections will be received on
508 /// at most one acceptor; the same connection will not be accepted twice.
510 /// The `close_accept` method will shut down *all* acceptors cloned from the
511 /// same original acceptor, whereas the `set_timeout` method only affects
512 /// the acceptor that it is called on.
514 /// This function is useful for creating a handle to invoke `close_accept`
515 /// on to wake up any other task blocked in `accept`.
516 fn clone(&self) -> TcpAcceptor {
517 TcpAcceptor { obj: self.obj.clone() }
522 #[allow(experimental)]
530 // FIXME #11530 this fails on android because tests are run as root
531 #[cfg_attr(any(windows, target_os = "android"), ignore)]
534 match TcpListener::bind("0.0.0.0", 1) {
536 Err(e) => assert_eq!(e.kind, PermissionDenied),
542 match TcpStream::connect("0.0.0.0", 1) {
544 Err(e) => assert_eq!(e.kind, ConnectionRefused),
549 fn listen_ip4_localhost() {
550 let socket_addr = next_test_ip4();
551 let ip_str = socket_addr.ip.to_string();
552 let port = socket_addr.port;
553 let listener = TcpListener::bind(ip_str.as_slice(), port);
554 let mut acceptor = listener.listen();
557 let mut stream = TcpStream::connect("localhost", port);
558 stream.write([144]).unwrap();
561 let mut stream = acceptor.accept();
563 stream.read(buf).unwrap();
564 assert!(buf[0] == 144);
568 fn connect_localhost() {
569 let addr = next_test_ip4();
570 let ip_str = addr.ip.to_string();
571 let port = addr.port;
572 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
575 let mut stream = TcpStream::connect("localhost", addr.port);
576 stream.write([64]).unwrap();
579 let mut stream = acceptor.accept();
581 stream.read(buf).unwrap();
582 assert!(buf[0] == 64);
586 fn connect_ip4_loopback() {
587 let addr = next_test_ip4();
588 let ip_str = addr.ip.to_string();
589 let port = addr.port;
590 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
593 let mut stream = TcpStream::connect("127.0.0.1", addr.port);
594 stream.write([44]).unwrap();
597 let mut stream = acceptor.accept();
599 stream.read(buf).unwrap();
600 assert!(buf[0] == 44);
604 fn connect_ip6_loopback() {
605 let addr = next_test_ip6();
606 let ip_str = addr.ip.to_string();
607 let port = addr.port;
608 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
611 let mut stream = TcpStream::connect("::1", addr.port);
612 stream.write([66]).unwrap();
615 let mut stream = acceptor.accept();
617 stream.read(buf).unwrap();
618 assert!(buf[0] == 66);
622 fn smoke_test_ip4() {
623 let addr = next_test_ip4();
624 let ip_str = addr.ip.to_string();
625 let port = addr.port;
626 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
629 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
630 stream.write([99]).unwrap();
633 let mut stream = acceptor.accept();
635 stream.read(buf).unwrap();
636 assert!(buf[0] == 99);
640 fn smoke_test_ip6() {
641 let addr = next_test_ip6();
642 let ip_str = addr.ip.to_string();
643 let port = addr.port;
644 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
647 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
648 stream.write([99]).unwrap();
651 let mut stream = acceptor.accept();
653 stream.read(buf).unwrap();
654 assert!(buf[0] == 99);
659 let addr = next_test_ip4();
660 let ip_str = addr.ip.to_string();
661 let port = addr.port;
662 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
665 let _stream = TcpStream::connect(ip_str.as_slice(), port);
669 let mut stream = acceptor.accept();
671 let nread = stream.read(buf);
672 assert!(nread.is_err());
677 let addr = next_test_ip6();
678 let ip_str = addr.ip.to_string();
679 let port = addr.port;
680 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
683 let _stream = TcpStream::connect(ip_str.as_slice(), port);
687 let mut stream = acceptor.accept();
689 let nread = stream.read(buf);
690 assert!(nread.is_err());
694 fn read_eof_twice_ip4() {
695 let addr = next_test_ip4();
696 let ip_str = addr.ip.to_string();
697 let port = addr.port;
698 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
701 let _stream = TcpStream::connect(ip_str.as_slice(), port);
705 let mut stream = acceptor.accept();
707 let nread = stream.read(buf);
708 assert!(nread.is_err());
710 match stream.read(buf) {
713 assert!(e.kind == NotConnected || e.kind == EndOfFile,
714 "unknown kind: {}", e.kind);
720 fn read_eof_twice_ip6() {
721 let addr = next_test_ip6();
722 let ip_str = addr.ip.to_string();
723 let port = addr.port;
724 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
727 let _stream = TcpStream::connect(ip_str.as_slice(), port);
731 let mut stream = acceptor.accept();
733 let nread = stream.read(buf);
734 assert!(nread.is_err());
736 match stream.read(buf) {
739 assert!(e.kind == NotConnected || e.kind == EndOfFile,
740 "unknown kind: {}", e.kind);
746 fn write_close_ip4() {
747 let addr = next_test_ip4();
748 let ip_str = addr.ip.to_string();
749 let port = addr.port;
750 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
753 let _stream = TcpStream::connect(ip_str.as_slice(), port);
757 let mut stream = acceptor.accept();
760 match stream.write(buf) {
763 assert!(e.kind == ConnectionReset ||
764 e.kind == BrokenPipe ||
765 e.kind == ConnectionAborted,
766 "unknown error: {}", e);
774 fn write_close_ip6() {
775 let addr = next_test_ip6();
776 let ip_str = addr.ip.to_string();
777 let port = addr.port;
778 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
781 let _stream = TcpStream::connect(ip_str.as_slice(), port);
785 let mut stream = acceptor.accept();
788 match stream.write(buf) {
791 assert!(e.kind == ConnectionReset ||
792 e.kind == BrokenPipe ||
793 e.kind == ConnectionAborted,
794 "unknown error: {}", e);
802 fn multiple_connect_serial_ip4() {
803 let addr = next_test_ip4();
804 let ip_str = addr.ip.to_string();
805 let port = addr.port;
807 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
810 for _ in range(0, max) {
811 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
812 stream.write([99]).unwrap();
816 for ref mut stream in acceptor.incoming().take(max) {
818 stream.read(buf).unwrap();
819 assert_eq!(buf[0], 99);
824 fn multiple_connect_serial_ip6() {
825 let addr = next_test_ip6();
826 let ip_str = addr.ip.to_string();
827 let port = addr.port;
829 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
832 for _ in range(0, max) {
833 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
834 stream.write([99]).unwrap();
838 for ref mut stream in acceptor.incoming().take(max) {
840 stream.read(buf).unwrap();
841 assert_eq!(buf[0], 99);
846 fn multiple_connect_interleaved_greedy_schedule_ip4() {
847 let addr = next_test_ip4();
848 let ip_str = addr.ip.to_string();
849 let port = addr.port;
850 static MAX: int = 10;
851 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
854 let mut acceptor = acceptor;
855 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
856 // Start another task to handle the connection
858 let mut stream = stream;
860 stream.read(buf).unwrap();
861 assert!(buf[0] == i as u8);
869 fn connect(i: int, addr: SocketAddr) {
870 let ip_str = addr.ip.to_string();
871 let port = addr.port;
872 if i == MAX { return }
875 debug!("connecting");
876 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
877 // Connect again before writing
878 connect(i + 1, addr);
880 stream.write([i as u8]).unwrap();
886 fn multiple_connect_interleaved_greedy_schedule_ip6() {
887 let addr = next_test_ip6();
888 let ip_str = addr.ip.to_string();
889 let port = addr.port;
890 static MAX: int = 10;
891 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
894 let mut acceptor = acceptor;
895 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
896 // Start another task to handle the connection
898 let mut stream = stream;
900 stream.read(buf).unwrap();
901 assert!(buf[0] == i as u8);
909 fn connect(i: int, addr: SocketAddr) {
910 let ip_str = addr.ip.to_string();
911 let port = addr.port;
912 if i == MAX { return }
915 debug!("connecting");
916 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
917 // Connect again before writing
918 connect(i + 1, addr);
920 stream.write([i as u8]).unwrap();
926 fn multiple_connect_interleaved_lazy_schedule_ip4() {
927 static MAX: int = 10;
928 let addr = next_test_ip4();
929 let ip_str = addr.ip.to_string();
930 let port = addr.port;
931 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
934 let mut acceptor = acceptor;
935 for stream in acceptor.incoming().take(MAX as uint) {
936 // Start another task to handle the connection
938 let mut stream = stream;
940 stream.read(buf).unwrap();
941 assert!(buf[0] == 99);
949 fn connect(i: int, addr: SocketAddr) {
950 let ip_str = addr.ip.to_string();
951 let port = addr.port;
952 if i == MAX { return }
955 debug!("connecting");
956 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
957 // Connect again before writing
958 connect(i + 1, addr);
960 stream.write([99]).unwrap();
966 fn multiple_connect_interleaved_lazy_schedule_ip6() {
967 static MAX: int = 10;
968 let addr = next_test_ip6();
969 let ip_str = addr.ip.to_string();
970 let port = addr.port;
971 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
974 let mut acceptor = acceptor;
975 for stream in acceptor.incoming().take(MAX as uint) {
976 // Start another task to handle the connection
978 let mut stream = stream;
980 stream.read(buf).unwrap();
981 assert!(buf[0] == 99);
989 fn connect(i: int, addr: SocketAddr) {
990 let ip_str = addr.ip.to_string();
991 let port = addr.port;
992 if i == MAX { return }
995 debug!("connecting");
996 let mut stream = TcpStream::connect(ip_str.as_slice(), port);
997 // Connect again before writing
998 connect(i + 1, addr);
1000 stream.write([99]).unwrap();
1005 pub fn socket_name(addr: SocketAddr) {
1006 let ip_str = addr.ip.to_string();
1007 let port = addr.port;
1008 let mut listener = TcpListener::bind(ip_str.as_slice(), port).unwrap();
1010 // Make sure socket_name gives
1011 // us the socket we bound to.
1012 let so_name = listener.socket_name();
1013 assert!(so_name.is_ok());
1014 assert_eq!(addr, so_name.unwrap());
1017 pub fn peer_name(addr: SocketAddr) {
1018 let ip_str = addr.ip.to_string();
1019 let port = addr.port;
1020 let acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1022 let mut acceptor = acceptor;
1023 acceptor.accept().unwrap();
1026 let stream = TcpStream::connect(ip_str.as_slice(), port);
1028 assert!(stream.is_ok());
1029 let mut stream = stream.unwrap();
1031 // Make sure peer_name gives us the
1032 // address/port of the peer we've
1034 let peer_name = stream.peer_name();
1035 assert!(peer_name.is_ok());
1036 assert_eq!(addr, peer_name.unwrap());
1040 fn socket_and_peer_name_ip4() {
1041 peer_name(next_test_ip4());
1042 socket_name(next_test_ip4());
1046 fn socket_and_peer_name_ip6() {
1047 // FIXME: peer name is not consistent
1048 //peer_name(next_test_ip6());
1049 socket_name(next_test_ip6());
1054 let addr = next_test_ip4();
1055 let port = addr.port;
1056 let (tx, rx) = channel();
1058 let ip_str = addr.ip.to_string();
1059 let mut srv = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1061 let mut cl = srv.accept().unwrap();
1062 cl.write([10]).unwrap();
1064 cl.read(b).unwrap();
1069 let ip_str = addr.ip.to_string();
1070 let mut c = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1071 let mut b = [0, ..10];
1072 assert_eq!(c.read(b), Ok(1));
1073 c.write([1]).unwrap();
1079 let addr = next_test_ip4();
1080 let ip_str = addr.ip.to_string();
1081 let port = addr.port;
1082 let listener = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1083 assert!(listener.is_ok());
1084 match TcpListener::bind(ip_str.as_slice(), port).listen() {
1087 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
1088 "unknown error: {} {}", e, e.kind);
1095 let addr = next_test_ip4();
1096 let port = addr.port;
1097 let (tx, rx) = channel();
1100 let ip_str = addr.ip.to_string();
1102 let _stream = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1108 let ip_str = addr.ip.to_string();
1109 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1112 let _stream = acceptor.accept().unwrap();
1118 let _listener = TcpListener::bind(addr.ip.to_string().as_slice(), port);
1122 fn tcp_clone_smoke() {
1123 let addr = next_test_ip4();
1124 let ip_str = addr.ip.to_string();
1125 let port = addr.port;
1126 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1129 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1130 let mut buf = [0, 0];
1131 assert_eq!(s.read(buf), Ok(1));
1132 assert_eq!(buf[0], 1);
1133 s.write([2]).unwrap();
1136 let mut s1 = acceptor.accept().unwrap();
1137 let s2 = s1.clone();
1139 let (tx1, rx1) = channel();
1140 let (tx2, rx2) = channel();
1144 s2.write([1]).unwrap();
1148 let mut buf = [0, 0];
1149 assert_eq!(s1.read(buf), Ok(1));
1154 fn tcp_clone_two_read() {
1155 let addr = next_test_ip6();
1156 let ip_str = addr.ip.to_string();
1157 let port = addr.port;
1158 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1159 let (tx1, rx) = channel();
1160 let tx2 = tx1.clone();
1163 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1164 s.write([1]).unwrap();
1166 s.write([2]).unwrap();
1170 let mut s1 = acceptor.accept().unwrap();
1171 let s2 = s1.clone();
1173 let (done, rx) = channel();
1176 let mut buf = [0, 0];
1177 s2.read(buf).unwrap();
1181 let mut buf = [0, 0];
1182 s1.read(buf).unwrap();
1189 fn tcp_clone_two_write() {
1190 let addr = next_test_ip4();
1191 let ip_str = addr.ip.to_string();
1192 let port = addr.port;
1193 let mut acceptor = TcpListener::bind(ip_str.as_slice(), port).listen();
1196 let mut s = TcpStream::connect(ip_str.as_slice(), port);
1197 let mut buf = [0, 1];
1198 s.read(buf).unwrap();
1199 s.read(buf).unwrap();
1202 let mut s1 = acceptor.accept().unwrap();
1203 let s2 = s1.clone();
1205 let (done, rx) = channel();
1208 s2.write([1]).unwrap();
1211 s1.write([2]).unwrap();
1217 fn shutdown_smoke() {
1218 use rt::rtio::RtioTcpStream;
1220 let addr = next_test_ip4();
1221 let ip_str = addr.ip.to_string();
1222 let port = addr.port;
1223 let a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen();
1226 let mut c = a.accept().unwrap();
1227 assert_eq!(c.read_to_end(), Ok(vec!()));
1228 c.write([1]).unwrap();
1231 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1232 assert!(s.obj.close_write().is_ok());
1233 assert!(s.write([1]).is_err());
1234 assert_eq!(s.read_to_end(), Ok(vec!(1)));
// Exercises the accept timeout on an acceptor: with a timeout set, `accept`
// must fail with `TimedOut` (repeatedly); a pending connection must still be
// deliverable afterwards; and with the timeout cleared `accept` must succeed
// once a connection is made.
// NOTE(review): several lines (match header, task spawning, closing braces) are
// not visible in this excerpt.
1238 fn accept_timeout() {
1239 let addr = next_test_ip4();
1240 let ip_str = addr.ip.to_string();
1241 let port = addr.port;
1242 let mut a = TcpListener::bind(ip_str.as_slice(), port).unwrap().listen().unwrap();
// NOTE(review): the unit of the timeout value is not visible here; presumably
// milliseconds - confirm against `set_timeout`'s documentation.
1244 a.set_timeout(Some(10));
1246 // Make sure we time out once and future invocations also time out
1247 let err = a.accept().err().unwrap();
1248 assert_eq!(err.kind, TimedOut);
1249 let err = a.accept().err().unwrap();
1250 assert_eq!(err.kind, TimedOut);
1252 // Also make sure that even though the timeout is expired that we will
1253 // continue to receive any pending connections.
1255 // FIXME: freebsd apparently never sees the pending connection, but
1256 // testing manually always works. Need to investigate this
1258 if !cfg!(target_os = "freebsd") {
1259 let (tx, rx) = channel();
// The result of the connection attempt is handed over the channel.
1261 tx.send(TcpStream::connect(addr.ip.to_string().as_slice(),
// Retry loop: a `TimedOut` error is tolerated and retried, any other error
// aborts the test. The current task yields between attempts.
1265 for i in range(0i, 1001) {
1268 Err(ref e) if e.kind == TimedOut => {}
1269 Err(e) => panic!("error: {}", e),
1271 ::task::deschedule();
// Reaching the final iteration means the pending connection was never seen.
1272 if i == 1000 { panic!("should have a pending connection") }
1276 // Unset the timeout and make sure that this always blocks.
1277 a.set_timeout(None);
// A connection is opened and immediately dropped; the accept below must still
// return successfully.
1279 drop(TcpStream::connect(addr.ip.to_string().as_slice(),
1282 a.accept().unwrap();
// Smoke test for `close_read`/`close_write`: after closing both halves of a
// stream, reads and writes must fail on that handle, on a clone made before
// closing, and on a clone made after closing. Closing again through the clones
// must not crash.
// NOTE(review): the buffer `b` used below is not declared in the visible lines
// of this excerpt; other lines (task spawning, closing braces) are also missing.
1286 fn close_readwrite_smoke() {
1287 let addr = next_test_ip4();
1288 let ip_str = addr.ip.to_string();
1289 let port = addr.port;
1290 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
// `_tx` is never sent on; presumably it only exists so that `recv_opt` below
// keeps the accepted stream alive until the sender is dropped - confirm.
1291 let (_tx, rx) = channel::<()>();
1294 let _s = a.accept().unwrap();
1295 let _ = rx.recv_opt();
1299 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
// Clone taken *before* anything is closed.
1300 let mut s2 = s.clone();
1302 // closing should prevent reads/writes
1303 s.close_write().unwrap();
1304 assert!(s.write([0]).is_err());
1305 s.close_read().unwrap();
1306 assert!(s.read(b).is_err());
1308 // closing should affect previous handles
1309 assert!(s2.write([0]).is_err());
1310 assert!(s2.read(b).is_err());
1312 // closing should affect new handles
1313 let mut s3 = s.clone();
1314 assert!(s3.write([0]).is_err());
1315 assert!(s3.read(b).is_err());
// Results are deliberately ignored here: only the absence of a crash matters.
1317 // make sure these don't die
1318 let _ = s2.close_read();
1319 let _ = s2.close_write();
1320 let _ = s3.close_read();
1321 let _ = s3.close_write();
// Checks that calling `close_read` on a stream makes a read on another handle
// to the same stream return an error, instead of leaving that reader blocked.
// NOTE(review): `s2` and the task that performs the read are not declared in
// the visible lines of this excerpt; presumably `s2` is a clone of `s` used
// from a child task - confirm.
1325 fn close_read_wakes_up() {
1326 let addr = next_test_ip4();
1327 let ip_str = addr.ip.to_string();
1328 let port = addr.port;
1329 let a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
// `_tx` is never sent on; `recv_opt` below is only used to hold the accepted
// stream `_s` open (presumably until the sender is dropped).
1330 let (_tx, rx) = channel::<()>();
1333 let _s = a.accept().unwrap();
1334 let _ = rx.recv_opt();
1337 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
// This shadows the earlier `rx`; how the new channel is used is not visible
// in this excerpt.
1339 let (tx, rx) = channel();
// The read is expected to fail once the read half has been closed.
1342 assert!(s2.read([0]).is_err());
1345 // this should wake up the child task
1346 s.close_read().unwrap();
1348 // this test will never finish if the child doesn't wake up
// Exercises `set_timeout` on an accepted stream (over an IPv6 test address):
// reads must time out while no data arrives, writes must eventually time out
// once enough data has been queued, and clearing the timeout must let a read
// complete normally again.
// NOTE(review): lines that spawn the connecting side and signal over the
// channel are not visible in this excerpt.
1353 fn readwrite_timeouts() {
1354 let addr = next_test_ip6();
1355 let ip_str = addr.ip.to_string();
1356 let port = addr.port;
1357 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1358 let (tx, rx) = channel::<()>();
// Connecting side: waits on the channel, then writes a single byte, then waits
// again (ignoring the result of the second wait).
1360 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1362 assert!(s.write([0]).is_ok());
1363 let _ = rx.recv_opt();
// Accepting side: with a timeout set, two consecutive reads must both report
// `TimedOut`.
1366 let mut s = a.accept().unwrap();
1367 s.set_timeout(Some(20));
1368 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1369 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
// Re-arm the timeout, then write 128 KiB chunks until a write times out.
// Successful and short writes just continue the loop; any other error is fatal.
1371 s.set_timeout(Some(20));
1372 for i in range(0i, 1001) {
1373 match s.write([0, .. 128 * 1024]) {
1374 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1375 Err(IoError { kind: TimedOut, .. }) => break,
1376 Err(e) => panic!("{}", e),
// Reaching the last iteration means no write ever timed out.
1378 if i == 1000 { panic!("should have filled up?!"); }
// Once a write has timed out, a further one-byte write must time out as well.
1380 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
// With the timeout cleared, a read into a two-byte buffer must yield exactly
// one byte (presumably the byte written by the connecting side above).
1383 s.set_timeout(None);
1384 assert_eq!(s.read([0, 0]), Ok(1));
// Exercises `set_read_timeout` on an accepted stream: reads must report
// `TimedOut` while writes on the same stream keep succeeding.
// NOTE(review): the declaration of `amt` and the lines that spawn the
// connecting side are not visible in this excerpt.
1388 fn read_timeouts() {
1389 let addr = next_test_ip6();
1390 let ip_str = addr.ip.to_string();
1391 let port = addr.port;
1392 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1393 let (tx, rx) = channel::<()>();
// Connecting side: keeps reading until at least 100 * 128 KiB have been
// received, i.e. the total amount written by the accepting side below. Any read
// error aborts the test.
1395 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1398 while amt < 100 * 128 * 1024 {
1399 match s.read([0, ..128 * 1024]) {
1400 Ok(n) => { amt += n; }
1401 Err(e) => panic!("{}", e),
1404 let _ = rx.recv_opt();
// Accepting side: only the read timeout is set; two consecutive reads must
// both time out.
1407 let mut s = a.accept().unwrap();
1408 s.set_read_timeout(Some(20));
1409 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1410 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
// All 100 writes of 128 KiB are expected to succeed despite the read timeout.
1413 for _ in range(0i, 100) {
1414 assert!(s.write([0, ..128 * 1024]).is_ok());
// Exercises `set_write_timeout` on an accepted stream: writes must eventually
// report `TimedOut`, while a read on the same stream still succeeds.
// NOTE(review): the lines that spawn the connecting side and signal over the
// channel are not visible in this excerpt.
1419 fn write_timeouts() {
1420 let addr = next_test_ip6();
1421 let ip_str = addr.ip.to_string();
1422 let port = addr.port;
1423 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1424 let (tx, rx) = channel::<()>();
// Connecting side: writes a single byte, then waits on the channel (result
// ignored).
1426 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1428 assert!(s.write([0]).is_ok());
1429 let _ = rx.recv_opt();
// Accepting side: only the write timeout is set. Write 128 KiB chunks until a
// write times out; successful and short writes continue the loop, any other
// error is fatal.
1432 let mut s = a.accept().unwrap();
1433 s.set_write_timeout(Some(20));
1434 for i in range(0i, 1001) {
1435 match s.write([0, .. 128 * 1024]) {
1436 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1437 Err(IoError { kind: TimedOut, .. }) => break,
1438 Err(e) => panic!("{}", e),
// Reaching the last iteration means no write ever timed out.
1440 if i == 1000 { panic!("should have filled up?!"); }
// Once a write has timed out, a further one-byte write must time out as well.
1442 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
// The read is still expected to succeed even though writes are timing out.
1445 assert!(s.read([0]).is_ok());
// Checks that a read timeout set through one handle makes a read on that
// handle time out, while a read through a second handle (`s2`) still completes
// with one byte.
// NOTE(review): the declaration of `s2` and the task-spawning/channel lines are
// not visible in this excerpt; presumably `s2` is a clone of the accepted
// stream read from another task - confirm.
1449 fn timeout_concurrent_read() {
1450 let addr = next_test_ip6();
1451 let ip_str = addr.ip.to_string();
1452 let port = addr.port;
1453 let mut a = TcpListener::bind(ip_str.as_slice(), port).listen().unwrap();
1454 let (tx, rx) = channel::<()>();
// Connecting side: writes exactly one byte, then waits on the channel (result
// ignored).
1456 let mut s = TcpStream::connect(ip_str.as_slice(), port).unwrap();
1458 assert_eq!(s.write([0]), Ok(()));
1459 let _ = rx.recv_opt();
1462 let mut s = a.accept().unwrap();
1464 let (tx2, rx2) = channel();
// The read through `s2` is expected to receive the single byte.
1467 assert_eq!(s2.read([0]), Ok(1));
// The read through `s`, with a read timeout set, is expected to time out.
1471 s.set_read_timeout(Some(20));
1472 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
// Checks that a stream handle can be cloned while another clone of it is in
// the middle of a read.
// NOTE(review): the task-spawning lines and the channel sends/receives that
// coordinate the tasks are not visible in this excerpt.
1479 fn clone_while_reading() {
1480 let addr = next_test_ip6();
1481 let listen = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1482 let mut accept = listen.listen().unwrap();
1484 // Enqueue a task to write to a socket
1485 let (tx, rx) = channel();
1486 let (txdone, rxdone) = channel();
1487 let txdone2 = txdone.clone();
1489 let mut tcp = TcpStream::connect(addr.ip.to_string().as_slice(),
1490 addr.port).unwrap();
// The writer sends a single byte, which is what the reading clone below waits
// for.
1492 tcp.write_u8(0).unwrap();
1496 // Spawn off a reading clone
1497 let tcp = accept.accept().unwrap();
1498 let tcp2 = tcp.clone();
1499 let txdone3 = txdone.clone();
// Rebound as mutable because `read_u8` is called on it.
1501 let mut tcp2 = tcp2;
1502 tcp2.read_u8().unwrap();
// Yielding repeatedly is best-effort only: it gives the reader a chance to
// start its read but does not guarantee it.
1506 // Try to ensure that the reading clone is indeed reading
1507 for _ in range(0i, 50) {
1508 ::task::deschedule();
1511 // clone the handle again while it's reading, then let it finish the read.
1513 let _ = tcp.clone();
// Smoke test for cloning an acceptor: after two connections are made, both the
// original acceptor and its clone must be able to accept one successfully.
// NOTE(review): the lines surrounding the two `connect` calls (presumably task
// spawns) are not visible in this excerpt.
1520 fn clone_accept_smoke() {
1521 let addr = next_test_ip4();
1522 let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1523 let mut a = l.listen().unwrap();
1524 let mut a2 = a.clone();
// Two connection attempts, one for each accept below; their results are
// intentionally discarded.
1527 let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1530 let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1533 assert!(a.accept().is_ok());
1534 assert!(a2.accept().is_ok());
// Checks that an acceptor and a second acceptor handle can each accept a
// connection from separate tasks at the same time.
// NOTE(review): the declaration of `a2` is not visible in this excerpt;
// presumably it is `a.clone()` - confirm.
1538 fn clone_accept_concurrent() {
1539 let addr = next_test_ip4();
1540 let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1541 let a = l.listen().unwrap();
1544 let (tx, rx) = channel();
1545 let tx2 = tx.clone();
// Each task moves one acceptor handle in, rebinds it as mutable so `accept`
// can be called, and reports the accept result over its own sender.
1547 spawn(proc() { let mut a = a; tx.send(a.accept()) });
1548 spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
// Two connection attempts, one per accepting task; their results are
// intentionally discarded.
1551 let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
1554 let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
// Both tasks must report a successful accept.
1557 assert!(rx.recv().is_ok());
1558 assert!(rx.recv().is_ok());
// Smoke test for `close_accept`: once the acceptor has been closed, a
// subsequent `accept` must fail with an `EndOfFile` error kind.
1562 fn close_accept_smoke() {
1563 let addr = next_test_ip4();
1564 let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1565 let mut a = l.listen().unwrap();
1567 a.close_accept().unwrap();
1568 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
// Checks that closing an acceptor through a cloned handle makes an `accept` on
// the other handle fail with an `EndOfFile` error kind.
// NOTE(review): the lines that run the `accept` call concurrently (presumably
// in a spawned task) are not visible in this excerpt.
1572 fn close_accept_concurrent() {
1573 let addr = next_test_ip4();
1574 let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
1575 let a = l.listen().unwrap();
1576 let mut a2 = a.clone();
1578 let (tx, rx) = channel();
// The result of `accept` on the original handle is reported over the channel.
1581 tx.send(a.accept());
// Closing is done through the clone, not the handle that is accepting.
1583 a2.close_accept().unwrap();
// The accept result received over the channel must be an `EndOfFile` error.
1585 assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);