1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On Windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
21 #![allow(missing_docs)]
26 use path::BytesContainer;
27 use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
28 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
29 use sys::pipe::UnixListener as UnixListenerImp;
30 use sys::pipe::UnixStream as UnixStreamImp;
35 /// A stream which communicates over a named pipe.
// The constructors in this file build this type as `UnixStream { inner: .. }`
// from a `UnixStreamImp`, i.e. it is a thin public wrapper over the
// platform-specific stream.
36 pub struct UnixStream {
42 /// Connect to a pipe named by `path`. This will attempt to open a
43 /// connection to the underlying socket.
45 /// The returned stream will be closed when the object falls out of scope.
///
/// `path` may be any `BytesContainer`; its bytes are converted to a C string
/// before being handed to the platform implementation. An error from the
/// platform `connect` call is returned to the caller unchanged.
50 /// # #![allow(unused_must_use)]
51 /// use std::io::net::pipe::UnixStream;
53 /// let server = Path::new("path/to/my/socket");
54 /// let mut stream = UnixStream::connect(&server);
55 /// stream.write(&[1, 2, 3]);
57 pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> {
// The platform layer takes the path as a `CString`.
58 let path = CString::from_slice(path.container_as_bytes());
// The second argument is the optional timeout: `connect_timeout` passes
// `Some(milliseconds)` in the same position, this variant passes `None`.
59 UnixStreamImp::connect(&path, None)
// On success, wrap the platform stream in the public type.
60 .map(|inner| UnixStream { inner: inner })
63 /// Connect to a pipe named by `path`, timing out if the specified number of
/// milliseconds elapses.
///
66 /// This function is similar to `connect`, except that if `timeout`
67 /// elapses the function will return an error of kind `TimedOut`.
69 /// If a `timeout` with zero or negative duration is specified then
70 /// the function returns `Err`, with the error kind set to `TimedOut`.
71 #[experimental = "the timeout argument is likely to change types"]
72 pub fn connect_timeout<P>(path: P, timeout: Duration)
73 -> IoResult<UnixStream>
74 where P: BytesContainer {
// Reject non-positive timeouts up front, before the path is converted or
// the platform layer is called.
75 if timeout <= Duration::milliseconds(0) {
76 return Err(standard_error(TimedOut));
79 let path = CString::from_slice(path.container_as_bytes());
// The guard above means the duration is strictly positive here, so the
// millisecond count being cast to `u64` is not negative.
80 UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64))
// On success, wrap the platform stream in the public type.
81 .map(|inner| UnixStream { inner: inner })
85 /// Closes the reading half of this connection.
87 /// This method will close the reading portion of this connection, causing
88 /// all pending and future reads to immediately return with an error.
90 /// Note that this method affects all cloned handles associated with this
91 /// stream, not just this one handle.
///
/// The result of the platform implementation is returned as-is.
92 pub fn close_read(&mut self) -> IoResult<()> {
// Pure delegation to the wrapped platform stream.
93 self.inner.close_read()
96 /// Closes the writing half of this connection.
98 /// This method will close the writing portion of this connection, causing
99 /// all pending and future writes to immediately return with an error.
101 /// Note that this method affects all cloned handles associated with this
102 /// stream, not just this one handle.
///
/// The result of the platform implementation is returned as-is.
103 pub fn close_write(&mut self) -> IoResult<()> {
// Pure delegation to the wrapped platform stream.
104 self.inner.close_write()
107 /// Sets the read/write timeout for this socket.
///
/// `timeout_ms` is forwarded unchanged to the underlying platform stream.
109 /// For more information, see `TcpStream::set_timeout`
110 #[experimental = "the timeout argument may change in type and value"]
111 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
// NOTE(review): the parameter name implies milliseconds; the unit is
// interpreted by the platform implementation, not here.
112 self.inner.set_timeout(timeout_ms)
115 /// Sets the read timeout for this socket.
///
/// `timeout_ms` is forwarded unchanged to the underlying platform stream.
117 /// For more information, see `TcpStream::set_timeout`
118 #[experimental = "the timeout argument may change in type and value"]
119 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
// Pure delegation; no validation of the value happens at this level.
120 self.inner.set_read_timeout(timeout_ms)
123 /// Sets the write timeout for this socket.
///
/// `timeout_ms` is forwarded unchanged to the underlying platform stream.
125 /// For more information, see `TcpStream::set_timeout`
126 #[experimental = "the timeout argument may change in type and value"]
127 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
// Pure delegation; no validation of the value happens at this level.
128 self.inner.set_write_timeout(timeout_ms)
// Cloning produces another `UnixStream` handle by cloning the wrapped platform
// stream. Per the `close_read`/`close_write` docs, closing a half of the
// connection affects every cloned handle, not just the one it is called on.
132 impl Clone for UnixStream {
133 fn clone(&self) -> UnixStream {
134 UnixStream { inner: self.inner.clone() }
// `Reader` implementation for `UnixStream`: reads into `buf`.
// NOTE(review): presumably the `uint` in the result is the number of bytes
// read -- the tests in this file compare `read` results against `Ok(1)` after
// a one-byte write; confirm against the `Reader` trait documentation.
138 impl Reader for UnixStream {
139 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
// `Writer` implementation for `UnixStream`: hands `buf` to the wrapped
// platform stream and returns its result unchanged.
144 impl Writer for UnixStream {
145 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
146 self.inner.write(buf)
// Gives other parts of the library shared access to the platform-specific
// stream (`UnixStreamImp`) wrapped by a `UnixStream`.
150 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
151 fn as_inner(&self) -> &UnixStreamImp {
156 /// A value that can listen for incoming named pipe connection requests.
///
/// Instances are created with `UnixListener::bind`.
157 pub struct UnixListener {
158 /// The internal, opaque runtime Unix listener.
// Not `pub`: only code in this module touches the platform listener directly.
159 inner: UnixListenerImp,
163 /// Creates a new listener, ready to receive incoming connections on the
164 /// specified socket. The server will be named by `path`.
166 /// This listener will be closed when it falls out of scope.
///
/// `path` may be any `BytesContainer`; its bytes are converted to a C string
/// before being handed to the platform implementation. An error from the
/// platform `bind` call is returned to the caller unchanged.
173 /// # #![allow(unused_must_use)]
174 /// use std::io::net::pipe::UnixListener;
175 /// use std::io::{Listener, Acceptor};
177 /// let server = Path::new("/path/to/my/socket");
178 /// let stream = UnixListener::bind(&server);
179 /// for mut client in stream.listen().incoming() {
180 /// client.write(&[1, 2, 3, 4]);
184 pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> {
// The platform layer takes the path as a `CString`.
185 let path = CString::from_slice(path.container_as_bytes());
186 UnixListenerImp::bind(&path)
// On success, wrap the platform listener in the public type.
187 .map(|inner| UnixListener { inner: inner })
// `Listener` implementation: `listen` takes the listener by value (it is
// consumed) and yields a `UnixAcceptor` on success.
191 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
192 fn listen(self) -> IoResult<UnixAcceptor> {
// Wrap the successful result in the public acceptor type.
194 .map(|inner| UnixAcceptor { inner: inner })
// Gives other parts of the library shared access to the platform-specific
// listener (`UnixListenerImp`) wrapped by a `UnixListener`.
198 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
199 fn as_inner(&self) -> &UnixListenerImp {
204 /// A value that can accept named pipe connections, returned from `listen()`.
///
/// Each successful `accept()` yields a `UnixStream` for the new connection.
205 pub struct UnixAcceptor {
206 /// The internal, opaque runtime Unix acceptor.
// Not `pub`: only code in this module touches the platform acceptor directly.
207 inner: UnixAcceptorImp
211 /// Sets a timeout for this acceptor, after which accept() will no longer
212 /// block indefinitely.
214 /// The argument specified is the amount of time, in milliseconds, into the
215 /// future after which all invocations of accept() will not block (and any
216 /// pending invocation will return). A value of `None` will clear any
217 /// existing timeout.
219 /// When using this method, it is likely necessary to reset the timeout as
220 /// appropriate, the timeout specified is specific to this object, not
221 /// specific to the next request.
222 #[experimental = "the name and arguments to this function are likely \
224 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
// Pure delegation: the value is forwarded unchanged to the platform acceptor.
225 self.inner.set_timeout(timeout_ms)
228 /// Closes the accepting capabilities of this acceptor.
230 /// This function has the same semantics as `TcpAcceptor::close_accept`, and
231 /// more information can be found in that documentation.
///
/// The result of the platform implementation is returned as-is.
233 pub fn close_accept(&mut self) -> IoResult<()> {
// Pure delegation to the wrapped platform acceptor.
234 self.inner.close_accept()
// `Acceptor` implementation: each accepted platform stream is wrapped in the
// public `UnixStream` type; errors from the platform `accept` pass through.
238 impl Acceptor<UnixStream> for UnixAcceptor {
239 fn accept(&mut self) -> IoResult<UnixStream> {
240 self.inner.accept().map(|s| {
241 UnixStream { inner: s }
246 impl Clone for UnixAcceptor {
247 /// Creates a new handle to this unix acceptor, allowing for simultaneous
/// accepts.
///
250 /// The underlying unix acceptor will not be closed until all handles to the
251 /// acceptor have been deallocated. Incoming connections will be received on
252 /// at most one acceptor, the same connection will not be accepted twice.
254 /// The `close_accept` method will shut down *all* acceptors cloned from the
255 /// same original acceptor, whereas the `set_timeout` method only affects
256 /// the acceptor handle that it is called on.
258 /// This function is useful for creating a handle to invoke `close_accept`
259 /// on to wake up any other task blocked in `accept`.
260 fn clone(&self) -> UnixAcceptor {
// The new handle is built by cloning the wrapped platform acceptor.
261 UnixAcceptor { inner: self.inner.clone() }
// Gives other parts of the library shared access to the platform-specific
// acceptor (`UnixAcceptorImp`) wrapped by a `UnixAcceptor`.
265 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
266 fn as_inner(&self) -> &UnixAcceptorImp {
275 use io::fs::PathExtensions;
276 use io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
277 use io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
278 use io::{PermissionDenied, Acceptor, Listener};
281 use sync::mpsc::channel;
// Test helper: sets up one client/server connection on a fresh test path.
// A listener is bound and put into listening mode on the calling thread, a
// spawned thread connects to the same path, and the calling thread accepts.
// NOTE(review): the `Ok` arms of both matches are not visible here; presumably
// the connected stream is passed to `client` and the accepted one to `server`.
285 pub fn smalltest<F,G>(server: F, client: G)
286 where F : FnOnce(UnixStream), F : Send,
287 G : FnOnce(UnixStream), G : Send
289 let path1 = next_test_unix();
// Second copy of the path so it can be moved into the client thread.
290 let path2 = path1.clone();
292 let mut acceptor = UnixListener::bind(&path1).listen();
// Client side runs on its own thread; a failed connect aborts the test.
294 let _t = Thread::spawn(move|| {
295 match UnixStream::connect(&path2) {
297 Err(e) => panic!("failed connect: {}", e),
// Server side runs on the calling thread; a failed accept aborts the test.
301 match acceptor.accept() {
303 Err(e) => panic!("failed accept: {}", e),
// Binding to a path that is expected to be unusable. When an error `e` is
// produced, its kind must be one of the three accepted below (the exact kind
// is allowed to vary).
309 let path = "path/to/nowhere";
310 match UnixListener::bind(&path) {
313 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
314 e.kind == InvalidInput);
// Connecting to a name nobody is listening on. On Windows the name uses the
// `\\.\pipe\` form; the non-Windows branch is not visible here.
321 let path = if cfg!(windows) {
322 r"\\.\pipe\this_should_not_exist_ever"
326 match UnixStream::connect(&path) {
// Either error kind is accepted for the failed connect.
329 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
// Single-byte round trip: the client writes the byte 99 and the server reads
// into `buf` and checks that the first byte is 99.
336 smalltest(move |mut server| {
338 server.read(&mut buf).unwrap();
339 assert!(buf[0] == 99);
340 }, move|mut client| {
341 client.write(&[99]).unwrap();
// Ignored on Windows (see the referenced issue). The server-side read must
// fail, and must keep failing when retried.
// NOTE(review): the client closure is not visible here; presumably it drops
// its stream without writing anything -- confirm.
345 #[cfg_attr(windows, ignore)] // FIXME(#12516)
348 smalltest(move|mut server| {
350 assert!(server.read(&mut buf).is_err());
351 assert!(server.read(&mut buf).is_err());
// Server-side write error check: when `write` fails, the error kind must be
// one of the three listed; anything else fails with the "unknown error"
// message.
// NOTE(review): the surrounding loop and the client closure are not visible.
359 smalltest(move|mut server| {
362 match server.write(&buf) {
365 assert!(e.kind == BrokenPipe ||
366 e.kind == NotConnected ||
367 e.kind == ConnectionReset,
368 "unknown error {}", e);
// Repeated connect/accept on one listener. `times` is defined outside the
// visible lines.
381 let path1 = next_test_unix();
382 let path2 = path1.clone();
384 let mut acceptor = match UnixListener::bind(&path1).listen() {
386 Err(e) => panic!("failed listen: {}", e),
// Client thread: open `times` connections, writing the single byte 100 on
// each one.
389 let _t = Thread::spawn(move|| {
390 for _ in range(0u, times) {
391 let mut stream = UnixStream::connect(&path2);
392 match stream.write(&[100]) {
394 Err(e) => panic!("failed write: {}", e)
// Server side: accept `times` connections and check the byte received on
// each.
399 for _ in range(0, times) {
400 let mut client = acceptor.accept();
402 match client.read(&mut buf) {
404 Err(e) => panic!("failed read/accept: {}", e),
406 assert_eq!(buf[0], 100);
// After binding and listening, the socket path must exist on the filesystem.
413 let path = next_test_unix();
// Bound to a name so the acceptor stays alive until the end of the scope.
414 let _acceptor = UnixListener::bind(&path).listen();
415 assert!(path.exists());
// Two handles to one accepted stream (`s1` and `s2`) used from different
// threads: `s2` writes while `s1` reads.
// NOTE(review): the definition of `s2` is not visible; presumably it is
// `s1.clone()`.
419 fn unix_clone_smoke() {
420 let addr = next_test_unix();
421 let mut acceptor = UnixListener::bind(&addr).listen();
// Client: expects to read exactly one byte with value 1, then replies with 2.
423 let _t = Thread::spawn(move|| {
424 let mut s = UnixStream::connect(&addr);
425 let mut buf = [0, 0];
426 debug!("client reading");
427 assert_eq!(s.read(&mut buf), Ok(1));
428 assert_eq!(buf[0], 1);
429 debug!("client writing");
430 s.write(&[2]).unwrap();
431 debug!("client dropping");
434 let mut s1 = acceptor.accept().unwrap();
// Two channels hand signals between this thread and the writer thread.
437 let (tx1, rx1) = channel();
438 let (tx2, rx2) = channel();
// Writer thread: sends the byte 1 through `s2`, then signals on `tx2`.
439 let _t = Thread::spawn(move|| {
442 debug!("writer writing");
443 s2.write(&[1]).unwrap();
444 debug!("writer done");
445 tx2.send(()).unwrap();
447 tx1.send(()).unwrap();
// Reader side on `s1`: expects the one-byte reply from the client.
448 let mut buf = [0, 0];
449 debug!("reader reading");
450 assert_eq!(s1.read(&mut buf), Ok(1));
451 debug!("reader done");
// Two handles to one accepted stream each perform one read; the client sends
// two separate one-byte writes.
// NOTE(review): the definition of `s2` and the channel receives are not
// visible; presumably `s2` is `s1.clone()`.
456 fn unix_clone_two_read() {
457 let addr = next_test_unix();
458 let mut acceptor = UnixListener::bind(&addr).listen();
// `tx1`/`tx2` are two senders for the same channel, one per reader.
459 let (tx1, rx) = channel();
460 let tx2 = tx1.clone();
// Client: writes the byte 1, then the byte 2.
462 let _t = Thread::spawn(move|| {
463 let mut s = UnixStream::connect(&addr);
464 s.write(&[1]).unwrap();
466 s.write(&[2]).unwrap();
470 let mut s1 = acceptor.accept().unwrap();
// `done` reports completion of the second reader thread; note that `rx` is
// rebound here and shadows the earlier receiver.
473 let (done, rx) = channel();
// Second reader: reads through `s2`, then signals on `tx2` and `done`.
474 let _t = Thread::spawn(move|| {
476 let mut buf = [0, 0];
477 s2.read(&mut buf).unwrap();
478 tx2.send(()).unwrap();
479 done.send(()).unwrap();
// First reader: reads through `s1`, then signals on `tx1`.
481 let mut buf = [0, 0];
482 s1.read(&mut buf).unwrap();
483 tx1.send(()).unwrap();
// Two handles to one accepted stream each perform one write; the client
// performs two reads.
// NOTE(review): the definition of `s2` is not visible; presumably it is
// `s1.clone()`.
489 fn unix_clone_two_write() {
490 let addr = next_test_unix();
491 let mut acceptor = UnixListener::bind(&addr).listen();
// Client: two reads into the same two-byte buffer; both must succeed.
493 let _t = Thread::spawn(move|| {
494 let mut s = UnixStream::connect(&addr);
495 let buf = &mut [0, 1];
496 s.read(buf).unwrap();
497 s.read(buf).unwrap();
500 let mut s1 = acceptor.accept().unwrap();
503 let (tx, rx) = channel();
// Second writer: sends the byte 1 through `s2`, then signals completion.
504 let _t = Thread::spawn(move|| {
506 s2.write(&[1]).unwrap();
507 tx.send(()).unwrap();
// First writer: sends the byte 2 through `s1`.
509 s1.write(&[2]).unwrap();
// The socket path must exist while the listener is alive and be gone again by
// the second check.
// NOTE(review): the statement between the two asserts is not visible;
// presumably it drops `l`.
516 fn drop_removes_listener_path() {
517 let path = next_test_unix();
518 let l = UnixListener::bind(&path).unwrap();
519 assert!(path.exists());
521 assert!(!path.exists());
// Same check as for the listener, but the value being dropped is the acceptor
// obtained from `listen()`: afterwards the socket path must no longer exist.
526 fn drop_removes_acceptor_path() {
527 let path = next_test_unix();
528 let l = UnixListener::bind(&path).unwrap();
529 assert!(path.exists());
// `listen()` consumes `l`; the resulting acceptor is dropped immediately.
530 drop(l.listen().unwrap());
531 assert!(!path.exists());
// Exercises `UnixAcceptor::set_timeout`: repeated timeouts, delivery of a
// pending connection after the timeout has expired, and blocking again once
// the timeout is unset.
535 fn accept_timeout() {
536 let addr = next_test_unix();
537 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
// 10 ms (the acceptor docs give the unit as milliseconds).
539 a.set_timeout(Some(10));
541 // Make sure we time out once and future invocations also time out
542 let err = a.accept().err().unwrap();
543 assert_eq!(err.kind, TimedOut);
544 let err = a.accept().err().unwrap();
545 assert_eq!(err.kind, TimedOut);
547 // Also make sure that even though the timeout is expired that we will
548 // continue to receive any pending connections.
549 let (tx, rx) = channel();
550 let addr2 = addr.clone();
// The connected stream is sent back so it stays alive on this thread.
551 let _t = Thread::spawn(move|| {
552 tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
554 let l = rx.recv().unwrap();
// Poll up to 1001 times: `TimedOut` results are tolerated and retried after a
// yield, any other error fails the test, and reaching the last iteration
// fails with "should have a pending connection".
555 for i in range(0u, 1001) {
558 Err(ref e) if e.kind == TimedOut => {}
559 Err(e) => panic!("error: {}", e),
561 ::thread::Thread::yield_now();
562 if i == 1000 { panic!("should have a pending connection") }
566 // Unset the timeout and make sure that this always blocks.
568 let addr2 = addr.clone();
// A client that connects and immediately drops its stream.
569 let _t = Thread::spawn(move|| {
570 drop(UnixStream::connect(&addr2).unwrap());
// No listener is bound on `addr` in this test, so a connect with a 100 ms
// timeout must fail.
576 fn connect_timeout_error() {
577 let addr = next_test_unix();
578 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
// With an acceptor listening on `addr`, a connect with a 100 ms timeout must
// succeed.
582 fn connect_timeout_success() {
583 let addr = next_test_unix();
// Kept alive (bound to `_a`) for the duration of the connect attempt.
584 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
585 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
// A zero timeout is rejected by `connect_timeout` even though an acceptor is
// listening on `addr`.
589 fn connect_timeout_zero() {
590 let addr = next_test_unix();
591 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
592 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
// A negative timeout is rejected by `connect_timeout` even though an acceptor
// is listening on `addr`.
596 fn connect_timeout_negative() {
597 let addr = next_test_unix();
598 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
599 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
// Checks that `close_write`/`close_read` make subsequent writes/reads fail on
// the handle itself, on a clone taken before closing, and on a clone taken
// after closing.
// NOTE(review): the declaration of `b` (the read buffer) is not visible here.
603 fn close_readwrite_smoke() {
604 let addr = next_test_unix();
605 let a = UnixListener::bind(&addr).listen().unwrap();
// `_tx` is never sent on; it is only kept alive on this thread.
606 let (_tx, rx) = channel::<()>();
// Server thread: accepts the connection and holds it in `_s`.
607 Thread::spawn(move|| {
609 let _s = a.accept().unwrap();
614 let mut s = UnixStream::connect(&addr).unwrap();
// Clone taken before anything is closed.
615 let mut s2 = s.clone();
617 // closing should prevent reads/writes
618 s.close_write().unwrap();
619 assert!(s.write(&[0]).is_err());
620 s.close_read().unwrap();
621 assert!(s.read(&mut b).is_err());
623 // closing should affect previous handles
624 assert!(s2.write(&[0]).is_err());
625 assert!(s2.read(&mut b).is_err());
627 // closing should affect new handles
628 let mut s3 = s.clone();
629 assert!(s3.write(&[0]).is_err());
630 assert!(s3.read(&mut b).is_err());
632 // make sure these don't die
// Results are deliberately ignored: only the absence of a panic matters.
633 let _ = s2.close_read();
634 let _ = s2.close_write();
635 let _ = s3.close_read();
636 let _ = s3.close_write();
// A read on one handle must return with an error once `close_read` is called
// on another handle to the same stream.
// NOTE(review): the definition of `s2` is not visible; presumably it is
// `s.clone()`.
640 fn close_read_wakes_up() {
641 let addr = next_test_unix();
642 let a = UnixListener::bind(&addr).listen().unwrap();
// `_tx` is never sent on; it is only kept alive on this thread.
643 let (_tx, rx) = channel::<()>();
// Server thread: accepts the connection and holds it in `_s`.
644 Thread::spawn(move|| {
646 let _s = a.accept().unwrap();
650 let mut s = UnixStream::connect(&addr).unwrap();
652 let (tx, rx) = channel();
// Child thread: its read is expected to fail; it then signals on `tx`.
653 let _t = Thread::spawn(move|| {
655 assert!(s2.read(&mut [0]).is_err());
656 tx.send(()).unwrap();
658 // this should wake up the child task
659 s.close_read().unwrap();
661 // this test will never finish if the child doesn't wake up
// Exercises the combined read/write timeout set through `set_timeout` on an
// accepted stream.
666 fn readwrite_timeouts() {
667 let addr = next_test_unix();
668 let mut a = UnixListener::bind(&addr).listen().unwrap();
669 let (tx, rx) = channel::<()>();
// Peer thread: connects and later writes one byte.
// NOTE(review): the wait between connect and write is not visible here.
670 Thread::spawn(move|| {
671 let mut s = UnixStream::connect(&addr).unwrap();
673 assert!(s.write(&[0]).is_ok());
677 let mut s = a.accept().unwrap();
// Reads must time out, and keep timing out on a second attempt.
678 s.set_timeout(Some(20));
679 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
680 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
682 s.set_timeout(Some(20));
// Write 128 KiB chunks until a write reports `TimedOut`. Successful and
// short writes keep the loop going, any other error fails the test, and
// reaching the last iteration without a timeout fails with
// "should have filled up?!".
683 for i in range(0u, 1001) {
684 match s.write(&[0; 128 * 1024]) {
685 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
686 Err(IoError { kind: TimedOut, .. }) => break,
687 Err(e) => panic!("{}", e),
689 if i == 1000 { panic!("should have filled up?!"); }
692 // I'm not sure as to why, but apparently the write on windows always
693 // succeeds after the previous timeout. Who knows?
695 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
// Signal on the channel; afterwards a read must return exactly one byte.
698 tx.send(()).unwrap();
700 assert_eq!(s.read(&mut [0, 0]), Ok(1));
// Read-timeout test: `set_read_timeout` must make reads time out, while the
// large writes that follow must still succeed.
705 let addr = next_test_unix();
706 let mut a = UnixListener::bind(&addr).listen().unwrap();
707 let (tx, rx) = channel::<()>();
// Peer thread: reads until it has received 100 * 128 KiB in total; any read
// error fails the test.
// NOTE(review): the declaration of `amt` is not visible here.
708 Thread::spawn(move|| {
709 let mut s = UnixStream::connect(&addr).unwrap();
712 while amt < 100 * 128 * 1024 {
713 match s.read(&mut [0;128 * 1024]) {
714 Ok(n) => { amt += n; }
715 Err(e) => panic!("{}", e),
721 let mut s = a.accept().unwrap();
// Reads must time out, and keep timing out on a second attempt.
722 s.set_read_timeout(Some(20));
723 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
724 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
// Signal on the channel, then push 100 chunks of 128 KiB; every write must
// succeed.
726 tx.send(()).unwrap();
727 for _ in range(0u, 100) {
728 assert!(s.write(&[0;128 * 1024]).is_ok());
// Write-timeout test: `set_write_timeout` must make writes time out, while a
// later read must still succeed.
733 fn write_timeouts() {
734 let addr = next_test_unix();
735 let mut a = UnixListener::bind(&addr).listen().unwrap();
736 let (tx, rx) = channel::<()>();
// Peer thread: connects and later writes one byte.
// NOTE(review): the wait between connect and write is not visible here.
737 Thread::spawn(move|| {
738 let mut s = UnixStream::connect(&addr).unwrap();
740 assert!(s.write(&[0]).is_ok());
744 let mut s = a.accept().unwrap();
745 s.set_write_timeout(Some(20));
// Write 128 KiB chunks until a write reports `TimedOut`. Successful and
// short writes keep the loop going, any other error fails the test, and
// reaching the last iteration without a timeout fails with
// "should have filled up?!".
746 for i in range(0u, 1001) {
747 match s.write(&[0; 128 * 1024]) {
748 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
749 Err(IoError { kind: TimedOut, .. }) => break,
750 Err(e) => panic!("{}", e),
752 if i == 1000 { panic!("should have filled up?!"); }
// Signal on the channel; the read that follows must succeed.
755 tx.send(()).unwrap();
756 assert!(s.read(&mut [0]).is_ok());
// A read timeout set on `s` makes the read on `s` fail with `TimedOut`, while
// a read issued on a second handle (`s2`) from another thread is expected to
// succeed.
// NOTE(review): the definition of `s2` is not visible; presumably it is
// `s.clone()`.
760 fn timeout_concurrent_read() {
761 let addr = next_test_unix();
762 let mut a = UnixListener::bind(&addr).listen().unwrap();
763 let (tx, rx) = channel::<()>();
// Peer thread: connects and later writes one byte.
764 Thread::spawn(move|| {
765 let mut s = UnixStream::connect(&addr).unwrap();
767 assert!(s.write(&[0]).is_ok());
771 let mut s = a.accept().unwrap();
773 let (tx2, rx2) = channel();
// Second reader: its read on `s2` must succeed; it then signals on `tx2`.
774 let _t = Thread::spawn(move|| {
776 assert!(s2.read(&mut [0]).is_ok());
777 tx2.send(()).unwrap();
780 s.set_read_timeout(Some(20));
781 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
// Signal on the channel shared with the peer thread.
782 tx.send(()).unwrap();
// An acceptor and its clone can each accept one connection: two client
// threads connect, `a` accepts once and `a2` accepts once.
789 fn clone_accept_smoke() {
790 let addr = next_test_unix();
791 let l = UnixListener::bind(&addr);
792 let mut a = l.listen().unwrap();
793 let mut a2 = a.clone();
// Each spawned client needs its own copy of the address.
795 let addr2 = addr.clone();
796 let _t = Thread::spawn(move|| {
797 let _ = UnixStream::connect(&addr2);
799 let _t = Thread::spawn(move|| {
800 let _ = UnixStream::connect(&addr);
803 assert!(a.accept().is_ok());
805 assert!(a2.accept().is_ok());
// Not compiled on Windows (see the referenced issue). Two threads each call
// `accept` and send the result back over a shared channel, while two client
// threads connect; both accept results must be `Ok`.
// NOTE(review): the per-thread acceptor bindings are not visible here;
// presumably one thread uses a clone of `a`.
808 #[cfg(not(windows))] // FIXME #17553
810 fn clone_accept_concurrent() {
811 let addr = next_test_unix();
812 let l = UnixListener::bind(&addr);
813 let a = l.listen().unwrap();
// Two senders for one channel, one per accepting thread.
816 let (tx, rx) = channel();
817 let tx2 = tx.clone();
819 let _t = Thread::spawn(move|| {
821 tx.send(a.accept()).unwrap()
823 let _t = Thread::spawn(move|| {
825 tx2.send(a.accept()).unwrap()
// Two clients, each with its own copy of the address.
828 let addr2 = addr.clone();
829 let _t = Thread::spawn(move|| {
830 let _ = UnixStream::connect(&addr2);
832 let _t = Thread::spawn(move|| {
833 let _ = UnixStream::connect(&addr);
836 assert!(rx.recv().unwrap().is_ok());
837 assert!(rx.recv().unwrap().is_ok());
// After `close_accept`, a subsequent `accept` must fail with `EndOfFile`.
841 fn close_accept_smoke() {
842 let addr = next_test_unix();
843 let l = UnixListener::bind(&addr);
844 let mut a = l.listen().unwrap();
846 a.close_accept().unwrap();
847 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
// `close_accept` called on a cloned handle (`a2`) must make an `accept` issued
// on the original acceptor from another thread fail with `EndOfFile`.
851 fn close_accept_concurrent() {
852 let addr = next_test_unix();
853 let l = UnixListener::bind(&addr);
854 let a = l.listen().unwrap();
855 let mut a2 = a.clone();
857 let (tx, rx) = channel();
// Accepting thread: sends the outcome of `accept` back to this thread.
858 let _t = Thread::spawn(move|| {
860 tx.send(a.accept()).unwrap();
862 a2.close_accept().unwrap();
864 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);