1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
21 #![allow(missing_docs)]
26 use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
27 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
28 use sys::pipe::UnixListener as UnixListenerImp;
29 use sys::pipe::UnixStream as UnixStreamImp;
34 /// A stream which communicates over a named pipe.
35 pub struct UnixStream {
41 /// Connect to a pipe named by `path`. This will attempt to open a
42 /// connection to the underlying socket.
44 /// The returned stream will be closed when the object falls out of scope.
49 /// # #![allow(unused_must_use)]
50 /// use std::io::net::pipe::UnixStream;
52 /// let server = Path::new("path/to/my/socket");
53 /// let mut stream = UnixStream::connect(&server);
54 /// stream.write(&[1, 2, 3]);
56 pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
57 UnixStreamImp::connect(&path.to_c_str(), None)
58 .map(|inner| UnixStream { inner: inner })
61 /// Connect to a pipe named by `path`, timing out if the specified number of
64 /// This function is similar to `connect`, except that if `timeout`
65 /// elapses the function will return an error of kind `TimedOut`.
67 /// If a `timeout` with zero or negative duration is specified then
68 /// the function returns `Err`, with the error kind set to `TimedOut`.
69 #[experimental = "the timeout argument is likely to change types"]
70 pub fn connect_timeout<P: ToCStr>(path: &P,
71 timeout: Duration) -> IoResult<UnixStream> {
72 if timeout <= Duration::milliseconds(0) {
73 return Err(standard_error(TimedOut));
76 UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64))
77 .map(|inner| UnixStream { inner: inner })
81 /// Closes the reading half of this connection.
83 /// This method will close the reading portion of this connection, causing
84 /// all pending and future reads to immediately return with an error.
86 /// Note that this method affects all cloned handles associated with this
87 /// stream, not just this one handle.
88 pub fn close_read(&mut self) -> IoResult<()> {
89 self.inner.close_read()
92 /// Closes the writing half of this connection.
94 /// This method will close the writing portion of this connection, causing
95 /// all pending and future writes to immediately return with an error.
97 /// Note that this method affects all cloned handles associated with this
98 /// stream, not just this one handle.
99 pub fn close_write(&mut self) -> IoResult<()> {
100 self.inner.close_write()
103 /// Sets the read/write timeout for this socket.
105 /// For more information, see `TcpStream::set_timeout`
106 #[experimental = "the timeout argument may change in type and value"]
107 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
108 self.inner.set_timeout(timeout_ms)
111 /// Sets the read timeout for this socket.
113 /// For more information, see `TcpStream::set_timeout`
114 #[experimental = "the timeout argument may change in type and value"]
115 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
116 self.inner.set_read_timeout(timeout_ms)
119 /// Sets the write timeout for this socket.
121 /// For more information, see `TcpStream::set_timeout`
122 #[experimental = "the timeout argument may change in type and value"]
123 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
124 self.inner.set_write_timeout(timeout_ms)
128 impl Clone for UnixStream {
129 fn clone(&self) -> UnixStream {
130 UnixStream { inner: self.inner.clone() }
134 impl Reader for UnixStream {
135 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
140 impl Writer for UnixStream {
141 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
142 self.inner.write(buf)
146 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
147 fn as_inner(&self) -> &UnixStreamImp {
152 /// A value that can listen for incoming named pipe connection requests.
153 pub struct UnixListener {
154 /// The internal, opaque runtime Unix listener.
155 inner: UnixListenerImp,
159 /// Creates a new listener, ready to receive incoming connections on the
160 /// specified socket. The server will be named by `path`.
162 /// This listener will be closed when it falls out of scope.
169 /// # #![allow(unused_must_use)]
170 /// use std::io::net::pipe::UnixListener;
171 /// use std::io::{Listener, Acceptor};
173 /// let server = Path::new("/path/to/my/socket");
174 /// let stream = UnixListener::bind(&server);
175 /// for mut client in stream.listen().incoming() {
176 /// client.write(&[1, 2, 3, 4]);
180 pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
181 UnixListenerImp::bind(&path.to_c_str())
182 .map(|inner| UnixListener { inner: inner })
186 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
187 fn listen(self) -> IoResult<UnixAcceptor> {
189 .map(|inner| UnixAcceptor { inner: inner })
193 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
194 fn as_inner(&self) -> &UnixListenerImp {
199 /// A value that can accept named pipe connections, returned from `listen()`.
200 pub struct UnixAcceptor {
201 /// The internal, opaque runtime Unix acceptor.
202 inner: UnixAcceptorImp
206 /// Sets a timeout for this acceptor, after which accept() will no longer
207 /// block indefinitely.
209 /// The argument specified is the amount of time, in milliseconds, into the
210 /// future after which all invocations of accept() will not block (and any
211 /// pending invocation will return). A value of `None` will clear any
212 /// existing timeout.
214 /// When using this method, it is likely necessary to reset the timeout as
215 /// appropriate, the timeout specified is specific to this object, not
216 /// specific to the next request.
217 #[experimental = "the name and arguments to this function are likely \
219 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
220 self.inner.set_timeout(timeout_ms)
223 /// Closes the accepting capabilities of this acceptor.
225 /// This function has the same semantics as `TcpAcceptor::close_accept`, and
226 /// more information can be found in that documentation.
228 pub fn close_accept(&mut self) -> IoResult<()> {
229 self.inner.close_accept()
233 impl Acceptor<UnixStream> for UnixAcceptor {
234 fn accept(&mut self) -> IoResult<UnixStream> {
235 self.inner.accept().map(|s| {
236 UnixStream { inner: s }
241 impl Clone for UnixAcceptor {
242 /// Creates a new handle to this unix acceptor, allowing for simultaneous
245 /// The underlying unix acceptor will not be closed until all handles to the
246 /// acceptor have been deallocated. Incoming connections will be received on
247 /// at most once acceptor, the same connection will not be accepted twice.
249 /// The `close_accept` method will shut down *all* acceptors cloned from the
250 /// same original acceptor, whereas the `set_timeout` method only affects
251 /// the selector that it is called on.
253 /// This function is useful for creating a handle to invoke `close_accept`
254 /// on to wake up any other task blocked in `accept`.
255 fn clone(&self) -> UnixAcceptor {
256 UnixAcceptor { inner: self.inner.clone() }
260 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
261 fn as_inner(&self) -> &UnixAcceptorImp {
270 use io::fs::PathExtensions;
271 use io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
272 use io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
273 use io::{PermissionDenied, Acceptor, Listener};
276 use sync::mpsc::channel;
280 pub fn smalltest<F,G>(server: F, client: G)
281 where F : FnOnce(UnixStream), F : Send,
282 G : FnOnce(UnixStream), G : Send
284 let path1 = next_test_unix();
285 let path2 = path1.clone();
287 let mut acceptor = UnixListener::bind(&path1).listen();
289 let _t = Thread::spawn(move|| {
290 match UnixStream::connect(&path2) {
292 Err(e) => panic!("failed connect: {}", e),
296 match acceptor.accept() {
298 Err(e) => panic!("failed accept: {}", e),
304 let path = "path/to/nowhere";
305 match UnixListener::bind(&path) {
308 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
309 e.kind == InvalidInput);
316 let path = if cfg!(windows) {
317 r"\\.\pipe\this_should_not_exist_ever"
321 match UnixStream::connect(&path) {
324 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
331 smalltest(move |mut server| {
333 server.read(&mut buf).unwrap();
334 assert!(buf[0] == 99);
335 }, move|mut client| {
336 client.write(&[99]).unwrap();
340 #[cfg_attr(windows, ignore)] // FIXME(#12516)
343 smalltest(move|mut server| {
345 assert!(server.read(&mut buf).is_err());
346 assert!(server.read(&mut buf).is_err());
354 smalltest(move|mut server| {
357 match server.write(&buf) {
360 assert!(e.kind == BrokenPipe ||
361 e.kind == NotConnected ||
362 e.kind == ConnectionReset,
363 "unknown error {}", e);
376 let path1 = next_test_unix();
377 let path2 = path1.clone();
379 let mut acceptor = match UnixListener::bind(&path1).listen() {
381 Err(e) => panic!("failed listen: {}", e),
384 let _t = Thread::spawn(move|| {
385 for _ in range(0u, times) {
386 let mut stream = UnixStream::connect(&path2);
387 match stream.write(&[100]) {
389 Err(e) => panic!("failed write: {}", e)
394 for _ in range(0, times) {
395 let mut client = acceptor.accept();
397 match client.read(&mut buf) {
399 Err(e) => panic!("failed read/accept: {}", e),
401 assert_eq!(buf[0], 100);
408 let path = next_test_unix();
409 let _acceptor = UnixListener::bind(&path).listen();
410 assert!(path.exists());
414 fn unix_clone_smoke() {
415 let addr = next_test_unix();
416 let mut acceptor = UnixListener::bind(&addr).listen();
418 let _t = Thread::spawn(move|| {
419 let mut s = UnixStream::connect(&addr);
420 let mut buf = [0, 0];
421 debug!("client reading");
422 assert_eq!(s.read(&mut buf), Ok(1));
423 assert_eq!(buf[0], 1);
424 debug!("client writing");
425 s.write(&[2]).unwrap();
426 debug!("client dropping");
429 let mut s1 = acceptor.accept().unwrap();
432 let (tx1, rx1) = channel();
433 let (tx2, rx2) = channel();
434 let _t = Thread::spawn(move|| {
437 debug!("writer writing");
438 s2.write(&[1]).unwrap();
439 debug!("writer done");
440 tx2.send(()).unwrap();
442 tx1.send(()).unwrap();
443 let mut buf = [0, 0];
444 debug!("reader reading");
445 assert_eq!(s1.read(&mut buf), Ok(1));
446 debug!("reader done");
451 fn unix_clone_two_read() {
452 let addr = next_test_unix();
453 let mut acceptor = UnixListener::bind(&addr).listen();
454 let (tx1, rx) = channel();
455 let tx2 = tx1.clone();
457 let _t = Thread::spawn(move|| {
458 let mut s = UnixStream::connect(&addr);
459 s.write(&[1]).unwrap();
461 s.write(&[2]).unwrap();
465 let mut s1 = acceptor.accept().unwrap();
468 let (done, rx) = channel();
469 let _t = Thread::spawn(move|| {
471 let mut buf = [0, 0];
472 s2.read(&mut buf).unwrap();
473 tx2.send(()).unwrap();
474 done.send(()).unwrap();
476 let mut buf = [0, 0];
477 s1.read(&mut buf).unwrap();
478 tx1.send(()).unwrap();
484 fn unix_clone_two_write() {
485 let addr = next_test_unix();
486 let mut acceptor = UnixListener::bind(&addr).listen();
488 let _t = Thread::spawn(move|| {
489 let mut s = UnixStream::connect(&addr);
490 let buf = &mut [0, 1];
491 s.read(buf).unwrap();
492 s.read(buf).unwrap();
495 let mut s1 = acceptor.accept().unwrap();
498 let (tx, rx) = channel();
499 let _t = Thread::spawn(move|| {
501 s2.write(&[1]).unwrap();
502 tx.send(()).unwrap();
504 s1.write(&[2]).unwrap();
/// Binding a listener creates the socket path, and dropping the listener
/// removes it again.
#[test]
fn drop_removes_listener_path() {
    let path = next_test_unix();
    let l = UnixListener::bind(&path).unwrap();
    assert!(path.exists());
    // The listener must be dropped before the path can be expected to vanish.
    drop(l);
    assert!(!path.exists());
}
/// Binding creates the socket path, and dropping the acceptor obtained from
/// `listen()` removes it again.
#[test]
fn drop_removes_acceptor_path() {
    let path = next_test_unix();
    let l = UnixListener::bind(&path).unwrap();
    assert!(path.exists());
    // `listen()` consumes the listener; the acceptor is dropped immediately.
    drop(l.listen().unwrap());
    assert!(!path.exists());
}
530 fn accept_timeout() {
531 let addr = next_test_unix();
532 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
534 a.set_timeout(Some(10));
536 // Make sure we time out once and future invocations also time out
537 let err = a.accept().err().unwrap();
538 assert_eq!(err.kind, TimedOut);
539 let err = a.accept().err().unwrap();
540 assert_eq!(err.kind, TimedOut);
542 // Also make sure that even though the timeout is expired that we will
543 // continue to receive any pending connections.
544 let (tx, rx) = channel();
545 let addr2 = addr.clone();
546 let _t = Thread::spawn(move|| {
547 tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
549 let l = rx.recv().unwrap();
550 for i in range(0u, 1001) {
553 Err(ref e) if e.kind == TimedOut => {}
554 Err(e) => panic!("error: {}", e),
556 ::thread::Thread::yield_now();
557 if i == 1000 { panic!("should have a pending connection") }
561 // Unset the timeout and make sure that this always blocks.
563 let addr2 = addr.clone();
564 let _t = Thread::spawn(move|| {
565 drop(UnixStream::connect(&addr2).unwrap());
/// `connect_timeout` must fail when no listener is bound to the address.
#[test]
fn connect_timeout_error() {
    // Nothing in this test binds `addr`, so the connection attempt must fail.
    let addr = next_test_unix();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
}
/// `connect_timeout` must succeed while an acceptor is listening on the
/// address.
#[test]
fn connect_timeout_success() {
    let addr = next_test_unix();
    // Keep the acceptor alive for the duration of the connection attempt.
    let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
}
/// A zero timeout must be rejected even though an acceptor is listening.
#[test]
fn connect_timeout_zero() {
    let addr = next_test_unix();
    // Keep the acceptor alive so the error can only come from the timeout check.
    let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
}
/// A negative timeout must be rejected even though an acceptor is listening.
#[test]
fn connect_timeout_negative() {
    let addr = next_test_unix();
    // Keep the acceptor alive so the error can only come from the timeout check.
    let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
    assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
}
598 fn close_readwrite_smoke() {
599 let addr = next_test_unix();
600 let a = UnixListener::bind(&addr).listen().unwrap();
601 let (_tx, rx) = channel::<()>();
602 Thread::spawn(move|| {
604 let _s = a.accept().unwrap();
609 let mut s = UnixStream::connect(&addr).unwrap();
610 let mut s2 = s.clone();
612 // closing should prevent reads/writes
613 s.close_write().unwrap();
614 assert!(s.write(&[0]).is_err());
615 s.close_read().unwrap();
616 assert!(s.read(&mut b).is_err());
618 // closing should affect previous handles
619 assert!(s2.write(&[0]).is_err());
620 assert!(s2.read(&mut b).is_err());
622 // closing should affect new handles
623 let mut s3 = s.clone();
624 assert!(s3.write(&[0]).is_err());
625 assert!(s3.read(&mut b).is_err());
627 // make sure these don't die
628 let _ = s2.close_read();
629 let _ = s2.close_write();
630 let _ = s3.close_read();
631 let _ = s3.close_write();
635 fn close_read_wakes_up() {
636 let addr = next_test_unix();
637 let a = UnixListener::bind(&addr).listen().unwrap();
638 let (_tx, rx) = channel::<()>();
639 Thread::spawn(move|| {
641 let _s = a.accept().unwrap();
645 let mut s = UnixStream::connect(&addr).unwrap();
647 let (tx, rx) = channel();
648 let _t = Thread::spawn(move|| {
650 assert!(s2.read(&mut [0]).is_err());
651 tx.send(()).unwrap();
653 // this should wake up the child task
654 s.close_read().unwrap();
656 // this test will never finish if the child doesn't wake up
661 fn readwrite_timeouts() {
662 let addr = next_test_unix();
663 let mut a = UnixListener::bind(&addr).listen().unwrap();
664 let (tx, rx) = channel::<()>();
665 Thread::spawn(move|| {
666 let mut s = UnixStream::connect(&addr).unwrap();
668 assert!(s.write(&[0]).is_ok());
672 let mut s = a.accept().unwrap();
673 s.set_timeout(Some(20));
674 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
675 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
677 s.set_timeout(Some(20));
678 for i in range(0u, 1001) {
679 match s.write(&[0; 128 * 1024]) {
680 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
681 Err(IoError { kind: TimedOut, .. }) => break,
682 Err(e) => panic!("{}", e),
684 if i == 1000 { panic!("should have filled up?!"); }
687 // I'm not sure as to why, but apparently the write on windows always
688 // succeeds after the previous timeout. Who knows?
690 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
693 tx.send(()).unwrap();
695 assert_eq!(s.read(&mut [0, 0]), Ok(1));
700 let addr = next_test_unix();
701 let mut a = UnixListener::bind(&addr).listen().unwrap();
702 let (tx, rx) = channel::<()>();
703 Thread::spawn(move|| {
704 let mut s = UnixStream::connect(&addr).unwrap();
707 while amt < 100 * 128 * 1024 {
708 match s.read(&mut [0;128 * 1024]) {
709 Ok(n) => { amt += n; }
710 Err(e) => panic!("{}", e),
716 let mut s = a.accept().unwrap();
717 s.set_read_timeout(Some(20));
718 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
719 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
721 tx.send(()).unwrap();
722 for _ in range(0u, 100) {
723 assert!(s.write(&[0;128 * 1024]).is_ok());
728 fn write_timeouts() {
729 let addr = next_test_unix();
730 let mut a = UnixListener::bind(&addr).listen().unwrap();
731 let (tx, rx) = channel::<()>();
732 Thread::spawn(move|| {
733 let mut s = UnixStream::connect(&addr).unwrap();
735 assert!(s.write(&[0]).is_ok());
739 let mut s = a.accept().unwrap();
740 s.set_write_timeout(Some(20));
741 for i in range(0u, 1001) {
742 match s.write(&[0; 128 * 1024]) {
743 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
744 Err(IoError { kind: TimedOut, .. }) => break,
745 Err(e) => panic!("{}", e),
747 if i == 1000 { panic!("should have filled up?!"); }
750 tx.send(()).unwrap();
751 assert!(s.read(&mut [0]).is_ok());
755 fn timeout_concurrent_read() {
756 let addr = next_test_unix();
757 let mut a = UnixListener::bind(&addr).listen().unwrap();
758 let (tx, rx) = channel::<()>();
759 Thread::spawn(move|| {
760 let mut s = UnixStream::connect(&addr).unwrap();
762 assert!(s.write(&[0]).is_ok());
766 let mut s = a.accept().unwrap();
768 let (tx2, rx2) = channel();
769 let _t = Thread::spawn(move|| {
771 assert!(s2.read(&mut [0]).is_ok());
772 tx2.send(()).unwrap();
775 s.set_read_timeout(Some(20));
776 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
777 tx.send(()).unwrap();
784 fn clone_accept_smoke() {
785 let addr = next_test_unix();
786 let l = UnixListener::bind(&addr);
787 let mut a = l.listen().unwrap();
788 let mut a2 = a.clone();
790 let addr2 = addr.clone();
791 let _t = Thread::spawn(move|| {
792 let _ = UnixStream::connect(&addr2);
794 let _t = Thread::spawn(move|| {
795 let _ = UnixStream::connect(&addr);
798 assert!(a.accept().is_ok());
800 assert!(a2.accept().is_ok());
803 #[cfg(not(windows))] // FIXME #17553
805 fn clone_accept_concurrent() {
806 let addr = next_test_unix();
807 let l = UnixListener::bind(&addr);
808 let a = l.listen().unwrap();
811 let (tx, rx) = channel();
812 let tx2 = tx.clone();
814 let _t = Thread::spawn(move|| {
816 tx.send(a.accept()).unwrap()
818 let _t = Thread::spawn(move|| {
820 tx2.send(a.accept()).unwrap()
823 let addr2 = addr.clone();
824 let _t = Thread::spawn(move|| {
825 let _ = UnixStream::connect(&addr2);
827 let _t = Thread::spawn(move|| {
828 let _ = UnixStream::connect(&addr);
831 assert!(rx.recv().unwrap().is_ok());
832 assert!(rx.recv().unwrap().is_ok());
/// After `close_accept`, a subsequent `accept` must fail with `EndOfFile`.
#[test]
fn close_accept_smoke() {
    let addr = next_test_unix();
    let l = UnixListener::bind(&addr);
    let mut a = l.listen().unwrap();

    a.close_accept().unwrap();
    assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
}
846 fn close_accept_concurrent() {
847 let addr = next_test_unix();
848 let l = UnixListener::bind(&addr);
849 let a = l.listen().unwrap();
850 let mut a2 = a.clone();
852 let (tx, rx) = channel();
853 let _t = Thread::spawn(move|| {
855 tx.send(a.accept()).unwrap();
857 a2.close_accept().unwrap();
859 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);