]> git.lizzy.rs Git - rust.git/blob - src/libstd/io/net/unix.rs
c5ddda9945de1f82b86b287b468fba37647228df
[rust.git] / src / libstd / io / net / unix.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12
13 Named pipes
14
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
18
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
21 instances as clients.
22
23 */
24
25 #![allow(missing_doc)]
26
27 use prelude::*;
28
29 use c_str::ToCStr;
30 use clone::Clone;
31 use io::{Listener, Acceptor, Reader, Writer, IoResult, IoError};
32 use kinds::Send;
33 use owned::Box;
34 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
35 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
36
37 /// A stream which communicates over a named pipe.
38 pub struct UnixStream {
39     obj: Box<RtioPipe + Send>,
40 }
41
42 impl UnixStream {
43     /// Connect to a pipe named by `path`. This will attempt to open a
44     /// connection to the underlying socket.
45     ///
46     /// The returned stream will be closed when the object falls out of scope.
47     ///
48     /// # Example
49     ///
50     /// ```rust
51     /// # #![allow(unused_must_use)]
52     /// use std::io::net::unix::UnixStream;
53     ///
54     /// let server = Path::new("path/to/my/socket");
55     /// let mut stream = UnixStream::connect(&server);
56     /// stream.write([1, 2, 3]);
57     /// ```
58     pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
59         LocalIo::maybe_raise(|io| {
60             io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
61         }).map_err(IoError::from_rtio_error)
62     }
63
64     /// Connect to a pipe named by `path`, timing out if the specified number of
65     /// milliseconds.
66     ///
67     /// This function is similar to `connect`, except that if `timeout_ms`
68     /// elapses the function will return an error of kind `TimedOut`.
69     #[experimental = "the timeout argument is likely to change types"]
70     pub fn connect_timeout<P: ToCStr>(path: &P,
71                                       timeout_ms: u64) -> IoResult<UnixStream> {
72         LocalIo::maybe_raise(|io| {
73             let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
74             s.map(|p| UnixStream { obj: p })
75         }).map_err(IoError::from_rtio_error)
76     }
77
78
79     /// Closes the reading half of this connection.
80     ///
81     /// This method will close the reading portion of this connection, causing
82     /// all pending and future reads to immediately return with an error.
83     ///
84     /// Note that this method affects all cloned handles associated with this
85     /// stream, not just this one handle.
86     pub fn close_read(&mut self) -> IoResult<()> {
87         self.obj.close_read().map_err(IoError::from_rtio_error)
88     }
89
90     /// Closes the writing half of this connection.
91     ///
92     /// This method will close the writing portion of this connection, causing
93     /// all pending and future writes to immediately return with an error.
94     ///
95     /// Note that this method affects all cloned handles associated with this
96     /// stream, not just this one handle.
97     pub fn close_write(&mut self) -> IoResult<()> {
98         self.obj.close_write().map_err(IoError::from_rtio_error)
99     }
100
101     /// Sets the read/write timeout for this socket.
102     ///
103     /// For more information, see `TcpStream::set_timeout`
104     #[experimental = "the timeout argument may change in type and value"]
105     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
106         self.obj.set_timeout(timeout_ms)
107     }
108
109     /// Sets the read timeout for this socket.
110     ///
111     /// For more information, see `TcpStream::set_timeout`
112     #[experimental = "the timeout argument may change in type and value"]
113     pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
114         self.obj.set_read_timeout(timeout_ms)
115     }
116
117     /// Sets the write timeout for this socket.
118     ///
119     /// For more information, see `TcpStream::set_timeout`
120     #[experimental = "the timeout argument may change in type and value"]
121     pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
122         self.obj.set_write_timeout(timeout_ms)
123     }
124 }
125
126 impl Clone for UnixStream {
127     fn clone(&self) -> UnixStream {
128         UnixStream { obj: self.obj.clone() }
129     }
130 }
131
132 impl Reader for UnixStream {
133     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
134         self.obj.read(buf).map_err(IoError::from_rtio_error)
135     }
136 }
137
138 impl Writer for UnixStream {
139     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
140         self.obj.write(buf).map_err(IoError::from_rtio_error)
141     }
142 }
143
144 /// A value that can listen for incoming named pipe connection requests.
145 pub struct UnixListener {
146     /// The internal, opaque runtime Unix listener.
147     obj: Box<RtioUnixListener + Send>,
148 }
149
150 impl UnixListener {
151
152     /// Creates a new listener, ready to receive incoming connections on the
153     /// specified socket. The server will be named by `path`.
154     ///
155     /// This listener will be closed when it falls out of scope.
156     ///
157     /// # Example
158     ///
159     /// ```
160     /// # fn main() {}
161     /// # fn foo() {
162     /// # #![allow(unused_must_use)]
163     /// use std::io::net::unix::UnixListener;
164     /// use std::io::{Listener, Acceptor};
165     ///
166     /// let server = Path::new("/path/to/my/socket");
167     /// let stream = UnixListener::bind(&server);
168     /// for mut client in stream.listen().incoming() {
169     ///     client.write([1, 2, 3, 4]);
170     /// }
171     /// # }
172     /// ```
173     pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
174         LocalIo::maybe_raise(|io| {
175             io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
176         }).map_err(IoError::from_rtio_error)
177     }
178 }
179
180 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
181     fn listen(self) -> IoResult<UnixAcceptor> {
182         self.obj.listen().map(|obj| {
183             UnixAcceptor { obj: obj }
184         }).map_err(IoError::from_rtio_error)
185     }
186 }
187
188 /// A value that can accept named pipe connections, returned from `listen()`.
189 pub struct UnixAcceptor {
190     /// The internal, opaque runtime Unix acceptor.
191     obj: Box<RtioUnixAcceptor + Send>,
192 }
193
194 impl UnixAcceptor {
195     /// Sets a timeout for this acceptor, after which accept() will no longer
196     /// block indefinitely.
197     ///
198     /// The argument specified is the amount of time, in milliseconds, into the
199     /// future after which all invocations of accept() will not block (and any
200     /// pending invocation will return). A value of `None` will clear any
201     /// existing timeout.
202     ///
203     /// When using this method, it is likely necessary to reset the timeout as
204     /// appropriate, the timeout specified is specific to this object, not
205     /// specific to the next request.
206     #[experimental = "the name and arguments to this function are likely \
207                       to change"]
208     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
209         self.obj.set_timeout(timeout_ms)
210     }
211 }
212
213 impl Acceptor<UnixStream> for UnixAcceptor {
214     fn accept(&mut self) -> IoResult<UnixStream> {
215         self.obj.accept().map(|s| {
216             UnixStream { obj: s }
217         }).map_err(IoError::from_rtio_error)
218     }
219 }
220
221 #[cfg(test)]
222 #[allow(experimental)]
223 mod tests {
224     use prelude::*;
225     use super::*;
226     use io::*;
227     use io::test::*;
228
229     pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
230         let path1 = next_test_unix();
231         let path2 = path1.clone();
232
233         let mut acceptor = UnixListener::bind(&path1).listen();
234
235         spawn(proc() {
236             match UnixStream::connect(&path2) {
237                 Ok(c) => client(c),
238                 Err(e) => fail!("failed connect: {}", e),
239             }
240         });
241
242         match acceptor.accept() {
243             Ok(c) => server(c),
244             Err(e) => fail!("failed accept: {}", e),
245         }
246     }
247
248     iotest!(fn bind_error() {
249         let path = "path/to/nowhere";
250         match UnixListener::bind(&path) {
251             Ok(..) => fail!(),
252             Err(e) => {
253                 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
254                         e.kind == InvalidInput);
255             }
256         }
257     })
258
259     iotest!(fn connect_error() {
260         let path = if cfg!(windows) {
261             r"\\.\pipe\this_should_not_exist_ever"
262         } else {
263             "path/to/nowhere"
264         };
265         match UnixStream::connect(&path) {
266             Ok(..) => fail!(),
267             Err(e) => {
268                 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
269             }
270         }
271     })
272
273     iotest!(fn smoke() {
274         smalltest(proc(mut server) {
275             let mut buf = [0];
276             server.read(buf).unwrap();
277             assert!(buf[0] == 99);
278         }, proc(mut client) {
279             client.write([99]).unwrap();
280         })
281     })
282
283     iotest!(fn read_eof() {
284         smalltest(proc(mut server) {
285             let mut buf = [0];
286             assert!(server.read(buf).is_err());
287             assert!(server.read(buf).is_err());
288         }, proc(_client) {
289             // drop the client
290         })
291     } #[ignore(cfg(windows))]) // FIXME(#12516)
292
293     iotest!(fn write_begone() {
294         smalltest(proc(mut server) {
295             let buf = [0];
296             loop {
297                 match server.write(buf) {
298                     Ok(..) => {}
299                     Err(e) => {
300                         assert!(e.kind == BrokenPipe ||
301                                 e.kind == NotConnected ||
302                                 e.kind == ConnectionReset,
303                                 "unknown error {:?}", e);
304                         break;
305                     }
306                 }
307             }
308         }, proc(_client) {
309             // drop the client
310         })
311     })
312
313     iotest!(fn accept_lots() {
314         let times = 10;
315         let path1 = next_test_unix();
316         let path2 = path1.clone();
317
318         let mut acceptor = match UnixListener::bind(&path1).listen() {
319             Ok(a) => a,
320             Err(e) => fail!("failed listen: {}", e),
321         };
322
323         spawn(proc() {
324             for _ in range(0u, times) {
325                 let mut stream = UnixStream::connect(&path2);
326                 match stream.write([100]) {
327                     Ok(..) => {}
328                     Err(e) => fail!("failed write: {}", e)
329                 }
330             }
331         });
332
333         for _ in range(0, times) {
334             let mut client = acceptor.accept();
335             let mut buf = [0];
336             match client.read(buf) {
337                 Ok(..) => {}
338                 Err(e) => fail!("failed read/accept: {}", e),
339             }
340             assert_eq!(buf[0], 100);
341         }
342     })
343
344     #[cfg(unix)]
345     iotest!(fn path_exists() {
346         let path = next_test_unix();
347         let _acceptor = UnixListener::bind(&path).listen();
348         assert!(path.exists());
349     })
350
351     iotest!(fn unix_clone_smoke() {
352         let addr = next_test_unix();
353         let mut acceptor = UnixListener::bind(&addr).listen();
354
355         spawn(proc() {
356             let mut s = UnixStream::connect(&addr);
357             let mut buf = [0, 0];
358             debug!("client reading");
359             assert_eq!(s.read(buf), Ok(1));
360             assert_eq!(buf[0], 1);
361             debug!("client writing");
362             s.write([2]).unwrap();
363             debug!("client dropping");
364         });
365
366         let mut s1 = acceptor.accept().unwrap();
367         let s2 = s1.clone();
368
369         let (tx1, rx1) = channel();
370         let (tx2, rx2) = channel();
371         spawn(proc() {
372             let mut s2 = s2;
373             rx1.recv();
374             debug!("writer writing");
375             s2.write([1]).unwrap();
376             debug!("writer done");
377             tx2.send(());
378         });
379         tx1.send(());
380         let mut buf = [0, 0];
381         debug!("reader reading");
382         assert_eq!(s1.read(buf), Ok(1));
383         debug!("reader done");
384         rx2.recv();
385     })
386
387     iotest!(fn unix_clone_two_read() {
388         let addr = next_test_unix();
389         let mut acceptor = UnixListener::bind(&addr).listen();
390         let (tx1, rx) = channel();
391         let tx2 = tx1.clone();
392
393         spawn(proc() {
394             let mut s = UnixStream::connect(&addr);
395             s.write([1]).unwrap();
396             rx.recv();
397             s.write([2]).unwrap();
398             rx.recv();
399         });
400
401         let mut s1 = acceptor.accept().unwrap();
402         let s2 = s1.clone();
403
404         let (done, rx) = channel();
405         spawn(proc() {
406             let mut s2 = s2;
407             let mut buf = [0, 0];
408             s2.read(buf).unwrap();
409             tx2.send(());
410             done.send(());
411         });
412         let mut buf = [0, 0];
413         s1.read(buf).unwrap();
414         tx1.send(());
415
416         rx.recv();
417     })
418
419     iotest!(fn unix_clone_two_write() {
420         let addr = next_test_unix();
421         let mut acceptor = UnixListener::bind(&addr).listen();
422
423         spawn(proc() {
424             let mut s = UnixStream::connect(&addr);
425             let mut buf = [0, 1];
426             s.read(buf).unwrap();
427             s.read(buf).unwrap();
428         });
429
430         let mut s1 = acceptor.accept().unwrap();
431         let s2 = s1.clone();
432
433         let (tx, rx) = channel();
434         spawn(proc() {
435             let mut s2 = s2;
436             s2.write([1]).unwrap();
437             tx.send(());
438         });
439         s1.write([2]).unwrap();
440
441         rx.recv();
442     })
443
444     iotest!(fn drop_removes_listener_path() {
445         let path = next_test_unix();
446         let l = UnixListener::bind(&path).unwrap();
447         assert!(path.exists());
448         drop(l);
449         assert!(!path.exists());
450     } #[cfg(not(windows))])
451
452     iotest!(fn drop_removes_acceptor_path() {
453         let path = next_test_unix();
454         let l = UnixListener::bind(&path).unwrap();
455         assert!(path.exists());
456         drop(l.listen().unwrap());
457         assert!(!path.exists());
458     } #[cfg(not(windows))])
459
460     iotest!(fn accept_timeout() {
461         let addr = next_test_unix();
462         let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
463
464         a.set_timeout(Some(10));
465
466         // Make sure we time out once and future invocations also time out
467         let err = a.accept().err().unwrap();
468         assert_eq!(err.kind, TimedOut);
469         let err = a.accept().err().unwrap();
470         assert_eq!(err.kind, TimedOut);
471
472         // Also make sure that even though the timeout is expired that we will
473         // continue to receive any pending connections.
474         let (tx, rx) = channel();
475         let addr2 = addr.clone();
476         spawn(proc() {
477             tx.send(UnixStream::connect(&addr2).unwrap());
478         });
479         let l = rx.recv();
480         for i in range(0u, 1001) {
481             match a.accept() {
482                 Ok(..) => break,
483                 Err(ref e) if e.kind == TimedOut => {}
484                 Err(e) => fail!("error: {}", e),
485             }
486             ::task::deschedule();
487             if i == 1000 { fail!("should have a pending connection") }
488         }
489         drop(l);
490
491         // Unset the timeout and make sure that this always blocks.
492         a.set_timeout(None);
493         let addr2 = addr.clone();
494         spawn(proc() {
495             drop(UnixStream::connect(&addr2).unwrap());
496         });
497         a.accept().unwrap();
498     })
499
500     iotest!(fn connect_timeout_error() {
501         let addr = next_test_unix();
502         assert!(UnixStream::connect_timeout(&addr, 100).is_err());
503     })
504
505     iotest!(fn connect_timeout_success() {
506         let addr = next_test_unix();
507         let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
508         assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
509     })
510
511     iotest!(fn close_readwrite_smoke() {
512         let addr = next_test_unix();
513         let a = UnixListener::bind(&addr).listen().unwrap();
514         let (_tx, rx) = channel::<()>();
515         spawn(proc() {
516             let mut a = a;
517             let _s = a.accept().unwrap();
518             let _ = rx.recv_opt();
519         });
520
521         let mut b = [0];
522         let mut s = UnixStream::connect(&addr).unwrap();
523         let mut s2 = s.clone();
524
525         // closing should prevent reads/writes
526         s.close_write().unwrap();
527         assert!(s.write([0]).is_err());
528         s.close_read().unwrap();
529         assert!(s.read(b).is_err());
530
531         // closing should affect previous handles
532         assert!(s2.write([0]).is_err());
533         assert!(s2.read(b).is_err());
534
535         // closing should affect new handles
536         let mut s3 = s.clone();
537         assert!(s3.write([0]).is_err());
538         assert!(s3.read(b).is_err());
539
540         // make sure these don't die
541         let _ = s2.close_read();
542         let _ = s2.close_write();
543         let _ = s3.close_read();
544         let _ = s3.close_write();
545     })
546
547     iotest!(fn close_read_wakes_up() {
548         let addr = next_test_unix();
549         let a = UnixListener::bind(&addr).listen().unwrap();
550         let (_tx, rx) = channel::<()>();
551         spawn(proc() {
552             let mut a = a;
553             let _s = a.accept().unwrap();
554             let _ = rx.recv_opt();
555         });
556
557         let mut s = UnixStream::connect(&addr).unwrap();
558         let s2 = s.clone();
559         let (tx, rx) = channel();
560         spawn(proc() {
561             let mut s2 = s2;
562             assert!(s2.read([0]).is_err());
563             tx.send(());
564         });
565         // this should wake up the child task
566         s.close_read().unwrap();
567
568         // this test will never finish if the child doesn't wake up
569         rx.recv();
570     })
571
572     iotest!(fn readwrite_timeouts() {
573         let addr = next_test_unix();
574         let mut a = UnixListener::bind(&addr).listen().unwrap();
575         let (tx, rx) = channel::<()>();
576         spawn(proc() {
577             let mut s = UnixStream::connect(&addr).unwrap();
578             rx.recv();
579             assert!(s.write([0]).is_ok());
580             let _ = rx.recv_opt();
581         });
582
583         let mut s = a.accept().unwrap();
584         s.set_timeout(Some(20));
585         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
586         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
587
588         s.set_timeout(Some(20));
589         for i in range(0u, 1001) {
590             match s.write([0, .. 128 * 1024]) {
591                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
592                 Err(IoError { kind: TimedOut, .. }) => break,
593                 Err(e) => fail!("{}", e),
594            }
595            if i == 1000 { fail!("should have filled up?!"); }
596         }
597
598         // I'm not sure as to why, but apparently the write on windows always
599         // succeeds after the previous timeout. Who knows?
600         if !cfg!(windows) {
601             assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
602         }
603
604         tx.send(());
605         s.set_timeout(None);
606         assert_eq!(s.read([0, 0]), Ok(1));
607     })
608
609     iotest!(fn read_timeouts() {
610         let addr = next_test_unix();
611         let mut a = UnixListener::bind(&addr).listen().unwrap();
612         let (tx, rx) = channel::<()>();
613         spawn(proc() {
614             let mut s = UnixStream::connect(&addr).unwrap();
615             rx.recv();
616             let mut amt = 0;
617             while amt < 100 * 128 * 1024 {
618                 match s.read([0, ..128 * 1024]) {
619                     Ok(n) => { amt += n; }
620                     Err(e) => fail!("{}", e),
621                 }
622             }
623             let _ = rx.recv_opt();
624         });
625
626         let mut s = a.accept().unwrap();
627         s.set_read_timeout(Some(20));
628         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
629         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
630
631         tx.send(());
632         for _ in range(0u, 100) {
633             assert!(s.write([0, ..128 * 1024]).is_ok());
634         }
635     })
636
637     iotest!(fn write_timeouts() {
638         let addr = next_test_unix();
639         let mut a = UnixListener::bind(&addr).listen().unwrap();
640         let (tx, rx) = channel::<()>();
641         spawn(proc() {
642             let mut s = UnixStream::connect(&addr).unwrap();
643             rx.recv();
644             assert!(s.write([0]).is_ok());
645             let _ = rx.recv_opt();
646         });
647
648         let mut s = a.accept().unwrap();
649         s.set_write_timeout(Some(20));
650         for i in range(0u, 1001) {
651             match s.write([0, .. 128 * 1024]) {
652                 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
653                 Err(IoError { kind: TimedOut, .. }) => break,
654                 Err(e) => fail!("{}", e),
655            }
656            if i == 1000 { fail!("should have filled up?!"); }
657         }
658
659         tx.send(());
660         assert!(s.read([0]).is_ok());
661     })
662
663     iotest!(fn timeout_concurrent_read() {
664         let addr = next_test_unix();
665         let mut a = UnixListener::bind(&addr).listen().unwrap();
666         let (tx, rx) = channel::<()>();
667         spawn(proc() {
668             let mut s = UnixStream::connect(&addr).unwrap();
669             rx.recv();
670             assert!(s.write([0]).is_ok());
671             let _ = rx.recv_opt();
672         });
673
674         let mut s = a.accept().unwrap();
675         let s2 = s.clone();
676         let (tx2, rx2) = channel();
677         spawn(proc() {
678             let mut s2 = s2;
679             assert!(s2.read([0]).is_ok());
680             tx2.send(());
681         });
682
683         s.set_read_timeout(Some(20));
684         assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
685         tx.send(());
686
687         rx2.recv();
688     })
689 }