]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/io/net/tcp.rs
b7cb703eb25477fb8018c2492d241718c8abc2e9
[rust.git] / src / libstd / rt / io / net / tcp.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::{Option, Some, None};
12 use result::{Ok, Err};
13 use rt::io::net::ip::SocketAddr;
14 use rt::io::{Reader, Writer, Listener, Acceptor};
15 use rt::io::{io_error, read_error, EndOfFile};
16 use rt::rtio::{IoFactory, IoFactoryObject,
17                RtioSocket,
18                RtioTcpListener, RtioTcpListenerObject,
19                RtioTcpAcceptor, RtioTcpAcceptorObject,
20                RtioTcpStream, RtioTcpStreamObject};
21 use rt::local::Local;
22
23 pub struct TcpStream(~RtioTcpStreamObject);
24
25 impl TcpStream {
26     fn new(s: ~RtioTcpStreamObject) -> TcpStream {
27         TcpStream(s)
28     }
29
30     pub fn connect(addr: SocketAddr) -> Option<TcpStream> {
31         let stream = unsafe {
32             rtdebug!("borrowing io to connect");
33             let io: *mut IoFactoryObject = Local::unsafe_borrow();
34             rtdebug!("about to connect");
35             (*io).tcp_connect(addr)
36         };
37
38         match stream {
39             Ok(s) => Some(TcpStream::new(s)),
40             Err(ioerr) => {
41                 rtdebug!("failed to connect: %?", ioerr);
42                 io_error::cond.raise(ioerr);
43                 None
44             }
45         }
46     }
47
48     pub fn peer_name(&mut self) -> Option<SocketAddr> {
49         match (**self).peer_name() {
50             Ok(pn) => Some(pn),
51             Err(ioerr) => {
52                 rtdebug!("failed to get peer name: %?", ioerr);
53                 io_error::cond.raise(ioerr);
54                 None
55             }
56         }
57     }
58
59     pub fn socket_name(&mut self) -> Option<SocketAddr> {
60         match (**self).socket_name() {
61             Ok(sn) => Some(sn),
62             Err(ioerr) => {
63                 rtdebug!("failed to get socket name: %?", ioerr);
64                 io_error::cond.raise(ioerr);
65                 None
66             }
67         }
68     }
69 }
70
71 impl Reader for TcpStream {
72     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
73         match (**self).read(buf) {
74             Ok(read) => Some(read),
75             Err(ioerr) => {
76                 // EOF is indicated by returning None
77                 if ioerr.kind != EndOfFile {
78                     read_error::cond.raise(ioerr);
79                 }
80                 return None;
81             }
82         }
83     }
84
85     fn eof(&mut self) -> bool { fail!() }
86 }
87
88 impl Writer for TcpStream {
89     fn write(&mut self, buf: &[u8]) {
90         match (**self).write(buf) {
91             Ok(_) => (),
92             Err(ioerr) => io_error::cond.raise(ioerr),
93         }
94     }
95
96     fn flush(&mut self) { fail!() }
97 }
98
99 pub struct TcpListener(~RtioTcpListenerObject);
100
101 impl TcpListener {
102     pub fn bind(addr: SocketAddr) -> Option<TcpListener> {
103         let listener = unsafe {
104             let io: *mut IoFactoryObject = Local::unsafe_borrow();
105             (*io).tcp_bind(addr)
106         };
107         match listener {
108             Ok(l) => Some(TcpListener(l)),
109             Err(ioerr) => {
110                 io_error::cond.raise(ioerr);
111                 return None;
112             }
113         }
114     }
115
116     pub fn socket_name(&mut self) -> Option<SocketAddr> {
117         match (**self).socket_name() {
118             Ok(sn) => Some(sn),
119             Err(ioerr) => {
120                 rtdebug!("failed to get socket name: %?", ioerr);
121                 io_error::cond.raise(ioerr);
122                 None
123             }
124         }
125     }
126 }
127
128 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
129     fn listen(self) -> Option<TcpAcceptor> {
130         match (**self).listen() {
131             Ok(acceptor) => Some(TcpAcceptor(acceptor)),
132             Err(ioerr) => {
133                 io_error::cond.raise(ioerr);
134                 None
135             }
136         }
137     }
138 }
139
140 pub struct TcpAcceptor(~RtioTcpAcceptorObject);
141
142 impl Acceptor<TcpStream> for TcpAcceptor {
143     fn accept(&mut self) -> Option<TcpStream> {
144         match (**self).accept() {
145             Ok(s) => Some(TcpStream::new(s)),
146             Err(ioerr) => {
147                 io_error::cond.raise(ioerr);
148                 None
149             }
150         }
151     }
152 }
153
154 #[cfg(test)]
155 mod test {
156     use super::*;
157     use cell::Cell;
158     use rt::test::*;
159     use rt::io::net::ip::{Ipv4Addr, SocketAddr};
160     use rt::io::*;
161     use prelude::*;
162
163     #[test] #[ignore]
164     fn bind_error() {
165         do run_in_newsched_task {
166             let mut called = false;
167             do io_error::cond.trap(|e| {
168                 assert!(e.kind == PermissionDenied);
169                 called = true;
170             }).inside {
171                 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
172                 let listener = TcpListener::bind(addr);
173                 assert!(listener.is_none());
174             }
175             assert!(called);
176         }
177     }
178
179     #[test]
180     #[ignore(cfg(windows))] // FIXME #8811
181     fn connect_error() {
182         do run_in_newsched_task {
183             let mut called = false;
184             do io_error::cond.trap(|e| {
185                 assert!(e.kind == ConnectionRefused);
186                 called = true;
187             }).inside {
188                 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
189                 let stream = TcpStream::connect(addr);
190                 assert!(stream.is_none());
191             }
192             assert!(called);
193         }
194     }
195
196     #[test]
197     fn smoke_test_ip4() {
198         do run_in_newsched_task {
199             let addr = next_test_ip4();
200
201             do spawntask {
202                 let mut acceptor = TcpListener::bind(addr).listen();
203                 let mut stream = acceptor.accept();
204                 let mut buf = [0];
205                 stream.read(buf);
206                 assert!(buf[0] == 99);
207             }
208
209             do spawntask {
210                 let mut stream = TcpStream::connect(addr);
211                 stream.write([99]);
212             }
213         }
214     }
215
216     #[test]
217     fn smoke_test_ip6() {
218         do run_in_newsched_task {
219             let addr = next_test_ip6();
220
221             do spawntask {
222                 let mut acceptor = TcpListener::bind(addr).listen();
223                 let mut stream = acceptor.accept();
224                 let mut buf = [0];
225                 stream.read(buf);
226                 assert!(buf[0] == 99);
227             }
228
229             do spawntask {
230                 let mut stream = TcpStream::connect(addr);
231                 stream.write([99]);
232             }
233         }
234     }
235
236     #[test]
237     fn read_eof_ip4() {
238         do run_in_newsched_task {
239             let addr = next_test_ip4();
240
241             do spawntask {
242                 let mut acceptor = TcpListener::bind(addr).listen();
243                 let mut stream = acceptor.accept();
244                 let mut buf = [0];
245                 let nread = stream.read(buf);
246                 assert!(nread.is_none());
247             }
248
249             do spawntask {
250                 let _stream = TcpStream::connect(addr);
251                 // Close
252             }
253         }
254     }
255
256     #[test]
257     fn read_eof_ip6() {
258         do run_in_newsched_task {
259             let addr = next_test_ip6();
260
261             do spawntask {
262                 let mut acceptor = TcpListener::bind(addr).listen();
263                 let mut stream = acceptor.accept();
264                 let mut buf = [0];
265                 let nread = stream.read(buf);
266                 assert!(nread.is_none());
267             }
268
269             do spawntask {
270                 let _stream = TcpStream::connect(addr);
271                 // Close
272             }
273         }
274     }
275
276     #[test]
277     #[ignore(cfg(windows))] // FIXME #8811
278     fn read_eof_twice_ip4() {
279         do run_in_newsched_task {
280             let addr = next_test_ip4();
281
282             do spawntask {
283                 let mut acceptor = TcpListener::bind(addr).listen();
284                 let mut stream = acceptor.accept();
285                 let mut buf = [0];
286                 let nread = stream.read(buf);
287                 assert!(nread.is_none());
288                 let nread = stream.read(buf);
289                 assert!(nread.is_none());
290             }
291
292             do spawntask {
293                 let _stream = TcpStream::connect(addr);
294                 // Close
295             }
296         }
297     }
298
299     #[test]
300     #[ignore(cfg(windows))] // FIXME #8811
301     fn read_eof_twice_ip6() {
302         do run_in_newsched_task {
303             let addr = next_test_ip6();
304
305             do spawntask {
306                 let mut acceptor = TcpListener::bind(addr).listen();
307                 let mut stream = acceptor.accept();
308                 let mut buf = [0];
309                 let nread = stream.read(buf);
310                 assert!(nread.is_none());
311                 let nread = stream.read(buf);
312                 assert!(nread.is_none());
313             }
314
315             do spawntask {
316                 let _stream = TcpStream::connect(addr);
317                 // Close
318             }
319         }
320     }
321
322     #[test]
323     #[ignore(cfg(windows))] // FIXME #8811
324     fn write_close_ip4() {
325         do run_in_newsched_task {
326             let addr = next_test_ip4();
327
328             do spawntask {
329                 let mut acceptor = TcpListener::bind(addr).listen();
330                 let mut stream = acceptor.accept();
331                 let buf = [0];
332                 loop {
333                     let mut stop = false;
334                     do io_error::cond.trap(|e| {
335                         // NB: ECONNRESET on linux, EPIPE on mac
336                         assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
337                         stop = true;
338                     }).inside {
339                         stream.write(buf);
340                     }
341                     if stop { break }
342                 }
343             }
344
345             do spawntask {
346                 let _stream = TcpStream::connect(addr);
347                 // Close
348             }
349         }
350     }
351
352     #[test]
353     #[ignore(cfg(windows))] // FIXME #8811
354     fn write_close_ip6() {
355         do run_in_newsched_task {
356             let addr = next_test_ip6();
357
358             do spawntask {
359                 let mut acceptor = TcpListener::bind(addr).listen();
360                 let mut stream = acceptor.accept();
361                 let buf = [0];
362                 loop {
363                     let mut stop = false;
364                     do io_error::cond.trap(|e| {
365                         // NB: ECONNRESET on linux, EPIPE on mac
366                         assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
367                         stop = true;
368                     }).inside {
369                         stream.write(buf);
370                     }
371                     if stop { break }
372                 }
373             }
374
375             do spawntask {
376                 let _stream = TcpStream::connect(addr);
377                 // Close
378             }
379         }
380     }
381
382     #[test]
383     fn multiple_connect_serial_ip4() {
384         do run_in_newsched_task {
385             let addr = next_test_ip4();
386             let max = 10;
387
388             do spawntask {
389                 let mut acceptor = TcpListener::bind(addr).listen();
390                 for ref mut stream in acceptor.incoming().take(max) {
391                     let mut buf = [0];
392                     stream.read(buf);
393                     assert_eq!(buf[0], 99);
394                 }
395             }
396
397             do spawntask {
398                 do max.times {
399                     let mut stream = TcpStream::connect(addr);
400                     stream.write([99]);
401                 }
402             }
403         }
404     }
405
406     #[test]
407     fn multiple_connect_serial_ip6() {
408         do run_in_newsched_task {
409             let addr = next_test_ip6();
410             let max = 10;
411
412             do spawntask {
413                 let mut acceptor = TcpListener::bind(addr).listen();
414                 for ref mut stream in acceptor.incoming().take(max) {
415                     let mut buf = [0];
416                     stream.read(buf);
417                     assert_eq!(buf[0], 99);
418                 }
419             }
420
421             do spawntask {
422                 do max.times {
423                     let mut stream = TcpStream::connect(addr);
424                     stream.write([99]);
425                 }
426             }
427         }
428     }
429
430     #[test]
431     fn multiple_connect_interleaved_greedy_schedule_ip4() {
432         do run_in_newsched_task {
433             let addr = next_test_ip4();
434             static MAX: int = 10;
435
436             do spawntask {
437                 let mut acceptor = TcpListener::bind(addr).listen();
438                 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
439                     let stream = Cell::new(stream);
440                     // Start another task to handle the connection
441                     do spawntask {
442                         let mut stream = stream.take();
443                         let mut buf = [0];
444                         stream.read(buf);
445                         assert!(buf[0] == i as u8);
446                         rtdebug!("read");
447                     }
448                 }
449             }
450
451             connect(0, addr);
452
453             fn connect(i: int, addr: SocketAddr) {
454                 if i == MAX { return }
455
456                 do spawntask {
457                     rtdebug!("connecting");
458                     let mut stream = TcpStream::connect(addr);
459                     // Connect again before writing
460                     connect(i + 1, addr);
461                     rtdebug!("writing");
462                     stream.write([i as u8]);
463                 }
464             }
465         }
466     }
467
468     #[test]
469     fn multiple_connect_interleaved_greedy_schedule_ip6() {
470         do run_in_newsched_task {
471             let addr = next_test_ip6();
472             static MAX: int = 10;
473
474             do spawntask {
475                 let mut acceptor = TcpListener::bind(addr).listen();
476                 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
477                     let stream = Cell::new(stream);
478                     // Start another task to handle the connection
479                     do spawntask {
480                         let mut stream = stream.take();
481                         let mut buf = [0];
482                         stream.read(buf);
483                         assert!(buf[0] == i as u8);
484                         rtdebug!("read");
485                     }
486                 }
487             }
488
489             connect(0, addr);
490
491             fn connect(i: int, addr: SocketAddr) {
492                 if i == MAX { return }
493
494                 do spawntask {
495                     rtdebug!("connecting");
496                     let mut stream = TcpStream::connect(addr);
497                     // Connect again before writing
498                     connect(i + 1, addr);
499                     rtdebug!("writing");
500                     stream.write([i as u8]);
501                 }
502             }
503         }
504     }
505
506     #[test]
507     fn multiple_connect_interleaved_lazy_schedule_ip4() {
508         do run_in_newsched_task {
509             let addr = next_test_ip4();
510             static MAX: int = 10;
511
512             do spawntask {
513                 let mut acceptor = TcpListener::bind(addr).listen();
514                 for stream in acceptor.incoming().take(MAX as uint) {
515                     let stream = Cell::new(stream);
516                     // Start another task to handle the connection
517                     do spawntask_later {
518                         let mut stream = stream.take();
519                         let mut buf = [0];
520                         stream.read(buf);
521                         assert!(buf[0] == 99);
522                         rtdebug!("read");
523                     }
524                 }
525             }
526
527             connect(0, addr);
528
529             fn connect(i: int, addr: SocketAddr) {
530                 if i == MAX { return }
531
532                 do spawntask_later {
533                     rtdebug!("connecting");
534                     let mut stream = TcpStream::connect(addr);
535                     // Connect again before writing
536                     connect(i + 1, addr);
537                     rtdebug!("writing");
538                     stream.write([99]);
539                 }
540             }
541         }
542     }
543     #[test]
544     fn multiple_connect_interleaved_lazy_schedule_ip6() {
545         do run_in_newsched_task {
546             let addr = next_test_ip6();
547             static MAX: int = 10;
548
549             do spawntask {
550                 let mut acceptor = TcpListener::bind(addr).listen();
551                 for stream in acceptor.incoming().take(MAX as uint) {
552                     let stream = Cell::new(stream);
553                     // Start another task to handle the connection
554                     do spawntask_later {
555                         let mut stream = stream.take();
556                         let mut buf = [0];
557                         stream.read(buf);
558                         assert!(buf[0] == 99);
559                         rtdebug!("read");
560                     }
561                 }
562             }
563
564             connect(0, addr);
565
566             fn connect(i: int, addr: SocketAddr) {
567                 if i == MAX { return }
568
569                 do spawntask_later {
570                     rtdebug!("connecting");
571                     let mut stream = TcpStream::connect(addr);
572                     // Connect again before writing
573                     connect(i + 1, addr);
574                     rtdebug!("writing");
575                     stream.write([99]);
576                 }
577             }
578         }
579     }
580
581     #[cfg(test)]
582     fn socket_name(addr: SocketAddr) {
583         do run_in_newsched_task {
584             do spawntask {
585                 let mut listener = TcpListener::bind(addr).unwrap();
586
587                 // Make sure socket_name gives
588                 // us the socket we binded to.
589                 let so_name = listener.socket_name();
590                 assert!(so_name.is_some());
591                 assert_eq!(addr, so_name.unwrap());
592
593             }
594         }
595     }
596
597     #[cfg(test)]
598     fn peer_name(addr: SocketAddr) {
599         do run_in_newsched_task {
600             do spawntask {
601                 let mut acceptor = TcpListener::bind(addr).listen();
602
603                 acceptor.accept();
604             }
605
606             do spawntask {
607                 let stream = TcpStream::connect(addr);
608
609                 assert!(stream.is_some());
610                 let mut stream = stream.unwrap();
611
612                 // Make sure peer_name gives us the
613                 // address/port of the peer we've
614                 // connected to.
615                 let peer_name = stream.peer_name();
616                 assert!(peer_name.is_some());
617                 assert_eq!(addr, peer_name.unwrap());
618             }
619         }
620     }
621
622     #[test]
623     fn socket_and_peer_name_ip4() {
624         peer_name(next_test_ip4());
625         socket_name(next_test_ip4());
626     }
627
628     #[test]
629     fn socket_and_peer_name_ip6() {
630         // XXX: peer name is not consistent
631         //peer_name(next_test_ip6());
632         socket_name(next_test_ip6());
633     }
634
635 }