1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 //! This module contains the ability to communicate over named pipes with
14 //! synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
15 //! while on Unix it corresponds to UNIX domain sockets.
17 //! These pipes are similar to TCP in the sense that you can have both a stream to a
18 //! server and a server itself. The server provided accepts other `UnixStream`
19 //! instances as clients.
21 #![allow(missing_docs)]
26 use old_path::BytesContainer;
27 use old_io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
28 use sys::pipe::UnixAcceptor as UnixAcceptorImp;
29 use sys::pipe::UnixListener as UnixListenerImp;
30 use sys::pipe::UnixStream as UnixStreamImp;
35 /// A stream which communicates over a named pipe.
36 pub struct UnixStream {
42 /// Connect to a pipe named by `path`. This will attempt to open a
43 /// connection to the underlying socket.
45 /// The returned stream will be closed when the object falls out of scope.
50 /// # #![allow(unused_must_use)]
51 /// use std::old_io::net::pipe::UnixStream;
53 /// let server = Path::new("path/to/my/socket");
54 /// let mut stream = UnixStream::connect(&server);
55 /// stream.write(&[1, 2, 3]);
57 pub fn connect<P: BytesContainer>(path: P) -> IoResult<UnixStream> {
58 let path = try!(CString::new(path.container_as_bytes()));
59 UnixStreamImp::connect(&path, None)
60 .map(|inner| UnixStream { inner: inner })
63 /// Connect to a pipe named by `path`, timing out if the specified number of
66 /// This function is similar to `connect`, except that if `timeout`
67 /// elapses the function will return an error of kind `TimedOut`.
69 /// If a `timeout` with zero or negative duration is specified then
70 /// the function returns `Err`, with the error kind set to `TimedOut`.
71 #[unstable(feature = "io",
72 reason = "the timeout argument is likely to change types")]
73 pub fn connect_timeout<P>(path: P, timeout: Duration)
74 -> IoResult<UnixStream>
75 where P: BytesContainer {
76 if timeout <= Duration::milliseconds(0) {
77 return Err(standard_error(TimedOut));
80 let path = try!(CString::new(path.container_as_bytes()));
81 UnixStreamImp::connect(&path, Some(timeout.num_milliseconds() as u64))
82 .map(|inner| UnixStream { inner: inner })
86 /// Closes the reading half of this connection.
88 /// This method will close the reading portion of this connection, causing
89 /// all pending and future reads to immediately return with an error.
91 /// Note that this method affects all cloned handles associated with this
92 /// stream, not just this one handle.
93 pub fn close_read(&mut self) -> IoResult<()> {
94 self.inner.close_read()
97 /// Closes the writing half of this connection.
99 /// This method will close the writing portion of this connection, causing
100 /// all pending and future writes to immediately return with an error.
102 /// Note that this method affects all cloned handles associated with this
103 /// stream, not just this one handle.
104 pub fn close_write(&mut self) -> IoResult<()> {
105 self.inner.close_write()
108 /// Sets the read/write timeout for this socket.
110 /// For more information, see `TcpStream::set_timeout`
111 #[unstable(feature = "io",
112 reason = "the timeout argument may change in type and value")]
113 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
114 self.inner.set_timeout(timeout_ms)
117 /// Sets the read timeout for this socket.
119 /// For more information, see `TcpStream::set_timeout`
120 #[unstable(feature = "io",
121 reason = "the timeout argument may change in type and value")]
122 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
123 self.inner.set_read_timeout(timeout_ms)
126 /// Sets the write timeout for this socket.
128 /// For more information, see `TcpStream::set_timeout`
129 #[unstable(feature = "io",
130 reason = "the timeout argument may change in type and value")]
131 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
132 self.inner.set_write_timeout(timeout_ms)
136 impl Clone for UnixStream {
137 fn clone(&self) -> UnixStream {
138 UnixStream { inner: self.inner.clone() }
142 impl Reader for UnixStream {
143 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
148 impl Writer for UnixStream {
149 fn write_all(&mut self, buf: &[u8]) -> IoResult<()> {
150 self.inner.write(buf)
154 impl sys_common::AsInner<UnixStreamImp> for UnixStream {
155 fn as_inner(&self) -> &UnixStreamImp {
160 /// A value that can listen for incoming named pipe connection requests.
161 pub struct UnixListener {
162 /// The internal, opaque runtime Unix listener.
163 inner: UnixListenerImp,
167 /// Creates a new listener, ready to receive incoming connections on the
168 /// specified socket. The server will be named by `path`.
170 /// This listener will be closed when it falls out of scope.
176 /// use std::old_io::net::pipe::UnixListener;
177 /// use std::old_io::{Listener, Acceptor};
179 /// let server = Path::new("/path/to/my/socket");
180 /// let stream = UnixListener::bind(&server);
181 /// for mut client in stream.listen().incoming() {
182 /// client.write(&[1, 2, 3, 4]);
186 pub fn bind<P: BytesContainer>(path: P) -> IoResult<UnixListener> {
187 let path = try!(CString::new(path.container_as_bytes()));
188 UnixListenerImp::bind(&path)
189 .map(|inner| UnixListener { inner: inner })
193 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
194 fn listen(self) -> IoResult<UnixAcceptor> {
196 .map(|inner| UnixAcceptor { inner: inner })
200 impl sys_common::AsInner<UnixListenerImp> for UnixListener {
201 fn as_inner(&self) -> &UnixListenerImp {
206 /// A value that can accept named pipe connections, returned from `listen()`.
207 pub struct UnixAcceptor {
208 /// The internal, opaque runtime Unix acceptor.
209 inner: UnixAcceptorImp
213 /// Sets a timeout for this acceptor, after which accept() will no longer
214 /// block indefinitely.
216 /// The argument specified is the amount of time, in milliseconds, into the
217 /// future after which all invocations of accept() will not block (and any
218 /// pending invocation will return). A value of `None` will clear any
219 /// existing timeout.
221 /// When using this method, it is likely necessary to reset the timeout as
222 /// appropriate, the timeout specified is specific to this object, not
223 /// specific to the next request.
224 #[unstable(feature = "io",
225 reason = "the name and arguments to this function are likely \
227 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
228 self.inner.set_timeout(timeout_ms)
231 /// Closes the accepting capabilities of this acceptor.
233 /// This function has the same semantics as `TcpAcceptor::close_accept`, and
234 /// more information can be found in that documentation.
235 #[unstable(feature = "io")]
236 pub fn close_accept(&mut self) -> IoResult<()> {
237 self.inner.close_accept()
241 impl Acceptor<UnixStream> for UnixAcceptor {
242 fn accept(&mut self) -> IoResult<UnixStream> {
243 self.inner.accept().map(|s| {
244 UnixStream { inner: s }
249 impl Clone for UnixAcceptor {
250 /// Creates a new handle to this unix acceptor, allowing for simultaneous
253 /// The underlying unix acceptor will not be closed until all handles to the
254 /// acceptor have been deallocated. Incoming connections will be received on
255 /// at most once acceptor, the same connection will not be accepted twice.
257 /// The `close_accept` method will shut down *all* acceptors cloned from the
258 /// same original acceptor, whereas the `set_timeout` method only affects
259 /// the selector that it is called on.
261 /// This function is useful for creating a handle to invoke `close_accept`
262 /// on to wake up any other task blocked in `accept`.
263 fn clone(&self) -> UnixAcceptor {
264 UnixAcceptor { inner: self.inner.clone() }
268 impl sys_common::AsInner<UnixAcceptorImp> for UnixAcceptor {
269 fn as_inner(&self) -> &UnixAcceptorImp {
278 use old_io::fs::PathExtensions;
279 use old_io::{EndOfFile, TimedOut, ShortWrite, IoError, ConnectionReset};
280 use old_io::{NotConnected, BrokenPipe, FileNotFound, InvalidInput, OtherIoError};
281 use old_io::{PermissionDenied, Acceptor, Listener};
284 use sync::mpsc::channel;
288 pub fn smalltest<F,G>(server: F, client: G)
289 where F : FnOnce(UnixStream), F : Send,
290 G : FnOnce(UnixStream), G : Send + 'static
292 let path1 = next_test_unix();
293 let path2 = path1.clone();
295 let mut acceptor = UnixListener::bind(&path1).listen();
297 let _t = thread::spawn(move|| {
298 match UnixStream::connect(&path2) {
300 Err(e) => panic!("failed connect: {}", e),
304 match acceptor.accept() {
306 Err(e) => panic!("failed accept: {}", e),
312 let path = "path/to/nowhere";
313 match UnixListener::bind(&path) {
316 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
317 e.kind == InvalidInput);
324 let path = if cfg!(windows) {
325 r"\\.\pipe\this_should_not_exist_ever"
329 match UnixStream::connect(&path) {
332 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
339 smalltest(move |mut server| {
341 server.read(&mut buf).unwrap();
342 assert!(buf[0] == 99);
343 }, move|mut client| {
344 client.write(&[99]).unwrap();
348 #[cfg_attr(windows, ignore)] // FIXME(#12516)
351 smalltest(move|mut server| {
353 assert!(server.read(&mut buf).is_err());
354 assert!(server.read(&mut buf).is_err());
362 smalltest(move|mut server| {
365 match server.write(&buf) {
368 assert!(e.kind == BrokenPipe ||
369 e.kind == NotConnected ||
370 e.kind == ConnectionReset,
371 "unknown error {}", e);
384 let path1 = next_test_unix();
385 let path2 = path1.clone();
387 let mut acceptor = match UnixListener::bind(&path1).listen() {
389 Err(e) => panic!("failed listen: {}", e),
392 let _t = thread::spawn(move|| {
394 let mut stream = UnixStream::connect(&path2);
395 match stream.write(&[100]) {
397 Err(e) => panic!("failed write: {}", e)
403 let mut client = acceptor.accept();
405 match client.read(&mut buf) {
407 Err(e) => panic!("failed read/accept: {}", e),
409 assert_eq!(buf[0], 100);
416 let path = next_test_unix();
417 let _acceptor = UnixListener::bind(&path).listen();
418 assert!(path.exists());
422 fn unix_clone_smoke() {
423 let addr = next_test_unix();
424 let mut acceptor = UnixListener::bind(&addr).listen();
426 let _t = thread::spawn(move|| {
427 let mut s = UnixStream::connect(&addr);
428 let mut buf = [0, 0];
429 debug!("client reading");
430 assert_eq!(s.read(&mut buf), Ok(1));
431 assert_eq!(buf[0], 1);
432 debug!("client writing");
433 s.write(&[2]).unwrap();
434 debug!("client dropping");
437 let mut s1 = acceptor.accept().unwrap();
440 let (tx1, rx1) = channel();
441 let (tx2, rx2) = channel();
442 let _t = thread::spawn(move|| {
445 debug!("writer writing");
446 s2.write(&[1]).unwrap();
447 debug!("writer done");
448 tx2.send(()).unwrap();
450 tx1.send(()).unwrap();
451 let mut buf = [0, 0];
452 debug!("reader reading");
453 assert_eq!(s1.read(&mut buf), Ok(1));
454 debug!("reader done");
459 fn unix_clone_two_read() {
460 let addr = next_test_unix();
461 let mut acceptor = UnixListener::bind(&addr).listen();
462 let (tx1, rx) = channel();
463 let tx2 = tx1.clone();
465 let _t = thread::spawn(move|| {
466 let mut s = UnixStream::connect(&addr);
467 s.write(&[1]).unwrap();
469 s.write(&[2]).unwrap();
473 let mut s1 = acceptor.accept().unwrap();
476 let (done, rx) = channel();
477 let _t = thread::spawn(move|| {
479 let mut buf = [0, 0];
480 s2.read(&mut buf).unwrap();
481 tx2.send(()).unwrap();
482 done.send(()).unwrap();
484 let mut buf = [0, 0];
485 s1.read(&mut buf).unwrap();
486 tx1.send(()).unwrap();
492 fn unix_clone_two_write() {
493 let addr = next_test_unix();
494 let mut acceptor = UnixListener::bind(&addr).listen();
496 let _t = thread::spawn(move|| {
497 let mut s = UnixStream::connect(&addr);
498 let buf = &mut [0, 1];
499 s.read(buf).unwrap();
500 s.read(buf).unwrap();
503 let mut s1 = acceptor.accept().unwrap();
506 let (tx, rx) = channel();
507 let _t = thread::spawn(move|| {
509 s2.write(&[1]).unwrap();
510 tx.send(()).unwrap();
512 s1.write(&[2]).unwrap();
519 fn drop_removes_listener_path() {
520 let path = next_test_unix();
521 let l = UnixListener::bind(&path).unwrap();
522 assert!(path.exists());
524 assert!(!path.exists());
529 fn drop_removes_acceptor_path() {
530 let path = next_test_unix();
531 let l = UnixListener::bind(&path).unwrap();
532 assert!(path.exists());
533 drop(l.listen().unwrap());
534 assert!(!path.exists());
538 fn accept_timeout() {
539 let addr = next_test_unix();
540 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
542 a.set_timeout(Some(10));
544 // Make sure we time out once and future invocations also time out
545 let err = a.accept().err().unwrap();
546 assert_eq!(err.kind, TimedOut);
547 let err = a.accept().err().unwrap();
548 assert_eq!(err.kind, TimedOut);
550 // Also make sure that even though the timeout is expired that we will
551 // continue to receive any pending connections.
552 let (tx, rx) = channel();
553 let addr2 = addr.clone();
554 let _t = thread::spawn(move|| {
555 tx.send(UnixStream::connect(&addr2).unwrap()).unwrap();
557 let l = rx.recv().unwrap();
561 Err(ref e) if e.kind == TimedOut => {}
562 Err(e) => panic!("error: {}", e),
564 ::thread::yield_now();
565 if i == 1000 { panic!("should have a pending connection") }
569 // Unset the timeout and make sure that this always blocks.
571 let addr2 = addr.clone();
572 let _t = thread::spawn(move|| {
573 drop(UnixStream::connect(&addr2).unwrap());
579 fn connect_timeout_error() {
580 let addr = next_test_unix();
581 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
585 fn connect_timeout_success() {
586 let addr = next_test_unix();
587 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
588 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
592 fn connect_timeout_zero() {
593 let addr = next_test_unix();
594 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
595 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
599 fn connect_timeout_negative() {
600 let addr = next_test_unix();
601 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
602 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
606 fn close_readwrite_smoke() {
607 let addr = next_test_unix();
608 let a = UnixListener::bind(&addr).listen().unwrap();
609 let (_tx, rx) = channel::<()>();
610 thread::spawn(move|| {
612 let _s = a.accept().unwrap();
617 let mut s = UnixStream::connect(&addr).unwrap();
618 let mut s2 = s.clone();
620 // closing should prevent reads/writes
621 s.close_write().unwrap();
622 assert!(s.write(&[0]).is_err());
623 s.close_read().unwrap();
624 assert!(s.read(&mut b).is_err());
626 // closing should affect previous handles
627 assert!(s2.write(&[0]).is_err());
628 assert!(s2.read(&mut b).is_err());
630 // closing should affect new handles
631 let mut s3 = s.clone();
632 assert!(s3.write(&[0]).is_err());
633 assert!(s3.read(&mut b).is_err());
635 // make sure these don't die
636 let _ = s2.close_read();
637 let _ = s2.close_write();
638 let _ = s3.close_read();
639 let _ = s3.close_write();
643 fn close_read_wakes_up() {
644 let addr = next_test_unix();
645 let a = UnixListener::bind(&addr).listen().unwrap();
646 let (_tx, rx) = channel::<()>();
647 thread::spawn(move|| {
649 let _s = a.accept().unwrap();
653 let mut s = UnixStream::connect(&addr).unwrap();
655 let (tx, rx) = channel();
656 let _t = thread::spawn(move|| {
658 assert!(s2.read(&mut [0]).is_err());
659 tx.send(()).unwrap();
661 // this should wake up the child task
662 s.close_read().unwrap();
664 // this test will never finish if the child doesn't wake up
669 fn readwrite_timeouts() {
670 let addr = next_test_unix();
671 let mut a = UnixListener::bind(&addr).listen().unwrap();
672 let (tx, rx) = channel::<()>();
673 thread::spawn(move|| {
674 let mut s = UnixStream::connect(&addr).unwrap();
676 assert!(s.write(&[0]).is_ok());
680 let mut s = a.accept().unwrap();
681 s.set_timeout(Some(20));
682 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
683 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
685 s.set_timeout(Some(20));
687 match s.write(&[0; 128 * 1024]) {
688 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
689 Err(IoError { kind: TimedOut, .. }) => break,
690 Err(e) => panic!("{}", e),
692 if i == 1000 { panic!("should have filled up?!"); }
695 // I'm not sure as to why, but apparently the write on windows always
696 // succeeds after the previous timeout. Who knows?
698 assert_eq!(s.write(&[0]).err().unwrap().kind, TimedOut);
701 tx.send(()).unwrap();
703 assert_eq!(s.read(&mut [0, 0]), Ok(1));
708 let addr = next_test_unix();
709 let mut a = UnixListener::bind(&addr).listen().unwrap();
710 let (tx, rx) = channel::<()>();
711 thread::spawn(move|| {
712 let mut s = UnixStream::connect(&addr).unwrap();
715 while amt < 100 * 128 * 1024 {
716 match s.read(&mut [0;128 * 1024]) {
717 Ok(n) => { amt += n; }
718 Err(e) => panic!("{}", e),
724 let mut s = a.accept().unwrap();
725 s.set_read_timeout(Some(20));
726 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
727 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
729 tx.send(()).unwrap();
731 assert!(s.write(&[0;128 * 1024]).is_ok());
736 fn write_timeouts() {
737 let addr = next_test_unix();
738 let mut a = UnixListener::bind(&addr).listen().unwrap();
739 let (tx, rx) = channel::<()>();
740 thread::spawn(move|| {
741 let mut s = UnixStream::connect(&addr).unwrap();
743 assert!(s.write(&[0]).is_ok());
747 let mut s = a.accept().unwrap();
748 s.set_write_timeout(Some(20));
750 match s.write(&[0; 128 * 1024]) {
751 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
752 Err(IoError { kind: TimedOut, .. }) => break,
753 Err(e) => panic!("{}", e),
755 if i == 1000 { panic!("should have filled up?!"); }
758 tx.send(()).unwrap();
759 assert!(s.read(&mut [0]).is_ok());
763 fn timeout_concurrent_read() {
764 let addr = next_test_unix();
765 let mut a = UnixListener::bind(&addr).listen().unwrap();
766 let (tx, rx) = channel::<()>();
767 thread::spawn(move|| {
768 let mut s = UnixStream::connect(&addr).unwrap();
770 assert!(s.write(&[0]).is_ok());
774 let mut s = a.accept().unwrap();
776 let (tx2, rx2) = channel();
777 let _t = thread::spawn(move|| {
779 assert!(s2.read(&mut [0]).is_ok());
780 tx2.send(()).unwrap();
783 s.set_read_timeout(Some(20));
784 assert_eq!(s.read(&mut [0]).err().unwrap().kind, TimedOut);
785 tx.send(()).unwrap();
792 fn clone_accept_smoke() {
793 let addr = next_test_unix();
794 let l = UnixListener::bind(&addr);
795 let mut a = l.listen().unwrap();
796 let mut a2 = a.clone();
798 let addr2 = addr.clone();
799 let _t = thread::spawn(move|| {
800 let _ = UnixStream::connect(&addr2);
802 let _t = thread::spawn(move|| {
803 let _ = UnixStream::connect(&addr);
806 assert!(a.accept().is_ok());
808 assert!(a2.accept().is_ok());
811 #[cfg(not(windows))] // FIXME #17553
813 fn clone_accept_concurrent() {
814 let addr = next_test_unix();
815 let l = UnixListener::bind(&addr);
816 let a = l.listen().unwrap();
819 let (tx, rx) = channel();
820 let tx2 = tx.clone();
822 let _t = thread::spawn(move|| {
824 tx.send(a.accept()).unwrap()
826 let _t = thread::spawn(move|| {
828 tx2.send(a.accept()).unwrap()
831 let addr2 = addr.clone();
832 let _t = thread::spawn(move|| {
833 let _ = UnixStream::connect(&addr2);
835 let _t = thread::spawn(move|| {
836 let _ = UnixStream::connect(&addr);
839 assert!(rx.recv().unwrap().is_ok());
840 assert!(rx.recv().unwrap().is_ok());
844 fn close_accept_smoke() {
845 let addr = next_test_unix();
846 let l = UnixListener::bind(&addr);
847 let mut a = l.listen().unwrap();
849 a.close_accept().unwrap();
850 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
854 fn close_accept_concurrent() {
855 let addr = next_test_unix();
856 let l = UnixListener::bind(&addr);
857 let a = l.listen().unwrap();
858 let mut a2 = a.clone();
860 let (tx, rx) = channel();
861 let _t = thread::spawn(move|| {
863 tx.send(a.accept()).unwrap();
865 a2.close_accept().unwrap();
867 assert_eq!(rx.recv().unwrap().err().unwrap().kind, EndOfFile);