]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/io/net/tcp.rs
dc7135f4a61ecb2bea11351b4a6b67fc3e6c9d52
[rust.git] / src / libstd / rt / io / net / tcp.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::{Option, Some, None};
12 use result::{Ok, Err};
13 use rt::io::net::ip::SocketAddr;
14 use rt::io::{Reader, Writer, Listener};
15 use rt::io::{io_error, read_error, EndOfFile};
16 use rt::rtio::{IoFactory, IoFactoryObject,
17                RtioSocket, RtioTcpListener,
18                RtioTcpListenerObject, RtioTcpStream,
19                RtioTcpStreamObject, RtioStream};
20 use rt::local::Local;
21
22 pub struct TcpStream(~RtioTcpStreamObject);
23
24 impl TcpStream {
25     fn new(s: ~RtioTcpStreamObject) -> TcpStream {
26         TcpStream(s)
27     }
28
29     pub fn connect(addr: SocketAddr) -> Option<TcpStream> {
30         let stream = unsafe {
31             rtdebug!("borrowing io to connect");
32             let io: *mut IoFactoryObject = Local::unsafe_borrow();
33             rtdebug!("about to connect");
34             (*io).tcp_connect(addr)
35         };
36
37         match stream {
38             Ok(s) => Some(TcpStream::new(s)),
39             Err(ioerr) => {
40                 rtdebug!("failed to connect: %?", ioerr);
41                 io_error::cond.raise(ioerr);
42                 None
43             }
44         }
45     }
46
47     pub fn peer_name(&mut self) -> Option<SocketAddr> {
48         match (**self).peer_name() {
49             Ok(pn) => Some(pn),
50             Err(ioerr) => {
51                 rtdebug!("failed to get peer name: %?", ioerr);
52                 io_error::cond.raise(ioerr);
53                 None
54             }
55         }
56     }
57
58     pub fn socket_name(&mut self) -> Option<SocketAddr> {
59         match (**self).socket_name() {
60             Ok(sn) => Some(sn),
61             Err(ioerr) => {
62                 rtdebug!("failed to get socket name: %?", ioerr);
63                 io_error::cond.raise(ioerr);
64                 None
65             }
66         }
67     }
68 }
69
70 impl Reader for TcpStream {
71     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
72         match (***self).read(buf) {
73             Ok(read) => Some(read),
74             Err(ioerr) => {
75                 // EOF is indicated by returning None
76                 if ioerr.kind != EndOfFile {
77                     read_error::cond.raise(ioerr);
78                 }
79                 return None;
80             }
81         }
82     }
83
84     fn eof(&mut self) -> bool { fail!() }
85 }
86
87 impl Writer for TcpStream {
88     fn write(&mut self, buf: &[u8]) {
89         match (***self).write(buf) {
90             Ok(_) => (),
91             Err(ioerr) => io_error::cond.raise(ioerr),
92         }
93     }
94
95     fn flush(&mut self) { fail!() }
96 }
97
98 pub struct TcpListener(~RtioTcpListenerObject);
99
100 impl TcpListener {
101     pub fn bind(addr: SocketAddr) -> Option<TcpListener> {
102         let listener = unsafe {
103             let io: *mut IoFactoryObject = Local::unsafe_borrow();
104             (*io).tcp_bind(addr)
105         };
106         match listener {
107             Ok(l) => Some(TcpListener(l)),
108             Err(ioerr) => {
109                 io_error::cond.raise(ioerr);
110                 return None;
111             }
112         }
113     }
114
115     pub fn socket_name(&mut self) -> Option<SocketAddr> {
116         match (**self).socket_name() {
117             Ok(sn) => Some(sn),
118             Err(ioerr) => {
119                 rtdebug!("failed to get socket name: %?", ioerr);
120                 io_error::cond.raise(ioerr);
121                 None
122             }
123         }
124     }
125 }
126
127 impl Listener<TcpStream> for TcpListener {
128     fn accept(&mut self) -> Option<TcpStream> {
129         match (**self).accept() {
130             Ok(s) => Some(TcpStream::new(s)),
131             Err(ioerr) => {
132                 io_error::cond.raise(ioerr);
133                 return None;
134             }
135         }
136     }
137 }
138
139 #[cfg(test)]
140 mod test {
141     use super::*;
142     use cell::Cell;
143     use rt::test::*;
144     use rt::io::net::ip::{Ipv4Addr, SocketAddr};
145     use rt::io::*;
146     use prelude::*;
147
148     #[test] #[ignore]
149     fn bind_error() {
150         do run_in_newsched_task {
151             let mut called = false;
152             do io_error::cond.trap(|e| {
153                 assert!(e.kind == PermissionDenied);
154                 called = true;
155             }).inside {
156                 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
157                 let listener = TcpListener::bind(addr);
158                 assert!(listener.is_none());
159             }
160             assert!(called);
161         }
162     }
163
164     #[test]
165     fn connect_error() {
166         do run_in_newsched_task {
167             let mut called = false;
168             do io_error::cond.trap(|e| {
169                 assert_eq!(e.kind, ConnectionRefused);
170                 called = true;
171             }).inside {
172                 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
173                 let stream = TcpStream::connect(addr);
174                 assert!(stream.is_none());
175             }
176             assert!(called);
177         }
178     }
179
180     #[test]
181     fn smoke_test_ip4() {
182         do run_in_newsched_task {
183             let addr = next_test_ip4();
184
185             do spawntask {
186                 let mut listener = TcpListener::bind(addr);
187                 let mut stream = listener.accept();
188                 let mut buf = [0];
189                 stream.read(buf);
190                 assert!(buf[0] == 99);
191             }
192
193             do spawntask {
194                 let mut stream = TcpStream::connect(addr);
195                 stream.write([99]);
196             }
197         }
198     }
199
200     #[test]
201     fn smoke_test_ip6() {
202         do run_in_newsched_task {
203             let addr = next_test_ip6();
204
205             do spawntask {
206                 let mut listener = TcpListener::bind(addr);
207                 let mut stream = listener.accept();
208                 let mut buf = [0];
209                 stream.read(buf);
210                 assert!(buf[0] == 99);
211             }
212
213             do spawntask {
214                 let mut stream = TcpStream::connect(addr);
215                 stream.write([99]);
216             }
217         }
218     }
219
220     #[test]
221     fn read_eof_ip4() {
222         do run_in_newsched_task {
223             let addr = next_test_ip4();
224
225             do spawntask {
226                 let mut listener = TcpListener::bind(addr);
227                 let mut stream = listener.accept();
228                 let mut buf = [0];
229                 let nread = stream.read(buf);
230                 assert!(nread.is_none());
231             }
232
233             do spawntask {
234                 let _stream = TcpStream::connect(addr);
235                 // Close
236             }
237         }
238     }
239
240     #[test]
241     fn read_eof_ip6() {
242         do run_in_newsched_task {
243             let addr = next_test_ip6();
244
245             do spawntask {
246                 let mut listener = TcpListener::bind(addr);
247                 let mut stream = listener.accept();
248                 let mut buf = [0];
249                 let nread = stream.read(buf);
250                 assert!(nread.is_none());
251             }
252
253             do spawntask {
254                 let _stream = TcpStream::connect(addr);
255                 // Close
256             }
257         }
258     }
259
260     #[test]
261     fn read_eof_twice_ip4() {
262         do run_in_newsched_task {
263             let addr = next_test_ip4();
264
265             do spawntask {
266                 let mut listener = TcpListener::bind(addr);
267                 let mut stream = listener.accept();
268                 let mut buf = [0];
269                 let nread = stream.read(buf);
270                 assert!(nread.is_none());
271                 let nread = stream.read(buf);
272                 assert!(nread.is_none());
273             }
274
275             do spawntask {
276                 let _stream = TcpStream::connect(addr);
277                 // Close
278             }
279         }
280     }
281
282     #[test]
283     fn read_eof_twice_ip6() {
284         do run_in_newsched_task {
285             let addr = next_test_ip6();
286
287             do spawntask {
288                 let mut listener = TcpListener::bind(addr);
289                 let mut stream = listener.accept();
290                 let mut buf = [0];
291                 let nread = stream.read(buf);
292                 assert!(nread.is_none());
293                 let nread = stream.read(buf);
294                 assert!(nread.is_none());
295             }
296
297             do spawntask {
298                 let _stream = TcpStream::connect(addr);
299                 // Close
300             }
301         }
302     }
303
304     #[test]
305     fn write_close_ip4() {
306         do run_in_newsched_task {
307             let addr = next_test_ip4();
308
309             do spawntask {
310                 let mut listener = TcpListener::bind(addr);
311                 let mut stream = listener.accept();
312                 let buf = [0];
313                 loop {
314                     let mut stop = false;
315                     do io_error::cond.trap(|e| {
316                         // NB: ECONNRESET on linux, EPIPE on mac
317                         assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
318                         stop = true;
319                     }).inside {
320                         stream.write(buf);
321                     }
322                     if stop { break }
323                 }
324             }
325
326             do spawntask {
327                 let _stream = TcpStream::connect(addr);
328                 // Close
329             }
330         }
331     }
332
333     #[test]
334     fn write_close_ip6() {
335         do run_in_newsched_task {
336             let addr = next_test_ip6();
337
338             do spawntask {
339                 let mut listener = TcpListener::bind(addr);
340                 let mut stream = listener.accept();
341                 let buf = [0];
342                 loop {
343                     let mut stop = false;
344                     do io_error::cond.trap(|e| {
345                         // NB: ECONNRESET on linux, EPIPE on mac
346                         assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
347                         stop = true;
348                     }).inside {
349                         stream.write(buf);
350                     }
351                     if stop { break }
352                 }
353             }
354
355             do spawntask {
356                 let _stream = TcpStream::connect(addr);
357                 // Close
358             }
359         }
360     }
361
362     #[test]
363     fn multiple_connect_serial_ip4() {
364         do run_in_newsched_task {
365             let addr = next_test_ip4();
366             let max = 10;
367
368             do spawntask {
369                 let mut listener = TcpListener::bind(addr);
370                 do max.times {
371                     let mut stream = listener.accept();
372                     let mut buf = [0];
373                     stream.read(buf);
374                     assert_eq!(buf[0], 99);
375                 }
376             }
377
378             do spawntask {
379                 do max.times {
380                     let mut stream = TcpStream::connect(addr);
381                     stream.write([99]);
382                 }
383             }
384         }
385     }
386
387     #[test]
388     fn multiple_connect_serial_ip6() {
389         do run_in_newsched_task {
390             let addr = next_test_ip6();
391             let max = 10;
392
393             do spawntask {
394                 let mut listener = TcpListener::bind(addr);
395                 do max.times {
396                     let mut stream = listener.accept();
397                     let mut buf = [0];
398                     stream.read(buf);
399                     assert_eq!(buf[0], 99);
400                 }
401             }
402
403             do spawntask {
404                 do max.times {
405                     let mut stream = TcpStream::connect(addr);
406                     stream.write([99]);
407                 }
408             }
409         }
410     }
411
412     #[test]
413     fn multiple_connect_interleaved_greedy_schedule_ip4() {
414         do run_in_newsched_task {
415             let addr = next_test_ip4();
416             static MAX: int = 10;
417
418             do spawntask {
419                 let mut listener = TcpListener::bind(addr);
420                 for i in range(0, MAX) {
421                     let stream = Cell::new(listener.accept());
422                     rtdebug!("accepted");
423                     // Start another task to handle the connection
424                     do spawntask {
425                         let mut stream = stream.take();
426                         let mut buf = [0];
427                         stream.read(buf);
428                         assert!(buf[0] == i as u8);
429                         rtdebug!("read");
430                     }
431                 }
432             }
433
434             connect(0, addr);
435
436             fn connect(i: int, addr: SocketAddr) {
437                 if i == MAX { return }
438
439                 do spawntask {
440                     rtdebug!("connecting");
441                     let mut stream = TcpStream::connect(addr);
442                     // Connect again before writing
443                     connect(i + 1, addr);
444                     rtdebug!("writing");
445                     stream.write([i as u8]);
446                 }
447             }
448         }
449     }
450
451     #[test]
452     fn multiple_connect_interleaved_greedy_schedule_ip6() {
453         do run_in_newsched_task {
454             let addr = next_test_ip6();
455             static MAX: int = 10;
456
457             do spawntask {
458                 let mut listener = TcpListener::bind(addr);
459                 for i in range(0, MAX) {
460                     let stream = Cell::new(listener.accept());
461                     rtdebug!("accepted");
462                     // Start another task to handle the connection
463                     do spawntask {
464                         let mut stream = stream.take();
465                         let mut buf = [0];
466                         stream.read(buf);
467                         assert!(buf[0] == i as u8);
468                         rtdebug!("read");
469                     }
470                 }
471             }
472
473             connect(0, addr);
474
475             fn connect(i: int, addr: SocketAddr) {
476                 if i == MAX { return }
477
478                 do spawntask {
479                     rtdebug!("connecting");
480                     let mut stream = TcpStream::connect(addr);
481                     // Connect again before writing
482                     connect(i + 1, addr);
483                     rtdebug!("writing");
484                     stream.write([i as u8]);
485                 }
486             }
487         }
488     }
489
490     #[test]
491     fn multiple_connect_interleaved_lazy_schedule_ip4() {
492         do run_in_newsched_task {
493             let addr = next_test_ip4();
494             static MAX: int = 10;
495
496             do spawntask {
497                 let mut listener = TcpListener::bind(addr);
498                 for _ in range(0, MAX) {
499                     let stream = Cell::new(listener.accept());
500                     rtdebug!("accepted");
501                     // Start another task to handle the connection
502                     do spawntask_later {
503                         let mut stream = stream.take();
504                         let mut buf = [0];
505                         stream.read(buf);
506                         assert!(buf[0] == 99);
507                         rtdebug!("read");
508                     }
509                 }
510             }
511
512             connect(0, addr);
513
514             fn connect(i: int, addr: SocketAddr) {
515                 if i == MAX { return }
516
517                 do spawntask_later {
518                     rtdebug!("connecting");
519                     let mut stream = TcpStream::connect(addr);
520                     // Connect again before writing
521                     connect(i + 1, addr);
522                     rtdebug!("writing");
523                     stream.write([99]);
524                 }
525             }
526         }
527     }
528     #[test]
529     fn multiple_connect_interleaved_lazy_schedule_ip6() {
530         do run_in_newsched_task {
531             let addr = next_test_ip6();
532             static MAX: int = 10;
533
534             do spawntask {
535                 let mut listener = TcpListener::bind(addr);
536                 for _ in range(0, MAX) {
537                     let stream = Cell::new(listener.accept());
538                     rtdebug!("accepted");
539                     // Start another task to handle the connection
540                     do spawntask_later {
541                         let mut stream = stream.take();
542                         let mut buf = [0];
543                         stream.read(buf);
544                         assert!(buf[0] == 99);
545                         rtdebug!("read");
546                     }
547                 }
548             }
549
550             connect(0, addr);
551
552             fn connect(i: int, addr: SocketAddr) {
553                 if i == MAX { return }
554
555                 do spawntask_later {
556                     rtdebug!("connecting");
557                     let mut stream = TcpStream::connect(addr);
558                     // Connect again before writing
559                     connect(i + 1, addr);
560                     rtdebug!("writing");
561                     stream.write([99]);
562                 }
563             }
564         }
565     }
566
567     #[cfg(test)]
568     fn socket_name(addr: SocketAddr) {
569         do run_in_newsched_task {
570             do spawntask {
571                 let listener = TcpListener::bind(addr);
572
573                 assert!(listener.is_some());
574                 let mut listener = listener.unwrap();
575
576                 // Make sure socket_name gives
577                 // us the socket we binded to.
578                 let so_name = listener.socket_name();
579                 assert!(so_name.is_some());
580                 assert_eq!(addr, so_name.unwrap());
581
582             }
583         }
584     }
585
586     #[cfg(test)]
587     fn peer_name(addr: SocketAddr) {
588         do run_in_newsched_task {
589             do spawntask {
590                 let mut listener = TcpListener::bind(addr);
591
592                 listener.accept();
593             }
594
595             do spawntask {
596                 let stream = TcpStream::connect(addr);
597
598                 assert!(stream.is_some());
599                 let mut stream = stream.unwrap();
600
601                 // Make sure peer_name gives us the
602                 // address/port of the peer we've
603                 // connected to.
604                 let peer_name = stream.peer_name();
605                 assert!(peer_name.is_some());
606                 assert_eq!(addr, peer_name.unwrap());
607             }
608         }
609     }
610
611     #[test]
612     fn socket_and_peer_name_ip4() {
613         peer_name(next_test_ip4());
614         socket_name(next_test_ip4());
615     }
616
617     #[test]
618     fn socket_and_peer_name_ip6() {
619         // XXX: peer name is not consistent
620         //peer_name(next_test_ip6());
621         socket_name(next_test_ip6());
622     }
623
624 }