]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/tcp.rs
Honor hidden doc attribute of derivable trait methods
[rust.git] / src / libstd / io / net / tcp.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use io::net::ip::SocketAddr;
23 use io::{Reader, Writer, Listener, Acceptor};
24 use kinds::Send;
25 use option::{None, Some};
26 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
27 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
28
29 /// A structure which represents a TCP stream between a local socket and a
30 /// remote socket.
31 ///
32 /// # Example
33 ///
34 /// ```rust
35 /// # #![allow(unused_must_use)]
36 /// use std::io::net::tcp::TcpStream;
37 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
38 ///
39 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
40 /// let mut stream = TcpStream::connect(addr);
41 ///
42 /// stream.write([1]);
43 /// let mut buf = [0];
44 /// stream.read(buf);
45 /// drop(stream); // close the connection
46 /// ```
47 pub struct TcpStream {
48     obj: ~RtioTcpStream:Send
49 }
50
51 impl TcpStream {
52     fn new(s: ~RtioTcpStream:Send) -> TcpStream {
53         TcpStream { obj: s }
54     }
55
56     /// Creates a TCP connection to a remote socket address.
57     ///
58     /// If no error is encountered, then `Ok(stream)` is returned.
59     pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
60         LocalIo::maybe_raise(|io| {
61             io.tcp_connect(addr, None).map(TcpStream::new)
62         })
63     }
64
65     /// Creates a TCP connection to a remote socket address, timing out after
66     /// the specified number of milliseconds.
67     ///
68     /// This is the same as the `connect` method, except that if the timeout
69     /// specified (in milliseconds) elapses before a connection is made an error
70     /// will be returned. The error's kind will be `TimedOut`.
71     #[experimental = "the timeout argument may eventually change types"]
72     pub fn connect_timeout(addr: SocketAddr,
73                            timeout_ms: u64) -> IoResult<TcpStream> {
74         LocalIo::maybe_raise(|io| {
75             io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
76         })
77     }
78
79     /// Returns the socket address of the remote peer of this TCP connection.
80     pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
81         self.obj.peer_name()
82     }
83
84     /// Returns the socket address of the local half of this TCP connection.
85     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
86         self.obj.socket_name()
87     }
88 }
89
90 impl Clone for TcpStream {
91     /// Creates a new handle to this TCP stream, allowing for simultaneous reads
92     /// and writes of this connection.
93     ///
94     /// The underlying TCP stream will not be closed until all handles to the
95     /// stream have been deallocated. All handles will also follow the same
96     /// stream, but two concurrent reads will not receive the same data.
97     /// Instead, the first read will receive the first packet received, and the
98     /// second read will receive the second packet.
99     fn clone(&self) -> TcpStream {
100         TcpStream { obj: self.obj.clone() }
101     }
102 }
103
104 impl Reader for TcpStream {
105     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
106 }
107
108 impl Writer for TcpStream {
109     fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
110 }
111
112 /// A structure representing a socket server. This listener is used to create a
113 /// `TcpAcceptor` which can be used to accept sockets on a local port.
114 ///
115 /// # Example
116 ///
117 /// ```rust
118 /// # fn main() { }
119 /// # fn foo() {
120 /// # #![allow(dead_code)]
121 /// use std::io::{TcpListener, TcpStream};
122 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
123 /// use std::io::{Acceptor, Listener};
124 ///
125 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
126 /// let listener = TcpListener::bind(addr);
127 ///
128 /// // bind the listener to the specified address
129 /// let mut acceptor = listener.listen();
130 ///
131 /// fn handle_client(mut stream: TcpStream) {
132 ///     // ...
133 /// # &mut stream; // silence unused mutability/variable warning
134 /// }
135 /// // accept connections and process them, spawning a new tasks for each one
136 /// for stream in acceptor.incoming() {
137 ///     match stream {
138 ///         Err(e) => { /* connection failed */ }
139 ///         Ok(stream) => spawn(proc() {
140 ///             // connection succeeded
141 ///             handle_client(stream)
142 ///         })
143 ///     }
144 /// }
145 ///
146 /// // close the socket server
147 /// drop(acceptor);
148 /// # }
149 /// ```
150 pub struct TcpListener {
151     obj: ~RtioTcpListener:Send
152 }
153
154 impl TcpListener {
155     /// Creates a new `TcpListener` which will be bound to the specified local
156     /// socket address. This listener is not ready for accepting connections,
157     /// `listen` must be called on it before that's possible.
158     ///
159     /// Binding with a port number of 0 will request that the OS assigns a port
160     /// to this listener. The port allocated can be queried via the
161     /// `socket_name` function.
162     pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> {
163         LocalIo::maybe_raise(|io| {
164             io.tcp_bind(addr).map(|l| TcpListener { obj: l })
165         })
166     }
167
168     /// Returns the local socket address of this listener.
169     pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
170         self.obj.socket_name()
171     }
172 }
173
174 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
175     fn listen(self) -> IoResult<TcpAcceptor> {
176         self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
177     }
178 }
179
180 /// The accepting half of a TCP socket server. This structure is created through
181 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
182 /// `TcpStream` instances.
183 pub struct TcpAcceptor {
184     obj: ~RtioTcpAcceptor:Send
185 }
186
187 impl Acceptor<TcpStream> for TcpAcceptor {
188     fn accept(&mut self) -> IoResult<TcpStream> {
189         self.obj.accept().map(TcpStream::new)
190     }
191 }
192
193 #[cfg(test)]
194 mod test {
195     use super::*;
196     use io::net::ip::SocketAddr;
197     use io::*;
198     use prelude::*;
199
200     // FIXME #11530 this fails on android because tests are run as root
201     iotest!(fn bind_error() {
202         let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
203         match TcpListener::bind(addr) {
204             Ok(..) => fail!(),
205             Err(e) => assert_eq!(e.kind, PermissionDenied),
206         }
207     } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
208
209     iotest!(fn connect_error() {
210         let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
211         match TcpStream::connect(addr) {
212             Ok(..) => fail!(),
213             Err(e) => assert_eq!(e.kind, ConnectionRefused),
214         }
215     })
216
217     iotest!(fn smoke_test_ip4() {
218         let addr = next_test_ip4();
219         let mut acceptor = TcpListener::bind(addr).listen();
220
221         spawn(proc() {
222             let mut stream = TcpStream::connect(addr);
223             stream.write([99]).unwrap();
224         });
225
226         let mut stream = acceptor.accept();
227         let mut buf = [0];
228         stream.read(buf).unwrap();
229         assert!(buf[0] == 99);
230     })
231
232     iotest!(fn smoke_test_ip6() {
233         let addr = next_test_ip6();
234         let mut acceptor = TcpListener::bind(addr).listen();
235
236         spawn(proc() {
237             let mut stream = TcpStream::connect(addr);
238             stream.write([99]).unwrap();
239         });
240
241         let mut stream = acceptor.accept();
242         let mut buf = [0];
243         stream.read(buf).unwrap();
244         assert!(buf[0] == 99);
245     })
246
247     iotest!(fn read_eof_ip4() {
248         let addr = next_test_ip4();
249         let mut acceptor = TcpListener::bind(addr).listen();
250
251         spawn(proc() {
252             let _stream = TcpStream::connect(addr);
253             // Close
254         });
255
256         let mut stream = acceptor.accept();
257         let mut buf = [0];
258         let nread = stream.read(buf);
259         assert!(nread.is_err());
260     })
261
262     iotest!(fn read_eof_ip6() {
263         let addr = next_test_ip6();
264         let mut acceptor = TcpListener::bind(addr).listen();
265
266         spawn(proc() {
267             let _stream = TcpStream::connect(addr);
268             // Close
269         });
270
271         let mut stream = acceptor.accept();
272         let mut buf = [0];
273         let nread = stream.read(buf);
274         assert!(nread.is_err());
275     })
276
277     iotest!(fn read_eof_twice_ip4() {
278         let addr = next_test_ip4();
279         let mut acceptor = TcpListener::bind(addr).listen();
280
281         spawn(proc() {
282             let _stream = TcpStream::connect(addr);
283             // Close
284         });
285
286         let mut stream = acceptor.accept();
287         let mut buf = [0];
288         let nread = stream.read(buf);
289         assert!(nread.is_err());
290
291         match stream.read(buf) {
292             Ok(..) => fail!(),
293             Err(ref e) => {
294                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
295                         "unknown kind: {:?}", e.kind);
296             }
297         }
298     })
299
300     iotest!(fn read_eof_twice_ip6() {
301         let addr = next_test_ip6();
302         let mut acceptor = TcpListener::bind(addr).listen();
303
304         spawn(proc() {
305             let _stream = TcpStream::connect(addr);
306             // Close
307         });
308
309         let mut stream = acceptor.accept();
310         let mut buf = [0];
311         let nread = stream.read(buf);
312         assert!(nread.is_err());
313
314         match stream.read(buf) {
315             Ok(..) => fail!(),
316             Err(ref e) => {
317                 assert!(e.kind == NotConnected || e.kind == EndOfFile,
318                         "unknown kind: {:?}", e.kind);
319             }
320         }
321     })
322
323     iotest!(fn write_close_ip4() {
324         let addr = next_test_ip4();
325         let mut acceptor = TcpListener::bind(addr).listen();
326
327         spawn(proc() {
328             let _stream = TcpStream::connect(addr);
329             // Close
330         });
331
332         let mut stream = acceptor.accept();
333         let buf = [0];
334         loop {
335             match stream.write(buf) {
336                 Ok(..) => {}
337                 Err(e) => {
338                     assert!(e.kind == ConnectionReset ||
339                             e.kind == BrokenPipe ||
340                             e.kind == ConnectionAborted,
341                             "unknown error: {:?}", e);
342                     break;
343                 }
344             }
345         }
346     })
347
348     iotest!(fn write_close_ip6() {
349         let addr = next_test_ip6();
350         let mut acceptor = TcpListener::bind(addr).listen();
351
352         spawn(proc() {
353             let _stream = TcpStream::connect(addr);
354             // Close
355         });
356
357         let mut stream = acceptor.accept();
358         let buf = [0];
359         loop {
360             match stream.write(buf) {
361                 Ok(..) => {}
362                 Err(e) => {
363                     assert!(e.kind == ConnectionReset ||
364                             e.kind == BrokenPipe ||
365                             e.kind == ConnectionAborted,
366                             "unknown error: {:?}", e);
367                     break;
368                 }
369             }
370         }
371     })
372
373     iotest!(fn multiple_connect_serial_ip4() {
374         let addr = next_test_ip4();
375         let max = 10u;
376         let mut acceptor = TcpListener::bind(addr).listen();
377
378         spawn(proc() {
379             for _ in range(0, max) {
380                 let mut stream = TcpStream::connect(addr);
381                 stream.write([99]).unwrap();
382             }
383         });
384
385         for ref mut stream in acceptor.incoming().take(max) {
386             let mut buf = [0];
387             stream.read(buf).unwrap();
388             assert_eq!(buf[0], 99);
389         }
390     })
391
392     iotest!(fn multiple_connect_serial_ip6() {
393         let addr = next_test_ip6();
394         let max = 10u;
395         let mut acceptor = TcpListener::bind(addr).listen();
396
397         spawn(proc() {
398             for _ in range(0, max) {
399                 let mut stream = TcpStream::connect(addr);
400                 stream.write([99]).unwrap();
401             }
402         });
403
404         for ref mut stream in acceptor.incoming().take(max) {
405             let mut buf = [0];
406             stream.read(buf).unwrap();
407             assert_eq!(buf[0], 99);
408         }
409     })
410
411     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
412         let addr = next_test_ip4();
413         static MAX: int = 10;
414         let acceptor = TcpListener::bind(addr).listen();
415
416         spawn(proc() {
417             let mut acceptor = acceptor;
418             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
419                 // Start another task to handle the connection
420                 spawn(proc() {
421                     let mut stream = stream;
422                     let mut buf = [0];
423                     stream.read(buf).unwrap();
424                     assert!(buf[0] == i as u8);
425                     debug!("read");
426                 });
427             }
428         });
429
430         connect(0, addr);
431
432         fn connect(i: int, addr: SocketAddr) {
433             if i == MAX { return }
434
435             spawn(proc() {
436                 debug!("connecting");
437                 let mut stream = TcpStream::connect(addr);
438                 // Connect again before writing
439                 connect(i + 1, addr);
440                 debug!("writing");
441                 stream.write([i as u8]).unwrap();
442             });
443         }
444     })
445
446     iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
447         let addr = next_test_ip6();
448         static MAX: int = 10;
449         let acceptor = TcpListener::bind(addr).listen();
450
451         spawn(proc() {
452             let mut acceptor = acceptor;
453             for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
454                 // Start another task to handle the connection
455                 spawn(proc() {
456                     let mut stream = stream;
457                     let mut buf = [0];
458                     stream.read(buf).unwrap();
459                     assert!(buf[0] == i as u8);
460                     debug!("read");
461                 });
462             }
463         });
464
465         connect(0, addr);
466
467         fn connect(i: int, addr: SocketAddr) {
468             if i == MAX { return }
469
470             spawn(proc() {
471                 debug!("connecting");
472                 let mut stream = TcpStream::connect(addr);
473                 // Connect again before writing
474                 connect(i + 1, addr);
475                 debug!("writing");
476                 stream.write([i as u8]).unwrap();
477             });
478         }
479     })
480
481     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
482         static MAX: int = 10;
483         let addr = next_test_ip4();
484         let acceptor = TcpListener::bind(addr).listen();
485
486         spawn(proc() {
487             let mut acceptor = acceptor;
488             for stream in acceptor.incoming().take(MAX as uint) {
489                 // Start another task to handle the connection
490                 spawn(proc() {
491                     let mut stream = stream;
492                     let mut buf = [0];
493                     stream.read(buf).unwrap();
494                     assert!(buf[0] == 99);
495                     debug!("read");
496                 });
497             }
498         });
499
500         connect(0, addr);
501
502         fn connect(i: int, addr: SocketAddr) {
503             if i == MAX { return }
504
505             spawn(proc() {
506                 debug!("connecting");
507                 let mut stream = TcpStream::connect(addr);
508                 // Connect again before writing
509                 connect(i + 1, addr);
510                 debug!("writing");
511                 stream.write([99]).unwrap();
512             });
513         }
514     })
515
516     iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
517         static MAX: int = 10;
518         let addr = next_test_ip6();
519         let acceptor = TcpListener::bind(addr).listen();
520
521         spawn(proc() {
522             let mut acceptor = acceptor;
523             for stream in acceptor.incoming().take(MAX as uint) {
524                 // Start another task to handle the connection
525                 spawn(proc() {
526                     let mut stream = stream;
527                     let mut buf = [0];
528                     stream.read(buf).unwrap();
529                     assert!(buf[0] == 99);
530                     debug!("read");
531                 });
532             }
533         });
534
535         connect(0, addr);
536
537         fn connect(i: int, addr: SocketAddr) {
538             if i == MAX { return }
539
540             spawn(proc() {
541                 debug!("connecting");
542                 let mut stream = TcpStream::connect(addr);
543                 // Connect again before writing
544                 connect(i + 1, addr);
545                 debug!("writing");
546                 stream.write([99]).unwrap();
547             });
548         }
549     })
550
551     pub fn socket_name(addr: SocketAddr) {
552         let mut listener = TcpListener::bind(addr).unwrap();
553
554         // Make sure socket_name gives
555         // us the socket we binded to.
556         let so_name = listener.socket_name();
557         assert!(so_name.is_ok());
558         assert_eq!(addr, so_name.unwrap());
559     }
560
561     pub fn peer_name(addr: SocketAddr) {
562         let acceptor = TcpListener::bind(addr).listen();
563         spawn(proc() {
564             let mut acceptor = acceptor;
565             acceptor.accept().unwrap();
566         });
567
568         let stream = TcpStream::connect(addr);
569
570         assert!(stream.is_ok());
571         let mut stream = stream.unwrap();
572
573         // Make sure peer_name gives us the
574         // address/port of the peer we've
575         // connected to.
576         let peer_name = stream.peer_name();
577         assert!(peer_name.is_ok());
578         assert_eq!(addr, peer_name.unwrap());
579     }
580
581     iotest!(fn socket_and_peer_name_ip4() {
582         peer_name(next_test_ip4());
583         socket_name(next_test_ip4());
584     })
585
586     iotest!(fn socket_and_peer_name_ip6() {
587         // FIXME: peer name is not consistent
588         //peer_name(next_test_ip6());
589         socket_name(next_test_ip6());
590     })
591
592     iotest!(fn partial_read() {
593         let addr = next_test_ip4();
594         let (tx, rx) = channel();
595         spawn(proc() {
596             let mut srv = TcpListener::bind(addr).listen().unwrap();
597             tx.send(());
598             let mut cl = srv.accept().unwrap();
599             cl.write([10]).unwrap();
600             let mut b = [0];
601             cl.read(b).unwrap();
602             tx.send(());
603         });
604
605         rx.recv();
606         let mut c = TcpStream::connect(addr).unwrap();
607         let mut b = [0, ..10];
608         assert_eq!(c.read(b), Ok(1));
609         c.write([1]).unwrap();
610         rx.recv();
611     })
612
613     iotest!(fn double_bind() {
614         let addr = next_test_ip4();
615         let listener = TcpListener::bind(addr).unwrap().listen();
616         assert!(listener.is_ok());
617         match TcpListener::bind(addr).listen() {
618             Ok(..) => fail!(),
619             Err(e) => {
620                 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError);
621             }
622         }
623     })
624
625     iotest!(fn fast_rebind() {
626         let addr = next_test_ip4();
627         let (tx, rx) = channel();
628
629         spawn(proc() {
630             rx.recv();
631             let _stream = TcpStream::connect(addr).unwrap();
632             // Close
633             rx.recv();
634         });
635
636         {
637             let mut acceptor = TcpListener::bind(addr).listen();
638             tx.send(());
639             {
640                 let _stream = acceptor.accept().unwrap();
641                 // Close client
642                 tx.send(());
643             }
644             // Close listener
645         }
646         let _listener = TcpListener::bind(addr);
647     })
648
649     iotest!(fn tcp_clone_smoke() {
650         let addr = next_test_ip4();
651         let mut acceptor = TcpListener::bind(addr).listen();
652
653         spawn(proc() {
654             let mut s = TcpStream::connect(addr);
655             let mut buf = [0, 0];
656             assert_eq!(s.read(buf), Ok(1));
657             assert_eq!(buf[0], 1);
658             s.write([2]).unwrap();
659         });
660
661         let mut s1 = acceptor.accept().unwrap();
662         let s2 = s1.clone();
663
664         let (tx1, rx1) = channel();
665         let (tx2, rx2) = channel();
666         spawn(proc() {
667             let mut s2 = s2;
668             rx1.recv();
669             s2.write([1]).unwrap();
670             tx2.send(());
671         });
672         tx1.send(());
673         let mut buf = [0, 0];
674         assert_eq!(s1.read(buf), Ok(1));
675         rx2.recv();
676     })
677
678     iotest!(fn tcp_clone_two_read() {
679         let addr = next_test_ip6();
680         let mut acceptor = TcpListener::bind(addr).listen();
681         let (tx1, rx) = channel();
682         let tx2 = tx1.clone();
683
684         spawn(proc() {
685             let mut s = TcpStream::connect(addr);
686             s.write([1]).unwrap();
687             rx.recv();
688             s.write([2]).unwrap();
689             rx.recv();
690         });
691
692         let mut s1 = acceptor.accept().unwrap();
693         let s2 = s1.clone();
694
695         let (done, rx) = channel();
696         spawn(proc() {
697             let mut s2 = s2;
698             let mut buf = [0, 0];
699             s2.read(buf).unwrap();
700             tx2.send(());
701             done.send(());
702         });
703         let mut buf = [0, 0];
704         s1.read(buf).unwrap();
705         tx1.send(());
706
707         rx.recv();
708     })
709
710     iotest!(fn tcp_clone_two_write() {
711         let addr = next_test_ip4();
712         let mut acceptor = TcpListener::bind(addr).listen();
713
714         spawn(proc() {
715             let mut s = TcpStream::connect(addr);
716             let mut buf = [0, 1];
717             s.read(buf).unwrap();
718             s.read(buf).unwrap();
719         });
720
721         let mut s1 = acceptor.accept().unwrap();
722         let s2 = s1.clone();
723
724         let (done, rx) = channel();
725         spawn(proc() {
726             let mut s2 = s2;
727             s2.write([1]).unwrap();
728             done.send(());
729         });
730         s1.write([2]).unwrap();
731
732         rx.recv();
733     })
734
735     iotest!(fn shutdown_smoke() {
736         use rt::rtio::RtioTcpStream;
737
738         let addr = next_test_ip4();
739         let a = TcpListener::bind(addr).unwrap().listen();
740         spawn(proc() {
741             let mut a = a;
742             let mut c = a.accept().unwrap();
743             assert_eq!(c.read_to_end(), Ok(vec!()));
744             c.write([1]).unwrap();
745         });
746
747         let mut s = TcpStream::connect(addr).unwrap();
748         assert!(s.obj.close_write().is_ok());
749         assert!(s.write([1]).is_err());
750         assert_eq!(s.read_to_end(), Ok(vec!(1)));
751     })
752 }