1 // Copyright 2015 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 use net::{ToSocketAddrs, SocketAddr, Shutdown};
16 use sys_common::net2 as net_imp;
17 use sys_common::AsInner;
19 /// A structure which represents a TCP stream between a local socket and a remote socket.
22 /// The socket will be closed when the value is dropped.
27 /// use std::io::prelude::*;
28 /// use std::net::TcpStream;
31 /// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
33 /// // ignore the Result
34 /// let _ = stream.write(&[1]);
35 /// let _ = stream.read(&mut [0; 128]); // ignore here too
36 /// } // the stream is closed here
// Newtype over the platform-level stream from `sys_common::net2` (imported as
// `net_imp`); the methods in this file reach it through `self.0`.
38 pub struct TcpStream(net_imp::TcpStream);
40 /// A structure representing a socket server.
45 /// use std::net::{TcpListener, TcpStream};
46 /// use std::thread::Thread;
48 /// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
50 /// fn handle_client(stream: TcpStream) {
54 /// // accept connections and process them, spawning a new thread for each one
55 /// for stream in listener.incoming() {
58 /// Thread::spawn(move|| {
59 /// // connection succeeded
60 /// handle_client(stream)
63 /// Err(e) => { /* connection failed */ }
67 /// // close the socket server
// Newtype over the platform-level listener from `sys_common::net2` (imported
// as `net_imp`); the methods in this file reach it through `self.0`.
70 pub struct TcpListener(net_imp::TcpListener);
72 /// An infinite iterator over the connections from a `TcpListener`.
74 /// This iterator will infinitely yield `Some` of the accepted connections. It
75 /// is equivalent to calling `accept` in a loop.
// Holds only a shared borrow of the listener, so the iterator is tied to the
// lifetime `'a` of the `TcpListener` it was created from.
76 pub struct Incoming<'a> { listener: &'a TcpListener }
79 /// Open a TCP connection to a remote host.
81 /// `addr` is an address of the remote host. Anything which implements the
82 /// `ToSocketAddrs` trait can be supplied for the address; see that trait's
83 /// documentation for concrete examples.
84 pub fn connect<A: ToSocketAddrs + ?Sized>(addr: &A) -> io::Result<TcpStream> {
// `each_addr` is defined in the parent module; it is handed the address and
// the platform-level `connect` function, and a successful result is wrapped
// in the public `TcpStream` newtype.
85 super::each_addr(addr, net_imp::TcpStream::connect).map(TcpStream)
88 /// Returns the socket address of the remote peer of this TCP connection.
89 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
93 /// Returns the socket address of the local half of this TCP connection.
94 pub fn socket_addr(&self) -> io::Result<SocketAddr> {
98 /// Shut down the read, write, or both halves of this connection.
100 /// This function will cause all pending and future I/O on the specified
101 /// portions to return immediately with an appropriate value (see the
102 /// documentation of `Shutdown`).
103 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
107 /// Create a new independently owned handle to the underlying socket.
109 /// The returned `TcpStream` is a reference to the same stream that this
110 /// object references. Both handles will read and write the same stream of
111 /// data, and options set on one stream will be propagated to the other stream.
113 pub fn try_clone(&self) -> io::Result<TcpStream> {
// Duplicate the platform-level handle, then re-wrap it in the public type.
114 self.0.duplicate().map(TcpStream)
117 /// Sets the nodelay flag on this connection to the boolean specified.
118 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
// Forwarded unchanged to the platform-level stream.
119 self.0.set_nodelay(nodelay)
122 /// Sets the keepalive timeout to the timeout specified.
124 /// If the value specified is `None`, then the keepalive flag is cleared on
125 /// this connection. Otherwise, the keepalive timeout will be set to the
126 /// specified time, in seconds.
127 pub fn set_keepalive(&self, seconds: Option<u32>) -> io::Result<()> {
// Forwarded unchanged to the platform-level stream.
128 self.0.set_keepalive(seconds)
// Reads are forwarded to the platform-level stream held in field 0.
132 impl Read for TcpStream {
133 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
// Writes are forwarded to the platform-level stream held in field 0.
135 impl Write for TcpStream {
136 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
// `flush` performs no work and always reports success.
// NOTE(review): presumably because nothing is buffered at this layer - confirm.
137 fn flush(&mut self) -> io::Result<()> { Ok(()) }
// Also implemented for shared references, so a `&TcpStream` can be read from
// without a mutable borrow of the stream itself.
139 impl<'a> Read for &'a TcpStream {
140 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
// Also implemented for shared references, so a `&TcpStream` can be written to
// without a mutable borrow of the stream itself.
142 impl<'a> Write for &'a TcpStream {
143 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
// `flush` performs no work and always reports success.
144 fn flush(&mut self) -> io::Result<()> { Ok(()) }
// Gives access to a borrow of the wrapped platform-level stream.
147 impl AsInner<net_imp::TcpStream> for TcpStream {
148 fn as_inner(&self) -> &net_imp::TcpStream { &self.0 }
152 /// Creates a new `TcpListener` which will be bound to the specified address.
155 /// The returned listener is ready for accepting connections.
157 /// Binding with a port number of 0 will request that the OS assigns a port
158 /// to this listener. The port allocated can be queried via the
159 /// `socket_addr` function.
161 /// The address type can be any implementer of the `ToSocketAddrs` trait. See
162 /// its documentation for concrete examples.
163 pub fn bind<A: ToSocketAddrs + ?Sized>(addr: &A) -> io::Result<TcpListener> {
// `each_addr` is defined in the parent module; it is handed the address and
// the platform-level `bind` function, and a successful result is wrapped in
// the public `TcpListener` newtype.
164 super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
167 /// Returns the local socket address of this listener.
168 pub fn socket_addr(&self) -> io::Result<SocketAddr> {
172 /// Create a new independently owned handle to the underlying socket.
174 /// The returned `TcpListener` is a reference to the same socket that this
175 /// object references. Both handles can be used to accept incoming
176 /// connections and options set on one listener will affect the other.
177 pub fn try_clone(&self) -> io::Result<TcpListener> {
// Duplicate the platform-level handle, then re-wrap it in the public type.
178 self.0.duplicate().map(TcpListener)
181 /// Accept a new incoming connection from this listener.
183 /// This function will block the calling thread until a new TCP connection
184 /// is established. When established, the corresponding `TcpStream` and the
185 /// remote peer's address will be returned.
186 pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
// Wrap the platform-level stream in the public `TcpStream`; the second tuple
// element is passed through untouched.
187 self.0.accept().map(|(a, b)| (TcpStream(a), b))
190 /// Returns an iterator over the connections being received on this listener.
193 /// The returned iterator will never return `None` and will also not yield
194 /// the peer's `SocketAddr` structure.
195 pub fn incoming(&self) -> Incoming {
// The iterator only borrows `self`; no socket work happens until `next`.
196 Incoming { listener: self }
// Each call to `next` performs one `accept` on the borrowed listener.
200 impl<'a> Iterator for Incoming<'a> {
201 type Item = io::Result<TcpStream>;
202 fn next(&mut self) -> Option<io::Result<TcpStream>> {
// Always `Some`: accept errors are yielded as `Err` items rather than ending
// the iteration. `p.0` keeps the stream and discards the peer address.
203 Some(self.listener.accept().map(|p| p.0))
// Gives access to a borrow of the wrapped platform-level listener.
207 impl AsInner<net_imp::TcpListener> for TcpListener {
208 fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
218 use net::test::{next_test_ip4, next_test_ip6};
219 use sync::mpsc::channel;
222 fn each_ip(f: &mut FnMut(SocketAddr)) {
231 Err(e) => panic!("received error for `{}`: {}", stringify!($e), e),
236 // FIXME #11530 this fails on android because tests are run as root
237 #[cfg_attr(any(windows, target_os = "android"), ignore)]
240 match TcpListener::bind("0.0.0.0:1") {
242 Err(e) => assert_eq!(e.kind(), ErrorKind::PermissionDenied),
248 match TcpStream::connect("0.0.0.0:1") {
250 Err(e) => assert_eq!(e.kind(), ErrorKind::ConnectionRefused),
// Sends one byte (144) over a connection made to the host name "localhost":
// a scoped thread connects to the listener's port and writes, while the test
// thread accepts and checks the byte it reads.
255 fn listen_localhost() {
256 let socket_addr = next_test_ip4();
257 let listener = t!(TcpListener::bind(&socket_addr));
259 let _t = Thread::scoped(move || {
260 let mut stream = t!(TcpStream::connect(&("localhost",
261 socket_addr.port())));
262 t!(stream.write(&[144]));
265 let mut stream = t!(listener.accept()).0;
267 t!(stream.read(&mut buf));
268 assert!(buf[0] == 144);
// Sends one byte (44) over a connection made to the literal "127.0.0.1" and
// the port of the bound test address; the accepting side checks the byte.
272 fn connect_ip4_loopback() {
273 let addr = next_test_ip4();
274 let acceptor = t!(TcpListener::bind(&addr));
276 let _t = Thread::scoped(move|| {
277 let mut stream = t!(TcpStream::connect(&("127.0.0.1", addr.port())));
278 t!(stream.write(&[44]));
281 let mut stream = t!(acceptor.accept()).0;
283 t!(stream.read(&mut buf));
284 assert!(buf[0] == 44);
// Sends one byte (66) over a connection made to the literal "::1" and the
// port of the bound IPv6 test address; the accepting side checks the byte.
288 fn connect_ip6_loopback() {
289 let addr = next_test_ip6();
290 let acceptor = t!(TcpListener::bind(&addr));
292 let _t = Thread::scoped(move|| {
293 let mut stream = t!(TcpStream::connect(&("::1", addr.port())));
294 t!(stream.write(&[66]));
297 let mut stream = t!(acceptor.accept()).0;
299 t!(stream.read(&mut buf));
300 assert!(buf[0] == 66);
// The client writes byte 99 and reports its own local address over a channel;
// the server checks the byte and that the peer address returned by `accept`
// matches what the client reported.
// NOTE(review): despite the `_ip6` suffix the body runs through `each_ip`;
// which address families that covers depends on `each_ip` - confirm.
304 fn smoke_test_ip6() {
305 each_ip(&mut |addr| {
306 let acceptor = t!(TcpListener::bind(&addr));
308 let (tx, rx) = channel();
309 let _t = Thread::scoped(move|| {
310 let mut stream = t!(TcpStream::connect(&addr));
311 t!(stream.write(&[99]));
312 tx.send(t!(stream.socket_addr())).unwrap();
315 let (mut stream, addr) = t!(acceptor.accept());
317 t!(stream.read(&mut buf));
318 assert!(buf[0] == 99);
319 assert_eq!(addr, t!(rx.recv()));
325 each_ip(&mut |addr| {
326 let acceptor = t!(TcpListener::bind(&addr));
328 let _t = Thread::scoped(move|| {
329 let _stream = t!(TcpStream::connect(&addr));
333 let mut stream = t!(acceptor.accept()).0;
335 let nread = t!(stream.read(&mut buf));
336 assert_eq!(nread, 0);
337 let nread = t!(stream.read(&mut buf));
338 assert_eq!(nread, 0);
344 each_ip(&mut |addr| {
345 let acceptor = t!(TcpListener::bind(&addr));
347 let (tx, rx) = channel();
348 let _t = Thread::scoped(move|| {
349 drop(t!(TcpStream::connect(&addr)));
350 tx.send(()).unwrap();
353 let mut stream = t!(acceptor.accept()).0;
356 match stream.write(&buf) {
359 assert!(e.kind() == ErrorKind::ConnectionReset ||
360 e.kind() == ErrorKind::BrokenPipe ||
361 e.kind() == ErrorKind::ConnectionAborted,
362 "unknown error: {}", e);
// Accepts `max` connections one after another through `incoming()` and
// expects each accepted stream to deliver the byte 99.
// NOTE(review): despite the `_ip4` suffix the body runs through `each_ip`;
// which address families that covers depends on `each_ip` - confirm.
369 fn multiple_connect_serial_ip4() {
370 each_ip(&mut |addr| {
372 let acceptor = t!(TcpListener::bind(&addr));
374 let _t = Thread::scoped(move|| {
376 let mut stream = t!(TcpStream::connect(&addr));
377 t!(stream.write(&[99]));
381 for stream in acceptor.incoming().take(max) {
382 let mut stream = t!(stream);
384 t!(stream.read(&mut buf));
385 assert_eq!(buf[0], 99);
// Accepts MAX connections, handling each on its own scoped thread, and checks
// that the i-th accepted connection delivers the byte `i`.
391 fn multiple_connect_interleaved_greedy_schedule() {
392 static MAX: usize = 10;
393 each_ip(&mut |addr| {
394 let acceptor = t!(TcpListener::bind(&addr));
396 let _t = Thread::scoped(move|| {
// Rebinding moves the listener into this closure.
397 let acceptor = acceptor;
398 for (i, stream) in acceptor.incoming().enumerate().take(MAX) {
399 // Start another task to handle the connection
400 let _t = Thread::scoped(move|| {
401 let mut stream = t!(stream);
403 t!(stream.read(&mut buf));
404 assert!(buf[0] == i as u8);
// Recursive helper: connection `i` is opened first, the next connection is
// started, and only then is byte `i` written. Recursion stops at MAX.
412 fn connect(i: usize, addr: SocketAddr) {
413 if i == MAX { return }
415 let t = Thread::scoped(move|| {
416 let mut stream = t!(TcpStream::connect(&addr));
417 // Connect again before writing
418 connect(i + 1, addr);
419 t!(stream.write(&[i as u8]));
// Joining surfaces a panic from the connecting thread as a test failure.
421 t.join().ok().unwrap();
// Accepts MAX connections, handling each on its own scoped thread, and checks
// that every connection delivers the byte 99.
// NOTE(review): despite the `_ip4` suffix the body runs through `each_ip`;
// which address families that covers depends on `each_ip` - confirm.
426 fn multiple_connect_interleaved_lazy_schedule_ip4() {
427 static MAX: usize = 10;
428 each_ip(&mut |addr| {
429 let acceptor = t!(TcpListener::bind(&addr));
431 let _t = Thread::scoped(move|| {
432 for stream in acceptor.incoming().take(MAX) {
433 // Start another task to handle the connection
434 let _t = Thread::scoped(move|| {
435 let mut stream = t!(stream);
437 t!(stream.read(&mut buf));
438 assert!(buf[0] == 99);
// Recursive helper: each level opens a connection, starts the next level,
// and then writes 99. Recursion stops at MAX.
446 fn connect(i: usize, addr: SocketAddr) {
447 if i == MAX { return }
449 let t = Thread::scoped(move|| {
450 let mut stream = t!(TcpStream::connect(&addr));
451 connect(i + 1, addr);
452 t!(stream.write(&[99]));
// Joining surfaces a panic from the connecting thread as a test failure.
454 t.join().ok().unwrap();
458 pub fn socket_name(addr: SocketAddr) {
461 pub fn peer_name(addr: SocketAddr) {
// Checks that the listener's `socket_addr()` equals the address it was bound
// to, and that a connected client's `peer_addr()` equals that same address.
// NOTE(review): despite the `_ip4` suffix the body runs through `each_ip`;
// which address families that covers depends on `each_ip` - confirm.
465 fn socket_and_peer_name_ip4() {
466 each_ip(&mut |addr| {
467 let listener = t!(TcpListener::bind(&addr));
468 let so_name = t!(listener.socket_addr());
469 assert_eq!(addr, so_name);
470 let _t = Thread::scoped(move|| {
471 t!(listener.accept());
474 let stream = t!(TcpStream::connect(&addr));
475 assert_eq!(addr, t!(stream.peer_addr()));
481 each_ip(&mut |addr| {
482 let (tx, rx) = channel();
483 let srv = t!(TcpListener::bind(&addr));
484 let _t = Thread::scoped(move|| {
485 let mut cl = t!(srv.accept()).0;
486 cl.write(&[10]).unwrap();
489 tx.send(()).unwrap();
492 let mut c = t!(TcpStream::connect(&addr));
494 assert_eq!(c.read(&mut b), Ok(1));
502 each_ip(&mut |addr| {
503 let _listener = t!(TcpListener::bind(&addr));
504 match TcpListener::bind(&addr) {
507 assert!(e.kind() == ErrorKind::ConnectionRefused ||
508 e.kind() == ErrorKind::Other,
509 "unknown error: {} {:?}", e, e.kind());
517 each_ip(&mut |addr| {
518 let acceptor = t!(TcpListener::bind(&addr));
520 let _t = Thread::scoped(move|| {
521 t!(TcpStream::connect(&addr));
524 t!(acceptor.accept());
526 t!(TcpListener::bind(&addr));
// Uses an accepted stream (`s1`) together with a clone of it (`s2`): the
// connecting side expects to read a single byte with value 1, and the test
// thread expects a one-byte read on `s1`. The two channels order the steps
// between the test thread and the scoped thread.
531 fn tcp_clone_smoke() {
532 each_ip(&mut |addr| {
533 let acceptor = t!(TcpListener::bind(&addr));
535 let _t = Thread::scoped(move|| {
536 let mut s = t!(TcpStream::connect(&addr));
537 let mut buf = [0, 0];
538 assert_eq!(s.read(&mut buf), Ok(1));
539 assert_eq!(buf[0], 1);
543 let mut s1 = t!(acceptor.accept()).0;
544 let s2 = t!(s1.try_clone());
546 let (tx1, rx1) = channel();
547 let (tx2, rx2) = channel();
548 let _t = Thread::scoped(move|| {
552 tx2.send(()).unwrap();
554 tx1.send(()).unwrap();
555 let mut buf = [0, 0];
556 assert_eq!(s1.read(&mut buf), Ok(1));
// Two handles to the same accepted connection (`s1` and its clone `s2`) each
// perform a read: `s2` on a scoped thread, `s1` on the test thread. Each
// reader signals on its sender (`tx2` / `tx1`) after its read returns.
562 fn tcp_clone_two_read() {
563 each_ip(&mut |addr| {
564 let acceptor = t!(TcpListener::bind(&addr));
565 let (tx1, rx) = channel();
566 let tx2 = tx1.clone();
568 let _t = Thread::scoped(move|| {
569 let mut s = t!(TcpStream::connect(&addr));
576 let mut s1 = t!(acceptor.accept()).0;
577 let s2 = t!(s1.try_clone());
// This `rx` shadows the earlier one; it pairs with `done`.
579 let (done, rx) = channel();
580 let _t = Thread::scoped(move|| {
582 let mut buf = [0, 0];
583 t!(s2.read(&mut buf));
584 tx2.send(()).unwrap();
585 done.send(()).unwrap();
587 let mut buf = [0, 0];
588 t!(s1.read(&mut buf));
589 tx1.send(()).unwrap();
// The connecting side performs two reads; the accepting side holds the
// stream and a clone of it (`s1`, `s2`), and the scoped thread signals on
// `done` when it has finished.
596 fn tcp_clone_two_write() {
597 each_ip(&mut |addr| {
598 let acceptor = t!(TcpListener::bind(&addr));
600 let _t = Thread::scoped(move|| {
601 let mut s = t!(TcpStream::connect(&addr));
602 let mut buf = [0, 1];
603 t!(s.read(&mut buf));
604 t!(s.read(&mut buf));
607 let mut s1 = t!(acceptor.accept()).0;
608 let s2 = t!(s1.try_clone());
610 let (done, rx) = channel();
611 let _t = Thread::scoped(move|| {
614 done.send(()).unwrap();
// After the client calls `shutdown(Shutdown::Write)`, its writes must fail
// and the accepting side's read must report `Ok(0)`, while the client must
// still be able to read one byte.
623 fn shutdown_smoke() {
624 each_ip(&mut |addr| {
625 let a = t!(TcpListener::bind(&addr));
626 let _t = Thread::scoped(move|| {
627 let mut c = t!(a.accept()).0;
629 assert_eq!(c.read(&mut b), Ok(0));
633 let mut s = t!(TcpStream::connect(&addr));
634 t!(s.shutdown(Shutdown::Write));
635 assert!(s.write(&[1]).is_err());
637 assert_eq!(t!(s.read(&mut b)), 1);
// Shuts down both halves of a client stream and checks that the effect is
// seen through the original handle, a clone made before the shutdown (`s2`)
// and a clone made after it (`s3`): writes fail and reads return `Ok(0)`.
643 fn close_readwrite_smoke() {
644 each_ip(&mut |addr| {
645 let a = t!(TcpListener::bind(&addr));
646 let (tx, rx) = channel::<()>();
647 let _t = Thread::scoped(move|| {
648 let _s = t!(a.accept());
653 let mut s = t!(TcpStream::connect(&addr));
654 let mut s2 = t!(s.try_clone());
656 // closing should prevent reads/writes
657 t!(s.shutdown(Shutdown::Write));
658 assert!(s.write(&[0]).is_err());
659 t!(s.shutdown(Shutdown::Read));
660 assert_eq!(s.read(&mut b), Ok(0));
662 // closing should affect previous handles
663 assert!(s2.write(&[0]).is_err());
664 assert_eq!(s2.read(&mut b), Ok(0));
666 // closing should affect new handles
667 let mut s3 = t!(s.try_clone());
668 assert!(s3.write(&[0]).is_err());
669 assert_eq!(s3.read(&mut b), Ok(0));
671 // make sure these don't die
// Results are deliberately ignored: only the absence of a panic is checked.
672 let _ = s2.shutdown(Shutdown::Read);
673 let _ = s2.shutdown(Shutdown::Write);
674 let _ = s3.shutdown(Shutdown::Read);
675 let _ = s3.shutdown(Shutdown::Write);
// A scoped thread reads through a cloned handle (`s2`) and expects the read
// to return 0; the test thread then shuts down the read half through the
// original handle (`s`), which is expected to unblock that read.
681 fn close_read_wakes_up() {
682 each_ip(&mut |addr| {
683 let a = t!(TcpListener::bind(&addr));
684 let (tx1, rx) = channel::<()>();
685 let _t = Thread::scoped(move|| {
686 let _s = t!(a.accept());
690 let s = t!(TcpStream::connect(&addr));
691 let s2 = t!(s.try_clone());
// This `rx` shadows the earlier one; it pairs with `tx`, used by the reader.
692 let (tx, rx) = channel();
693 let _t = Thread::scoped(move|| {
695 assert_eq!(t!(s2.read(&mut [0])), 0);
696 tx.send(()).unwrap();
698 // this should wake up the child task
699 t!(s.shutdown(Shutdown::Read));
701 // this test will never finish if the child doesn't wake up
// Clones an accepted stream while another clone of it is being read on a
// scoped thread, then waits for both helper threads to report completion.
708 fn clone_while_reading() {
709 each_ip(&mut |addr| {
710 let accept = t!(TcpListener::bind(&addr));
712 // Enqueue a task to write to a socket
713 let (tx, rx) = channel();
714 let (txdone, rxdone) = channel();
715 let txdone2 = txdone.clone();
716 let _t = Thread::scoped(move|| {
717 let mut tcp = t!(TcpStream::connect(&addr));
720 txdone2.send(()).unwrap();
723 // Spawn off a reading clone
724 let tcp = t!(accept.accept()).0;
725 let tcp2 = t!(tcp.try_clone());
726 let txdone3 = txdone.clone();
727 let _t = Thread::scoped(move|| {
729 t!(tcp2.read(&mut [0]));
730 txdone3.send(()).unwrap();
733 // Try to ensure that the reading clone is indeed reading
738 // clone the handle again while it's reading, then let it finish the read.
740 let _ = t!(tcp.try_clone());
741 tx.send(()).unwrap();
// Two receives: one for the connecting thread (`txdone2`) and one for the
// reading thread (`txdone3`).
742 rxdone.recv().unwrap();
743 rxdone.recv().unwrap();
// Binds a listener, clones it (`a2`), and starts two scoped threads that
// each open a connection to the bound address. Connection results are
// ignored via `let _`.
748 fn clone_accept_smoke() {
749 each_ip(&mut |addr| {
750 let a = t!(TcpListener::bind(&addr));
751 let a2 = t!(a.try_clone());
753 let _t = Thread::scoped(move|| {
754 let _ = TcpStream::connect(&addr);
756 let _t = Thread::scoped(move|| {
757 let _ = TcpStream::connect(&addr);
766 fn clone_accept_concurrent() {
767 each_ip(&mut |addr| {
768 let a = t!(TcpListener::bind(&addr));
769 let a2 = t!(a.try_clone());
771 let (tx, rx) = channel();
772 let tx2 = tx.clone();
774 let _t = Thread::scoped(move|| {
775 tx.send(t!(a.accept())).unwrap();
777 let _t = Thread::scoped(move|| {
778 tx2.send(t!(a2.accept())).unwrap();
781 let _t = Thread::scoped(move|| {
782 let _ = TcpStream::connect(&addr);
784 let _t = Thread::scoped(move|| {
785 let _ = TcpStream::connect(&addr);