1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
24 use io::net::ip::{SocketAddr, ToSocketAddr};
26 use io::{Reader, Writer, Listener, Acceptor};
27 use io::{standard_error, TimedOut};
29 use option::{None, Some, Option};
31 use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener};
32 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
36 /// A structure which represents a TCP stream between a local socket and a
42 /// # #![allow(unused_must_use)]
43 /// use std::io::TcpStream;
45 /// let mut stream = TcpStream::connect("127.0.0.1:34254");
47 /// stream.write([1]);
48 /// let mut buf = [0];
50 /// drop(stream); // close the connection
52 pub struct TcpStream {
// Boxed runtime stream object; the instance methods of `TcpStream` in this
// file forward their work to it.
53 obj: Box<RtioTcpStream + Send>,
/// Builds a `TcpStream` handle from a boxed runtime TCP stream object.
57 fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
61 /// Opens a TCP connection to a remote host.
63 /// `addr` is the address of the remote host. Anything which implements the
64 /// `ToSocketAddr` trait can be supplied for the address; see that trait's
65 /// documentation for concrete examples.
66 pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
// `None` is passed where `connect_timeout` passes `Some(milliseconds)`.
67 super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new))
70 /// Creates a TCP connection to a remote socket address, timing out after
71 /// the specified duration.
73 /// This is the same as the `connect` method, except that if the timeout
74 /// specified elapses before a connection is made an error will be
75 /// returned. The error's kind will be `TimedOut`.
77 /// As with the `connect` method, the type of the `addr` argument can vary
78 /// as defined by the `ToSocketAddr` trait.
80 /// If a `timeout` with zero or negative duration is specified then
81 /// the function returns `Err`, with the error kind set to `TimedOut`.
82 #[experimental = "the timeout argument may eventually change types"]
83 pub fn connect_timeout<A: ToSocketAddr>(addr: A,
84 timeout: Duration) -> IoResult<TcpStream> {
// Reject non-positive durations up front, so the millisecond count cast to
// `u64` below is never negative.
85 if timeout <= Duration::milliseconds(0) {
86 return Err(standard_error(TimedOut));
89 super::with_addresses_io(addr, |io, addr|
90 io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
94 /// Returns the socket address of the remote peer of this TCP connection.
///
/// The `rtio::SocketAddr` reported by the runtime object is converted to a
/// `SocketAddr` using `super::from_rtio` for the IP part; runtime errors are
/// converted with `IoError::from_rtio_error`.
95 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
96 match self.obj.peer_name() {
97 Ok(rtio::SocketAddr { ip, port }) => {
98 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
100 Err(e) => Err(IoError::from_rtio_error(e)),
104 /// Returns the socket address of the local half of this TCP connection.
///
/// The `rtio::SocketAddr` reported by the runtime object is converted to a
/// `SocketAddr` using `super::from_rtio` for the IP part; runtime errors are
/// converted with `IoError::from_rtio_error`.
105 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
106 match self.obj.socket_name() {
107 Ok(rtio::SocketAddr { ip, port }) => {
108 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
110 Err(e) => Err(IoError::from_rtio_error(e)),
114 /// Sets the nodelay flag on this connection to the boolean specified.
///
/// Any error reported by the runtime object is converted with
/// `IoError::from_rtio_error`.
116 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
120 self.obj.control_congestion()
121 }.map_err(IoError::from_rtio_error)
124 /// Sets the keepalive timeout to the timeout specified.
126 /// If the value specified is `None`, then the keepalive flag is cleared on
127 /// this connection. Otherwise, the keepalive timeout will be set to the
128 /// specified time, in seconds.
///
/// Any error reported by the runtime object is converted with
/// `IoError::from_rtio_error`.
130 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
// `Some(i)` forwards to the runtime object's `keepalive(i)`; `None`
// forwards to its `letdie()`.
131 match delay_in_seconds {
132 Some(i) => self.obj.keepalive(i),
133 None => self.obj.letdie(),
134 }.map_err(IoError::from_rtio_error)
137 /// Closes the reading half of this connection.
139 /// This method will close the reading portion of this connection, causing
140 /// all pending and future reads to immediately return with an error.
145 /// # #![allow(unused_must_use)]
146 /// use std::io::timer;
147 /// use std::io::TcpStream;
148 /// use std::time::Duration;
150 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
151 /// let stream2 = stream.clone();
154 /// // close this stream after one second
155 /// timer::sleep(Duration::seconds(1));
156 /// let mut stream = stream2;
157 /// stream.close_read();
160 /// // wait for some data, will get canceled after one second
161 /// let mut buf = [0];
162 /// stream.read(buf);
165 /// Note that this method affects all cloned handles associated with this
166 /// stream, not just this one handle.
///
/// Any error reported by the runtime object is converted with
/// `IoError::from_rtio_error`.
167 pub fn close_read(&mut self) -> IoResult<()> {
168 self.obj.close_read().map_err(IoError::from_rtio_error)
171 /// Closes the writing half of this connection.
173 /// This method will close the writing portion of this connection, causing
174 /// all future writes to immediately return with an error.
176 /// Note that this method affects all cloned handles associated with this
177 /// stream, not just this one handle.
///
/// Any error reported by the runtime object is converted with
/// `IoError::from_rtio_error`.
178 pub fn close_write(&mut self) -> IoResult<()> {
179 self.obj.close_write().map_err(IoError::from_rtio_error)
182 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
184 /// This function will set a timeout for all blocking operations (including
185 /// reads and writes) on this stream. The timeout specified is a relative
186 /// time, in milliseconds, into the future after which point operations will
187 /// time out. This means that the timeout must be reset periodically to keep
188 /// it from expiring. Specifying a value of `None` will clear the timeout
191 /// The timeout on this stream is local to this stream only. Setting a
192 /// timeout does not affect any other cloned instances of this stream, nor
193 /// is the timeout propagated to cloned handles of this stream. Setting
194 /// this timeout will override any specific read or write timeouts
195 /// previously set for this stream.
197 /// For clarification on the semantics of interrupting a read and a write,
198 /// take a look at `set_read_timeout` and `set_write_timeout`.
199 #[experimental = "the timeout argument may change in type and value"]
200 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
// The value is handed unchanged to the runtime stream object.
201 self.obj.set_timeout(timeout_ms)
204 /// Sets the timeout for read operations on this stream.
206 /// See documentation in `set_timeout` for the semantics of this read time.
207 /// This will overwrite any previous read timeout set through either this
208 /// function or `set_timeout`.
212 /// When this timeout expires, if there is no pending read operation, no
213 /// action is taken. Otherwise, the read operation will be scheduled to
214 /// promptly return. If a timeout error is returned, then no data was read
215 /// during the timeout period.
216 #[experimental = "the timeout argument may change in type and value"]
217 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
// The value is handed unchanged to the runtime stream object.
218 self.obj.set_read_timeout(timeout_ms)
221 /// Sets the timeout for write operations on this stream.
223 /// See documentation in `set_timeout` for the semantics of this write time.
224 /// This will overwrite any previous write timeout set through either this
225 /// function or `set_timeout`.
229 /// When this timeout expires, if there is no pending write operation, no
230 /// action is taken. Otherwise, the pending write operation will be
231 /// scheduled to promptly return. The actual state of the underlying stream
232 /// is not specified.
234 /// The write operation may return an error of type `ShortWrite` which
235 /// indicates that the object is known to have written an exact number of
236 /// bytes successfully during the timeout period, and the remaining bytes
237 /// were never written.
239 /// If the write operation returns `TimedOut`, then the timeout primitive
240 /// does not know how many bytes were written as part of the timeout
241 /// operation. It may be the case that bytes continue to be written in an
242 /// asynchronous fashion after the call to write returns.
243 #[experimental = "the timeout argument may change in type and value"]
244 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
// The value is handed unchanged to the runtime stream object.
245 self.obj.set_write_timeout(timeout_ms)
249 impl Clone for TcpStream {
250 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
251 /// and writes of this connection.
253 /// The underlying TCP stream will not be closed until all handles to the
254 /// stream have been deallocated. All handles will also follow the same
255 /// stream, but two concurrent reads will not receive the same data.
256 /// Instead, the first read will receive the first packet received, and the
257 /// second read will receive the second packet.
258 fn clone(&self) -> TcpStream {
// The new handle wraps a clone of the runtime stream object.
259 TcpStream { obj: self.obj.clone() }
// Reads are forwarded to the runtime stream object; runtime errors are
// converted with `IoError::from_rtio_error`.
263 impl Reader for TcpStream {
264 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
265 self.obj.read(buf).map_err(IoError::from_rtio_error)
// Writes are forwarded to the runtime stream object; runtime errors are
// converted with `IoError::from_rtio_error`.
269 impl Writer for TcpStream {
270 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
271 self.obj.write(buf).map_err(IoError::from_rtio_error)
275 /// A structure representing a socket server. This listener is used to create a
276 /// `TcpAcceptor` which can be used to accept sockets on a local port.
283 /// # #![allow(dead_code)]
284 /// use std::io::{TcpListener, TcpStream};
285 /// use std::io::{Acceptor, Listener};
287 /// let listener = TcpListener::bind("127.0.0.1:80");
289 /// // start listening for connections on the bound address
290 /// let mut acceptor = listener.listen();
292 /// fn handle_client(mut stream: TcpStream) {
294 /// # &mut stream; // silence unused mutability/variable warning
296 /// // accept connections and process them, spawning a new task for each one
297 /// for stream in acceptor.incoming() {
299 /// Err(e) => { /* connection failed */ }
300 /// Ok(stream) => spawn(proc() {
301 /// // connection succeeded
302 /// handle_client(stream)
307 /// // close the socket server
311 pub struct TcpListener {
// Boxed runtime listener object that `socket_name` and `listen` forward to.
312 obj: Box<RtioTcpListener + Send>,
316 /// Creates a new `TcpListener` which will be bound to the specified address.
317 /// This listener is not ready for accepting connections; `listen` must be called
318 /// on it before that's possible.
320 /// Binding with a port number of 0 will request that the OS assigns a port
321 /// to this listener. The port allocated can be queried via the
322 /// `socket_name` function.
323 pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
// Wrap the runtime listener returned by `tcp_bind` in a `TcpListener`.
324 super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l }))
327 /// Returns the local socket address of this listener.
///
/// The `rtio::SocketAddr` reported by the runtime object is converted to a
/// `SocketAddr` using `super::from_rtio` for the IP part; runtime errors are
/// converted with `IoError::from_rtio_error`.
328 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
329 match self.obj.socket_name() {
330 Ok(rtio::SocketAddr { ip, port }) => {
331 Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
333 Err(e) => Err(IoError::from_rtio_error(e)),
// Consumes the listener: on success the runtime acceptor returned by
// `listen()` is wrapped in a `TcpAcceptor`; runtime errors are converted with
// `IoError::from_rtio_error`.
338 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
339 fn listen(self) -> IoResult<TcpAcceptor> {
340 match self.obj.listen() {
341 Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
342 Err(e) => Err(IoError::from_rtio_error(e)),
347 /// The accepting half of a TCP socket server. This structure is created through
348 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
349 /// `TcpStream` instances.
350 pub struct TcpAcceptor {
// Boxed runtime acceptor object that the `TcpAcceptor` methods forward to.
351 obj: Box<RtioTcpAcceptor + Send>,
355 /// Prevents blocking on all future accepts after `ms` milliseconds have
358 /// This function is used to set a deadline after which this acceptor will
359 /// time out accepting any connections. The argument is the relative
360 /// distance, in milliseconds, to a point in the future after which all
361 /// accepts will fail.
363 /// If the argument specified is `None`, then any previously registered
364 /// timeout is cleared.
///
/// The argument is handed unchanged to the runtime acceptor object's
/// `set_timeout`.
366 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
367 /// any pending connections. All pending connections will be accepted,
368 /// regardless of whether the timeout has expired or not (the accept will
369 /// not block in this case).
374 /// # #![allow(experimental)]
375 /// use std::io::TcpListener;
376 /// use std::io::{Listener, Acceptor, TimedOut};
378 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
380 /// // After 100ms have passed, all accepts will fail
381 /// a.set_timeout(Some(100));
383 /// match a.accept() {
384 /// Ok(..) => println!("accepted a socket"),
385 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
386 /// Err(e) => println!("err: {}", e),
389 /// // Reset the timeout and try again
390 /// a.set_timeout(Some(100));
391 /// let socket = a.accept();
393 /// // Clear the timeout and block indefinitely waiting for a connection
394 /// a.set_timeout(None);
395 /// let socket = a.accept();
397 #[experimental = "the type of the argument and name of this function are \
399 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
401 /// Closes the accepting capabilities of this acceptor.
403 /// This function is similar to `TcpStream`'s `close_{read,write}` methods
404 /// in that it will affect *all* cloned handles of this acceptor's original
407 /// Once this function succeeds, all future calls to `accept` will return
408 /// immediately with an error, preventing all future calls to accept. The
409 /// underlying socket will not be relinquished back to the OS until all
410 /// acceptors have been deallocated.
///
/// Any error reported by the runtime object is converted with
/// `IoError::from_rtio_error`.
412 /// This is useful for waking up a thread in an accept loop to indicate that
418 /// # #![allow(experimental)]
419 /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
421 /// let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
422 /// let a2 = a.clone();
426 /// for socket in a2.incoming() {
428 /// Ok(s) => { /* handle s */ }
429 /// Err(ref e) if e.kind == EndOfFile => break, // closed
430 /// Err(e) => panic!("unexpected error: {}", e),
435 /// # fn wait_for_sigint() {}
436 /// // Now that our accept loop is running, wait for the program to be
437 /// // requested to exit.
438 /// wait_for_sigint();
440 /// // Signal our accept loop to exit
441 /// assert!(a.close_accept().is_ok());
444 pub fn close_accept(&mut self) -> IoResult<()> {
445 self.obj.close_accept().map_err(IoError::from_rtio_error)
// Each accepted runtime stream is wrapped with `TcpStream::new`; runtime
// errors are converted with `IoError::from_rtio_error`.
449 impl Acceptor<TcpStream> for TcpAcceptor {
450 fn accept(&mut self) -> IoResult<TcpStream> {
451 match self.obj.accept(){
452 Ok(s) => Ok(TcpStream::new(s)),
453 Err(e) => Err(IoError::from_rtio_error(e)),
458 impl Clone for TcpAcceptor {
459 /// Creates a new handle to this TCP acceptor, allowing for simultaneous
462 /// The underlying TCP acceptor will not be closed until all handles to the
463 /// acceptor have been deallocated. Incoming connections will be received on
464 /// at most one acceptor; the same connection will not be accepted twice.
466 /// The `close_accept` method will shut down *all* acceptors cloned from the
467 /// same original acceptor, whereas the `set_timeout` method only affects
468 /// the acceptor that it is called on.
470 /// This function is useful for creating a handle to invoke `close_accept`
471 /// on to wake up any other task blocked in `accept`.
472 fn clone(&self) -> TcpAcceptor {
// The new handle wraps a clone of the runtime acceptor object.
473 TcpAcceptor { obj: self.obj.clone() }
478 #[allow(experimental)]
486 // FIXME #11530 this fails on android because tests are run as root
487 #[cfg_attr(any(windows, target_os = "android"), ignore)]
490 match TcpListener::bind("0.0.0.0:1") {
492 Err(e) => assert_eq!(e.kind, PermissionDenied),
498 match TcpStream::connect("0.0.0.0:1") {
500 Err(e) => assert_eq!(e.kind, ConnectionRefused),
// Binds an IPv4 test address, connects to its port through the "localhost"
// name, and checks that the byte 144 written on the connecting stream is read
// from the accepted stream.
505 fn listen_ip4_localhost() {
506 let socket_addr = next_test_ip4();
507 let listener = TcpListener::bind(socket_addr);
508 let mut acceptor = listener.listen();
511 let mut stream = TcpStream::connect(("localhost", socket_addr.port));
512 stream.write([144]).unwrap();
515 let mut stream = acceptor.accept();
517 stream.read(buf).unwrap();
518 assert!(buf[0] == 144);
// Connects through the "localhost" name and checks that the byte 64 written
// on the connecting stream is read from the accepted stream.
522 fn connect_localhost() {
523 let addr = next_test_ip4();
524 let mut acceptor = TcpListener::bind(addr).listen();
527 let mut stream = TcpStream::connect(("localhost", addr.port));
528 stream.write([64]).unwrap();
531 let mut stream = acceptor.accept();
533 stream.read(buf).unwrap();
534 assert!(buf[0] == 64);
// Connects through the literal "127.0.0.1" and checks that the byte 44
// written on the connecting stream is read from the accepted stream.
538 fn connect_ip4_loopback() {
539 let addr = next_test_ip4();
540 let mut acceptor = TcpListener::bind(addr).listen();
543 let mut stream = TcpStream::connect(("127.0.0.1", addr.port));
544 stream.write([44]).unwrap();
547 let mut stream = acceptor.accept();
549 stream.read(buf).unwrap();
550 assert!(buf[0] == 44);
// Connects through the literal "::1" and checks that the byte 66 written on
// the connecting stream is read from the accepted stream.
554 fn connect_ip6_loopback() {
555 let addr = next_test_ip6();
556 let mut acceptor = TcpListener::bind(addr).listen();
559 let mut stream = TcpStream::connect(("::1", addr.port));
560 stream.write([66]).unwrap();
563 let mut stream = acceptor.accept();
565 stream.read(buf).unwrap();
566 assert!(buf[0] == 66);
// Basic IPv4 round trip: the byte 99 written on the connecting stream must be
// read from the accepted stream.
570 fn smoke_test_ip4() {
571 let addr = next_test_ip4();
572 let mut acceptor = TcpListener::bind(addr).listen();
575 let mut stream = TcpStream::connect(addr);
576 stream.write([99]).unwrap();
579 let mut stream = acceptor.accept();
581 stream.read(buf).unwrap();
582 assert!(buf[0] == 99);
// Basic IPv6 round trip: the byte 99 written on the connecting stream must be
// read from the accepted stream.
586 fn smoke_test_ip6() {
587 let addr = next_test_ip6();
588 let mut acceptor = TcpListener::bind(addr).listen();
591 let mut stream = TcpStream::connect(addr);
592 stream.write([99]).unwrap();
595 let mut stream = acceptor.accept();
597 stream.read(buf).unwrap();
598 assert!(buf[0] == 99);
603 let addr = next_test_ip4();
604 let mut acceptor = TcpListener::bind(addr).listen();
607 let _stream = TcpStream::connect(addr);
611 let mut stream = acceptor.accept();
613 let nread = stream.read(buf);
614 assert!(nread.is_err());
619 let addr = next_test_ip6();
620 let mut acceptor = TcpListener::bind(addr).listen();
623 let _stream = TcpStream::connect(addr);
627 let mut stream = acceptor.accept();
629 let nread = stream.read(buf);
630 assert!(nread.is_err());
// The connecting side writes nothing. The first read on the accepted stream
// must fail, and the error of a second read must be `NotConnected` or
// `EndOfFile`.
634 fn read_eof_twice_ip4() {
635 let addr = next_test_ip4();
636 let mut acceptor = TcpListener::bind(addr).listen();
639 let _stream = TcpStream::connect(addr);
643 let mut stream = acceptor.accept();
645 let nread = stream.read(buf);
646 assert!(nread.is_err());
648 match stream.read(buf) {
651 assert!(e.kind == NotConnected || e.kind == EndOfFile,
652 "unknown kind: {}", e.kind);
// IPv6 variant: the first read on the accepted stream must fail, and the
// error of a second read must be `NotConnected` or `EndOfFile`.
658 fn read_eof_twice_ip6() {
659 let addr = next_test_ip6();
660 let mut acceptor = TcpListener::bind(addr).listen();
663 let _stream = TcpStream::connect(addr);
667 let mut stream = acceptor.accept();
669 let nread = stream.read(buf);
670 assert!(nread.is_err());
672 match stream.read(buf) {
675 assert!(e.kind == NotConnected || e.kind == EndOfFile,
676 "unknown kind: {}", e.kind);
// Writes on the accepted stream; the visible assertion accepts only
// `ConnectionReset`, `BrokenPipe` or `ConnectionAborted` as the error kind.
682 fn write_close_ip4() {
683 let addr = next_test_ip4();
684 let mut acceptor = TcpListener::bind(addr).listen();
687 let _stream = TcpStream::connect(addr);
691 let mut stream = acceptor.accept();
694 match stream.write(buf) {
697 assert!(e.kind == ConnectionReset ||
698 e.kind == BrokenPipe ||
699 e.kind == ConnectionAborted,
700 "unknown error: {}", e);
// IPv6 variant: writes on the accepted stream; the visible assertion accepts
// only `ConnectionReset`, `BrokenPipe` or `ConnectionAborted` as the error kind.
708 fn write_close_ip6() {
709 let addr = next_test_ip6();
710 let mut acceptor = TcpListener::bind(addr).listen();
713 let _stream = TcpStream::connect(addr);
717 let mut stream = acceptor.accept();
720 match stream.write(buf) {
723 assert!(e.kind == ConnectionReset ||
724 e.kind == BrokenPipe ||
725 e.kind == ConnectionAborted,
726 "unknown error: {}", e);
// Opens `max` connections one after another, each writing the byte 99, and
// checks that each of the first `max` incoming streams delivers 99.
734 fn multiple_connect_serial_ip4() {
735 let addr = next_test_ip4();
737 let mut acceptor = TcpListener::bind(addr).listen();
740 for _ in range(0, max) {
741 let mut stream = TcpStream::connect(addr);
742 stream.write([99]).unwrap();
746 for ref mut stream in acceptor.incoming().take(max) {
748 stream.read(buf).unwrap();
749 assert_eq!(buf[0], 99);
// IPv6 variant: opens `max` connections one after another, each writing the
// byte 99, and checks that each of the first `max` incoming streams delivers 99.
754 fn multiple_connect_serial_ip6() {
755 let addr = next_test_ip6();
757 let mut acceptor = TcpListener::bind(addr).listen();
760 for _ in range(0, max) {
761 let mut stream = TcpStream::connect(addr);
762 stream.write([99]).unwrap();
766 for ref mut stream in acceptor.incoming().take(max) {
768 stream.read(buf).unwrap();
769 assert_eq!(buf[0], 99);
// Accepts `MAX` connections and checks that the byte read from each accepted
// stream equals that stream's index in the accept order. The nested `connect`
// helper opens connection `i`, recurses for `i + 1`, then writes `i as u8`.
774 fn multiple_connect_interleaved_greedy_schedule_ip4() {
775 let addr = next_test_ip4();
776 static MAX: int = 10;
777 let acceptor = TcpListener::bind(addr).listen();
780 let mut acceptor = acceptor;
781 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
782 // Start another task to handle the connection
784 let mut stream = stream;
786 stream.read(buf).unwrap();
787 assert!(buf[0] == i as u8);
795 fn connect(i: int, addr: SocketAddr) {
796 if i == MAX { return }
799 debug!("connecting");
800 let mut stream = TcpStream::connect(addr);
801 // Connect again before writing
802 connect(i + 1, addr);
804 stream.write([i as u8]).unwrap();
// IPv6 variant: accepts `MAX` connections and checks that the byte read from
// each accepted stream equals that stream's index in the accept order. The
// nested `connect` helper opens connection `i`, recurses for `i + 1`, then
// writes `i as u8`.
810 fn multiple_connect_interleaved_greedy_schedule_ip6() {
811 let addr = next_test_ip6();
812 static MAX: int = 10;
813 let acceptor = TcpListener::bind(addr).listen();
816 let mut acceptor = acceptor;
817 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
818 // Start another task to handle the connection
820 let mut stream = stream;
822 stream.read(buf).unwrap();
823 assert!(buf[0] == i as u8);
831 fn connect(i: int, addr: SocketAddr) {
832 if i == MAX { return }
835 debug!("connecting");
836 let mut stream = TcpStream::connect(addr);
837 // Connect again before writing
838 connect(i + 1, addr);
840 stream.write([i as u8]).unwrap();
// Accepts `MAX` connections and checks that each accepted stream delivers the
// byte 99. The nested `connect` helper opens a connection, recurses for
// `i + 1`, then writes 99.
846 fn multiple_connect_interleaved_lazy_schedule_ip4() {
847 static MAX: int = 10;
848 let addr = next_test_ip4();
849 let acceptor = TcpListener::bind(addr).listen();
852 let mut acceptor = acceptor;
853 for stream in acceptor.incoming().take(MAX as uint) {
854 // Start another task to handle the connection
856 let mut stream = stream;
858 stream.read(buf).unwrap();
859 assert!(buf[0] == 99);
867 fn connect(i: int, addr: SocketAddr) {
868 if i == MAX { return }
871 debug!("connecting");
872 let mut stream = TcpStream::connect(addr);
873 // Connect again before writing
874 connect(i + 1, addr);
876 stream.write([99]).unwrap();
// IPv6 variant: accepts `MAX` connections and checks that each accepted stream
// delivers the byte 99. The nested `connect` helper opens a connection,
// recurses for `i + 1`, then writes 99.
882 fn multiple_connect_interleaved_lazy_schedule_ip6() {
883 static MAX: int = 10;
884 let addr = next_test_ip6();
885 let acceptor = TcpListener::bind(addr).listen();
888 let mut acceptor = acceptor;
889 for stream in acceptor.incoming().take(MAX as uint) {
890 // Start another task to handle the connection
892 let mut stream = stream;
894 stream.read(buf).unwrap();
895 assert!(buf[0] == 99);
903 fn connect(i: int, addr: SocketAddr) {
904 if i == MAX { return }
907 debug!("connecting");
908 let mut stream = TcpStream::connect(addr);
909 // Connect again before writing
910 connect(i + 1, addr);
912 stream.write([99]).unwrap();
// Test helper: binds a listener to `addr` and checks that its reported
// socket name equals `addr`.
917 pub fn socket_name(addr: SocketAddr) {
918 let mut listener = TcpListener::bind(addr).unwrap();
920 // Make sure socket_name gives
921 // us the socket we bound to.
922 let so_name = listener.socket_name();
923 assert!(so_name.is_ok());
924 assert_eq!(addr, so_name.unwrap());
// Test helper: connects to a listener on `addr` and checks that the connected
// stream reports `addr` as its peer name.
927 pub fn peer_name(addr: SocketAddr) {
928 let acceptor = TcpListener::bind(addr).listen();
930 let mut acceptor = acceptor;
931 acceptor.accept().unwrap();
934 let stream = TcpStream::connect(addr);
936 assert!(stream.is_ok());
937 let mut stream = stream.unwrap();
939 // Make sure peer_name gives us the
940 // address/port of the peer we've
942 let peer_name = stream.peer_name();
943 assert!(peer_name.is_ok());
944 assert_eq!(addr, peer_name.unwrap());
// Runs the `peer_name` and `socket_name` helpers, each on a fresh IPv4 test
// address.
948 fn socket_and_peer_name_ip4() {
949 peer_name(next_test_ip4());
950 socket_name(next_test_ip4());
// Runs only the `socket_name` helper on an IPv6 test address; the `peer_name`
// check is commented out (see the FIXME below).
954 fn socket_and_peer_name_ip6() {
955 // FIXME: peer name is not consistent
956 //peer_name(next_test_ip6());
957 socket_name(next_test_ip6());
962 let addr = next_test_ip4();
963 let (tx, rx) = channel();
965 let mut srv = TcpListener::bind(addr).listen().unwrap();
967 let mut cl = srv.accept().unwrap();
968 cl.write([10]).unwrap();
975 let mut c = TcpStream::connect(addr).unwrap();
976 let mut b = [0, ..10];
977 assert_eq!(c.read(b), Ok(1));
978 c.write([1]).unwrap();
984 let addr = next_test_ip4();
985 let listener = TcpListener::bind(addr).unwrap().listen();
986 assert!(listener.is_ok());
987 match TcpListener::bind(addr).listen() {
990 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError,
991 "unknown error: {} {}", e, e.kind);
998 let addr = next_test_ip4();
999 let (tx, rx) = channel();
1003 let _stream = TcpStream::connect(addr).unwrap();
1009 let mut acceptor = TcpListener::bind(addr).listen();
1012 let _stream = acceptor.accept().unwrap();
1018 let _listener = TcpListener::bind(addr);
// Exercises a cloned stream handle: the clone `s2` writes the byte 1 that the
// connecting side expects to read, and the original handle `s1` reads one
// byte back.
1022 fn tcp_clone_smoke() {
1023 let addr = next_test_ip4();
1024 let mut acceptor = TcpListener::bind(addr).listen();
1027 let mut s = TcpStream::connect(addr);
1028 let mut buf = [0, 0];
1029 assert_eq!(s.read(buf), Ok(1));
1030 assert_eq!(buf[0], 1);
1031 s.write([2]).unwrap();
1034 let mut s1 = acceptor.accept().unwrap();
1035 let s2 = s1.clone();
1037 let (tx1, rx1) = channel();
1038 let (tx2, rx2) = channel();
1042 s2.write([1]).unwrap();
1046 let mut buf = [0, 0];
1047 assert_eq!(s1.read(buf), Ok(1));
// The connecting side writes two single bytes; the accepted stream `s1` and
// its clone `s2` each issue a read, and both reads must succeed.
1052 fn tcp_clone_two_read() {
1053 let addr = next_test_ip6();
1054 let mut acceptor = TcpListener::bind(addr).listen();
1055 let (tx1, rx) = channel();
1056 let tx2 = tx1.clone();
1059 let mut s = TcpStream::connect(addr);
1060 s.write([1]).unwrap();
1062 s.write([2]).unwrap();
1066 let mut s1 = acceptor.accept().unwrap();
1067 let s2 = s1.clone();
1069 let (done, rx) = channel();
1072 let mut buf = [0, 0];
1073 s2.read(buf).unwrap();
1077 let mut buf = [0, 0];
1078 s1.read(buf).unwrap();
// The accepted stream `s1` and its clone `s2` each write one byte, while the
// connecting side performs two reads; every call must succeed.
1085 fn tcp_clone_two_write() {
1086 let addr = next_test_ip4();
1087 let mut acceptor = TcpListener::bind(addr).listen();
1090 let mut s = TcpStream::connect(addr);
1091 let mut buf = [0, 1];
1092 s.read(buf).unwrap();
1093 s.read(buf).unwrap();
1096 let mut s1 = acceptor.accept().unwrap();
1097 let s2 = s1.clone();
1099 let (done, rx) = channel();
1102 s2.write([1]).unwrap();
1105 s1.write([2]).unwrap();
// After the connecting stream closes its write half through the runtime
// object, its writes must fail; the accepted side reads an empty result to
// the end and then writes the byte 1, which the connecting stream must still
// be able to read.
1111 fn shutdown_smoke() {
1112 use rt::rtio::RtioTcpStream;
1114 let addr = next_test_ip4();
1115 let a = TcpListener::bind(addr).unwrap().listen();
1118 let mut c = a.accept().unwrap();
1119 assert_eq!(c.read_to_end(), Ok(vec!()));
1120 c.write([1]).unwrap();
1123 let mut s = TcpStream::connect(addr).unwrap();
1124 assert!(s.obj.close_write().is_ok());
1125 assert!(s.write([1]).is_err());
1126 assert_eq!(s.read_to_end(), Ok(vec!(1)));
// Checks acceptor timeouts: with a 10 ms timeout two consecutive accepts must
// report `TimedOut`; outside FreeBSD a pending connection must still be
// accepted within the retry loop; after clearing the timeout an accept
// following a connect must succeed.
1130 fn accept_timeout() {
1131 let addr = next_test_ip4();
1132 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
1134 a.set_timeout(Some(10));
1136 // Make sure we time out once and future invocations also time out
1137 let err = a.accept().err().unwrap();
1138 assert_eq!(err.kind, TimedOut);
1139 let err = a.accept().err().unwrap();
1140 assert_eq!(err.kind, TimedOut);
1142 // Also make sure that even though the timeout is expired that we will
1143 // continue to receive any pending connections.
1145 // FIXME: freebsd apparently never sees the pending connection, but
1146 // testing manually always works. Need to investigate this
1148 if !cfg!(target_os = "freebsd") {
1149 let (tx, rx) = channel();
1151 tx.send(TcpStream::connect(addr).unwrap());
1154 for i in range(0i, 1001) {
1157 Err(ref e) if e.kind == TimedOut => {}
1158 Err(e) => panic!("error: {}", e),
1160 ::task::deschedule();
1161 if i == 1000 { panic!("should have a pending connection") }
1165 // Unset the timeout and make sure that this always blocks.
1166 a.set_timeout(None);
1168 drop(TcpStream::connect(addr).unwrap());
1170 a.accept().unwrap();
// Checks that closing the read and write halves makes reads and writes fail
// on the closing handle, on a clone taken before closing, and on a clone
// taken afterwards; closing again on the clones must not crash.
1174 fn close_readwrite_smoke() {
1175 let addr = next_test_ip4();
1176 let a = TcpListener::bind(addr).listen().unwrap();
1177 let (_tx, rx) = channel::<()>();
1180 let _s = a.accept().unwrap();
1181 let _ = rx.recv_opt();
1185 let mut s = TcpStream::connect(addr).unwrap();
1186 let mut s2 = s.clone();
1188 // closing should prevent reads/writes
1189 s.close_write().unwrap();
1190 assert!(s.write([0]).is_err());
1191 s.close_read().unwrap();
1192 assert!(s.read(b).is_err());
1194 // closing should affect previous handles
1195 assert!(s2.write([0]).is_err());
1196 assert!(s2.read(b).is_err());
1198 // closing should affect new handles
1199 let mut s3 = s.clone();
1200 assert!(s3.write([0]).is_err());
1201 assert!(s3.read(b).is_err());
1203 // make sure these don't die
1204 let _ = s2.close_read();
1205 let _ = s2.close_write();
1206 let _ = s3.close_read();
1207 let _ = s3.close_write();
// Checks that `close_read` on one handle makes a read on another handle
// (`s2`) return an error, as the comments below describe.
1211 fn close_read_wakes_up() {
1212 let addr = next_test_ip4();
1213 let a = TcpListener::bind(addr).listen().unwrap();
1214 let (_tx, rx) = channel::<()>();
1217 let _s = a.accept().unwrap();
1218 let _ = rx.recv_opt();
1221 let mut s = TcpStream::connect(addr).unwrap();
1223 let (tx, rx) = channel();
1226 assert!(s2.read([0]).is_err());
1229 // this should wake up the child task
1230 s.close_read().unwrap();
1232 // this test will never finish if the child doesn't wake up
// With `set_timeout(Some(20))` on the accepted stream: two reads must report
// `TimedOut`, 128 KiB writes are repeated until one reports `TimedOut`, and a
// following one-byte write must also report `TimedOut`. After clearing the
// timeout, a read must return one byte.
1237 fn readwrite_timeouts() {
1238 let addr = next_test_ip6();
1239 let mut a = TcpListener::bind(addr).listen().unwrap();
1240 let (tx, rx) = channel::<()>();
1242 let mut s = TcpStream::connect(addr).unwrap();
1244 assert!(s.write([0]).is_ok());
1245 let _ = rx.recv_opt();
1248 let mut s = a.accept().unwrap();
1249 s.set_timeout(Some(20));
1250 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1251 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1253 s.set_timeout(Some(20));
1254 for i in range(0i, 1001) {
1255 match s.write([0, .. 128 * 1024]) {
1256 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1257 Err(IoError { kind: TimedOut, .. }) => break,
1258 Err(e) => panic!("{}", e),
1260 if i == 1000 { panic!("should have filled up?!"); }
1262 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1265 s.set_timeout(None);
1266 assert_eq!(s.read([0, 0]), Ok(1));
// With only a read timeout set on the accepted stream, two reads must report
// `TimedOut`, while 100 writes of 128 KiB must all succeed; the connecting
// side reads until it has received 100 * 128 KiB.
1270 fn read_timeouts() {
1271 let addr = next_test_ip6();
1272 let mut a = TcpListener::bind(addr).listen().unwrap();
1273 let (tx, rx) = channel::<()>();
1275 let mut s = TcpStream::connect(addr).unwrap();
1278 while amt < 100 * 128 * 1024 {
1279 match s.read([0, ..128 * 1024]) {
1280 Ok(n) => { amt += n; }
1281 Err(e) => panic!("{}", e),
1284 let _ = rx.recv_opt();
1287 let mut s = a.accept().unwrap();
1288 s.set_read_timeout(Some(20));
1289 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1290 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1293 for _ in range(0i, 100) {
1294 assert!(s.write([0, ..128 * 1024]).is_ok());
// With only a write timeout set on the accepted stream, 128 KiB writes are
// repeated until one reports `TimedOut`, a following one-byte write must also
// report `TimedOut`, and a read must still succeed.
1299 fn write_timeouts() {
1300 let addr = next_test_ip6();
1301 let mut a = TcpListener::bind(addr).listen().unwrap();
1302 let (tx, rx) = channel::<()>();
1304 let mut s = TcpStream::connect(addr).unwrap();
1306 assert!(s.write([0]).is_ok());
1307 let _ = rx.recv_opt();
1310 let mut s = a.accept().unwrap();
1311 s.set_write_timeout(Some(20));
1312 for i in range(0i, 1001) {
1313 match s.write([0, .. 128 * 1024]) {
1314 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1315 Err(IoError { kind: TimedOut, .. }) => break,
1316 Err(e) => panic!("{}", e),
1318 if i == 1000 { panic!("should have filled up?!"); }
1320 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1323 assert!(s.read([0]).is_ok());
// A read on `s` with a 20 ms read timeout must report `TimedOut`, while a
// read on the second handle `s2` must return one byte.
// NOTE(review): `s2` is presumably a clone of `s`; confirm against the full file.
1327 fn timeout_concurrent_read() {
1328 let addr = next_test_ip6();
1329 let mut a = TcpListener::bind(addr).listen().unwrap();
1330 let (tx, rx) = channel::<()>();
1332 let mut s = TcpStream::connect(addr).unwrap();
1334 assert_eq!(s.write([0]), Ok(()));
1335 let _ = rx.recv_opt();
1338 let mut s = a.accept().unwrap();
1340 let (tx2, rx2) = channel();
1343 assert_eq!(s2.read([0]), Ok(1));
1347 s.set_read_timeout(Some(20));
1348 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
// Clones an accepted stream handle again while another clone of it (`tcp2`)
// is expected to be in the middle of a read, as the comments below describe.
1355 fn clone_while_reading() {
1356 let addr = next_test_ip6();
1357 let listen = TcpListener::bind(addr);
1358 let mut accept = listen.listen().unwrap();
1360 // Enqueue a task to write to a socket
1361 let (tx, rx) = channel();
1362 let (txdone, rxdone) = channel();
1363 let txdone2 = txdone.clone();
1365 let mut tcp = TcpStream::connect(addr).unwrap();
1367 tcp.write_u8(0).unwrap();
1371 // Spawn off a reading clone
1372 let tcp = accept.accept().unwrap();
1373 let tcp2 = tcp.clone();
1374 let txdone3 = txdone.clone();
1376 let mut tcp2 = tcp2;
1377 tcp2.read_u8().unwrap();
1381 // Try to ensure that the reading clone is indeed reading
1382 for _ in range(0i, 50) {
1383 ::task::deschedule();
1386 // clone the handle again while it's reading, then let it finish the
1388 let _ = tcp.clone();
// Two connections are made; the acceptor `a` and its clone `a2` must each
// accept successfully.
1395 fn clone_accept_smoke() {
1396 let addr = next_test_ip4();
1397 let l = TcpListener::bind(addr);
1398 let mut a = l.listen().unwrap();
1399 let mut a2 = a.clone();
1402 let _ = TcpStream::connect(addr);
1405 let _ = TcpStream::connect(addr);
1408 assert!(a.accept().is_ok());
1409 assert!(a2.accept().is_ok());
// Two spawned procs each call `accept` (on `a` and on `a2`) and send the
// result over a channel; after two connections are made, both received
// results must be `Ok`.
1413 fn clone_accept_concurrent() {
1414 let addr = next_test_ip4();
1415 let l = TcpListener::bind(addr);
1416 let a = l.listen().unwrap();
1419 let (tx, rx) = channel();
1420 let tx2 = tx.clone();
1422 spawn(proc() { let mut a = a; tx.send(a.accept()) });
1423 spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
1426 let _ = TcpStream::connect(addr);
1429 let _ = TcpStream::connect(addr);
1432 assert!(rx.recv().is_ok());
1433 assert!(rx.recv().is_ok());
// After `close_accept` succeeds, `accept` must fail with kind `EndOfFile`.
1437 fn close_accept_smoke() {
1438 let addr = next_test_ip4();
1439 let l = TcpListener::bind(addr);
1440 let mut a = l.listen().unwrap();
1442 a.close_accept().unwrap();
1443 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
// `close_accept` is called on the clone `a2`; the result of `a.accept()`
// received over the channel must be an error of kind `EndOfFile`.
1447 fn close_accept_concurrent() {
1448 let addr = next_test_ip4();
1449 let l = TcpListener::bind(addr);
1450 let a = l.listen().unwrap();
1451 let mut a2 = a.clone();
1453 let (tx, rx) = channel();
1456 tx.send(a.accept());
1458 a2.close_accept().unwrap();
1460 assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);