1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! TCP network connections
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
22 use io::net::ip::SocketAddr;
23 use io::{Reader, Writer, Listener, Acceptor};
25 use option::{None, Some};
26 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
27 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
29 /// A structure which represents a TCP stream between a local socket and a
35 /// # #![allow(unused_must_use)]
36 /// use std::io::net::tcp::TcpStream;
37 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
39 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
40 /// let mut stream = TcpStream::connect(addr);
42 /// stream.write([1]);
43 /// let mut buf = [0];
45 /// drop(stream); // close the connection
47 pub struct TcpStream {
48 obj: ~RtioTcpStream:Send
52 fn new(s: ~RtioTcpStream:Send) -> TcpStream {
56 /// Creates a TCP connection to a remote socket address.
58 /// If no error is encountered, then `Ok(stream)` is returned.
59 pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
60 LocalIo::maybe_raise(|io| {
61 io.tcp_connect(addr, None).map(TcpStream::new)
65 /// Creates a TCP connection to a remote socket address, timing out after
66 /// the specified number of milliseconds.
68 /// This is the same as the `connect` method, except that if the timeout
69 /// specified (in milliseconds) elapses before a connection is made an error
70 /// will be returned. The error's kind will be `TimedOut`.
71 #[experimental = "the timeout argument may eventually change types"]
72 pub fn connect_timeout(addr: SocketAddr,
73 timeout_ms: u64) -> IoResult<TcpStream> {
74 LocalIo::maybe_raise(|io| {
75 io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
79 /// Returns the socket address of the remote peer of this TCP connection.
80 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
84 /// Returns the socket address of the local half of this TCP connection.
85 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
86 self.obj.socket_name()
90 impl Clone for TcpStream {
91 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
92 /// and writes of this connection.
94 /// The underlying TCP stream will not be closed until all handles to the
95 /// stream have been deallocated. All handles will also follow the same
96 /// stream, but two concurrent reads will not receive the same data.
97 /// Instead, the first read will receive the first packet received, and the
98 /// second read will receive the second packet.
99 fn clone(&self) -> TcpStream {
100 TcpStream { obj: self.obj.clone() }
104 impl Reader for TcpStream {
105 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
108 impl Writer for TcpStream {
109 fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
112 /// A structure representing a socket server. This listener is used to create a
113 /// `TcpAcceptor` which can be used to accept sockets on a local port.
120 /// # #![allow(dead_code)]
121 /// use std::io::{TcpListener, TcpStream};
122 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
123 /// use std::io::{Acceptor, Listener};
125 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
126 /// let listener = TcpListener::bind(addr);
128 /// // bind the listener to the specified address
129 /// let mut acceptor = listener.listen();
131 /// fn handle_client(mut stream: TcpStream) {
133 /// # &mut stream; // silence unused mutability/variable warning
135 /// // accept connections and process them, spawning a new tasks for each one
136 /// for stream in acceptor.incoming() {
138 /// Err(e) => { /* connection failed */ }
139 /// Ok(stream) => spawn(proc() {
140 /// // connection succeeded
141 /// handle_client(stream)
146 /// // close the socket server
150 pub struct TcpListener {
151 obj: ~RtioTcpListener:Send
155 /// Creates a new `TcpListener` which will be bound to the specified local
156 /// socket address. This listener is not ready for accepting connections,
157 /// `listen` must be called on it before that's possible.
159 /// Binding with a port number of 0 will request that the OS assigns a port
160 /// to this listener. The port allocated can be queried via the
161 /// `socket_name` function.
162 pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> {
163 LocalIo::maybe_raise(|io| {
164 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
168 /// Returns the local socket address of this listener.
169 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
170 self.obj.socket_name()
174 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
175 fn listen(self) -> IoResult<TcpAcceptor> {
176 self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
180 /// The accepting half of a TCP socket server. This structure is created through
181 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
182 /// `TcpStream` instances.
183 pub struct TcpAcceptor {
184 obj: ~RtioTcpAcceptor:Send
187 impl Acceptor<TcpStream> for TcpAcceptor {
188 fn accept(&mut self) -> IoResult<TcpStream> {
189 self.obj.accept().map(TcpStream::new)
196 use io::net::ip::SocketAddr;
200 // FIXME #11530 this fails on android because tests are run as root
201 iotest!(fn bind_error() {
202 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
203 match TcpListener::bind(addr) {
205 Err(e) => assert_eq!(e.kind, PermissionDenied),
207 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
209 iotest!(fn connect_error() {
210 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
211 match TcpStream::connect(addr) {
213 Err(e) => assert_eq!(e.kind, ConnectionRefused),
217 iotest!(fn smoke_test_ip4() {
218 let addr = next_test_ip4();
219 let mut acceptor = TcpListener::bind(addr).listen();
222 let mut stream = TcpStream::connect(addr);
223 stream.write([99]).unwrap();
226 let mut stream = acceptor.accept();
228 stream.read(buf).unwrap();
229 assert!(buf[0] == 99);
232 iotest!(fn smoke_test_ip6() {
233 let addr = next_test_ip6();
234 let mut acceptor = TcpListener::bind(addr).listen();
237 let mut stream = TcpStream::connect(addr);
238 stream.write([99]).unwrap();
241 let mut stream = acceptor.accept();
243 stream.read(buf).unwrap();
244 assert!(buf[0] == 99);
247 iotest!(fn read_eof_ip4() {
248 let addr = next_test_ip4();
249 let mut acceptor = TcpListener::bind(addr).listen();
252 let _stream = TcpStream::connect(addr);
256 let mut stream = acceptor.accept();
258 let nread = stream.read(buf);
259 assert!(nread.is_err());
262 iotest!(fn read_eof_ip6() {
263 let addr = next_test_ip6();
264 let mut acceptor = TcpListener::bind(addr).listen();
267 let _stream = TcpStream::connect(addr);
271 let mut stream = acceptor.accept();
273 let nread = stream.read(buf);
274 assert!(nread.is_err());
277 iotest!(fn read_eof_twice_ip4() {
278 let addr = next_test_ip4();
279 let mut acceptor = TcpListener::bind(addr).listen();
282 let _stream = TcpStream::connect(addr);
286 let mut stream = acceptor.accept();
288 let nread = stream.read(buf);
289 assert!(nread.is_err());
291 match stream.read(buf) {
294 assert!(e.kind == NotConnected || e.kind == EndOfFile,
295 "unknown kind: {:?}", e.kind);
300 iotest!(fn read_eof_twice_ip6() {
301 let addr = next_test_ip6();
302 let mut acceptor = TcpListener::bind(addr).listen();
305 let _stream = TcpStream::connect(addr);
309 let mut stream = acceptor.accept();
311 let nread = stream.read(buf);
312 assert!(nread.is_err());
314 match stream.read(buf) {
317 assert!(e.kind == NotConnected || e.kind == EndOfFile,
318 "unknown kind: {:?}", e.kind);
323 iotest!(fn write_close_ip4() {
324 let addr = next_test_ip4();
325 let mut acceptor = TcpListener::bind(addr).listen();
328 let _stream = TcpStream::connect(addr);
332 let mut stream = acceptor.accept();
335 match stream.write(buf) {
338 assert!(e.kind == ConnectionReset ||
339 e.kind == BrokenPipe ||
340 e.kind == ConnectionAborted,
341 "unknown error: {:?}", e);
348 iotest!(fn write_close_ip6() {
349 let addr = next_test_ip6();
350 let mut acceptor = TcpListener::bind(addr).listen();
353 let _stream = TcpStream::connect(addr);
357 let mut stream = acceptor.accept();
360 match stream.write(buf) {
363 assert!(e.kind == ConnectionReset ||
364 e.kind == BrokenPipe ||
365 e.kind == ConnectionAborted,
366 "unknown error: {:?}", e);
373 iotest!(fn multiple_connect_serial_ip4() {
374 let addr = next_test_ip4();
376 let mut acceptor = TcpListener::bind(addr).listen();
379 for _ in range(0, max) {
380 let mut stream = TcpStream::connect(addr);
381 stream.write([99]).unwrap();
385 for ref mut stream in acceptor.incoming().take(max) {
387 stream.read(buf).unwrap();
388 assert_eq!(buf[0], 99);
392 iotest!(fn multiple_connect_serial_ip6() {
393 let addr = next_test_ip6();
395 let mut acceptor = TcpListener::bind(addr).listen();
398 for _ in range(0, max) {
399 let mut stream = TcpStream::connect(addr);
400 stream.write([99]).unwrap();
404 for ref mut stream in acceptor.incoming().take(max) {
406 stream.read(buf).unwrap();
407 assert_eq!(buf[0], 99);
411 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
412 let addr = next_test_ip4();
413 static MAX: int = 10;
414 let acceptor = TcpListener::bind(addr).listen();
417 let mut acceptor = acceptor;
418 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
419 // Start another task to handle the connection
421 let mut stream = stream;
423 stream.read(buf).unwrap();
424 assert!(buf[0] == i as u8);
432 fn connect(i: int, addr: SocketAddr) {
433 if i == MAX { return }
436 debug!("connecting");
437 let mut stream = TcpStream::connect(addr);
438 // Connect again before writing
439 connect(i + 1, addr);
441 stream.write([i as u8]).unwrap();
446 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
447 let addr = next_test_ip6();
448 static MAX: int = 10;
449 let acceptor = TcpListener::bind(addr).listen();
452 let mut acceptor = acceptor;
453 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
454 // Start another task to handle the connection
456 let mut stream = stream;
458 stream.read(buf).unwrap();
459 assert!(buf[0] == i as u8);
467 fn connect(i: int, addr: SocketAddr) {
468 if i == MAX { return }
471 debug!("connecting");
472 let mut stream = TcpStream::connect(addr);
473 // Connect again before writing
474 connect(i + 1, addr);
476 stream.write([i as u8]).unwrap();
481 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
482 static MAX: int = 10;
483 let addr = next_test_ip4();
484 let acceptor = TcpListener::bind(addr).listen();
487 let mut acceptor = acceptor;
488 for stream in acceptor.incoming().take(MAX as uint) {
489 // Start another task to handle the connection
491 let mut stream = stream;
493 stream.read(buf).unwrap();
494 assert!(buf[0] == 99);
502 fn connect(i: int, addr: SocketAddr) {
503 if i == MAX { return }
506 debug!("connecting");
507 let mut stream = TcpStream::connect(addr);
508 // Connect again before writing
509 connect(i + 1, addr);
511 stream.write([99]).unwrap();
516 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
517 static MAX: int = 10;
518 let addr = next_test_ip6();
519 let acceptor = TcpListener::bind(addr).listen();
522 let mut acceptor = acceptor;
523 for stream in acceptor.incoming().take(MAX as uint) {
524 // Start another task to handle the connection
526 let mut stream = stream;
528 stream.read(buf).unwrap();
529 assert!(buf[0] == 99);
537 fn connect(i: int, addr: SocketAddr) {
538 if i == MAX { return }
541 debug!("connecting");
542 let mut stream = TcpStream::connect(addr);
543 // Connect again before writing
544 connect(i + 1, addr);
546 stream.write([99]).unwrap();
551 pub fn socket_name(addr: SocketAddr) {
552 let mut listener = TcpListener::bind(addr).unwrap();
554 // Make sure socket_name gives
555 // us the socket we binded to.
556 let so_name = listener.socket_name();
557 assert!(so_name.is_ok());
558 assert_eq!(addr, so_name.unwrap());
561 pub fn peer_name(addr: SocketAddr) {
562 let acceptor = TcpListener::bind(addr).listen();
564 let mut acceptor = acceptor;
565 acceptor.accept().unwrap();
568 let stream = TcpStream::connect(addr);
570 assert!(stream.is_ok());
571 let mut stream = stream.unwrap();
573 // Make sure peer_name gives us the
574 // address/port of the peer we've
576 let peer_name = stream.peer_name();
577 assert!(peer_name.is_ok());
578 assert_eq!(addr, peer_name.unwrap());
581 iotest!(fn socket_and_peer_name_ip4() {
582 peer_name(next_test_ip4());
583 socket_name(next_test_ip4());
586 iotest!(fn socket_and_peer_name_ip6() {
587 // FIXME: peer name is not consistent
588 //peer_name(next_test_ip6());
589 socket_name(next_test_ip6());
592 iotest!(fn partial_read() {
593 let addr = next_test_ip4();
594 let (tx, rx) = channel();
596 let mut srv = TcpListener::bind(addr).listen().unwrap();
598 let mut cl = srv.accept().unwrap();
599 cl.write([10]).unwrap();
606 let mut c = TcpStream::connect(addr).unwrap();
607 let mut b = [0, ..10];
608 assert_eq!(c.read(b), Ok(1));
609 c.write([1]).unwrap();
613 iotest!(fn double_bind() {
614 let addr = next_test_ip4();
615 let listener = TcpListener::bind(addr).unwrap().listen();
616 assert!(listener.is_ok());
617 match TcpListener::bind(addr).listen() {
620 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError);
625 iotest!(fn fast_rebind() {
626 let addr = next_test_ip4();
627 let (tx, rx) = channel();
631 let _stream = TcpStream::connect(addr).unwrap();
637 let mut acceptor = TcpListener::bind(addr).listen();
640 let _stream = acceptor.accept().unwrap();
646 let _listener = TcpListener::bind(addr);
649 iotest!(fn tcp_clone_smoke() {
650 let addr = next_test_ip4();
651 let mut acceptor = TcpListener::bind(addr).listen();
654 let mut s = TcpStream::connect(addr);
655 let mut buf = [0, 0];
656 assert_eq!(s.read(buf), Ok(1));
657 assert_eq!(buf[0], 1);
658 s.write([2]).unwrap();
661 let mut s1 = acceptor.accept().unwrap();
664 let (tx1, rx1) = channel();
665 let (tx2, rx2) = channel();
669 s2.write([1]).unwrap();
673 let mut buf = [0, 0];
674 assert_eq!(s1.read(buf), Ok(1));
678 iotest!(fn tcp_clone_two_read() {
679 let addr = next_test_ip6();
680 let mut acceptor = TcpListener::bind(addr).listen();
681 let (tx1, rx) = channel();
682 let tx2 = tx1.clone();
685 let mut s = TcpStream::connect(addr);
686 s.write([1]).unwrap();
688 s.write([2]).unwrap();
692 let mut s1 = acceptor.accept().unwrap();
695 let (done, rx) = channel();
698 let mut buf = [0, 0];
699 s2.read(buf).unwrap();
703 let mut buf = [0, 0];
704 s1.read(buf).unwrap();
710 iotest!(fn tcp_clone_two_write() {
711 let addr = next_test_ip4();
712 let mut acceptor = TcpListener::bind(addr).listen();
715 let mut s = TcpStream::connect(addr);
716 let mut buf = [0, 1];
717 s.read(buf).unwrap();
718 s.read(buf).unwrap();
721 let mut s1 = acceptor.accept().unwrap();
724 let (done, rx) = channel();
727 s2.write([1]).unwrap();
730 s1.write([2]).unwrap();
735 iotest!(fn shutdown_smoke() {
736 use rt::rtio::RtioTcpStream;
738 let addr = next_test_ip4();
739 let a = TcpListener::bind(addr).unwrap().listen();
742 let mut c = a.accept().unwrap();
743 assert_eq!(c.read_to_end(), Ok(vec!()));
744 c.write([1]).unwrap();
747 let mut s = TcpStream::connect(addr).unwrap();
748 assert!(s.obj.close_write().is_ok());
749 assert!(s.write([1]).is_err());
750 assert_eq!(s.read_to_end(), Ok(vec!(1)));