1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
25 #![allow(missing_doc)]
29 use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error};
30 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
31 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
34 /// A stream which communicates over a named pipe.
35 pub struct UnixStream {
36 obj: Box<RtioPipe + Send>,
40 /// Connect to a pipe named by `path`. This will attempt to open a
41 /// connection to the underlying socket.
43 /// The returned stream will be closed when the object falls out of scope.
48 /// # #![allow(unused_must_use)]
49 /// use std::io::net::unix::UnixStream;
51 /// let server = Path::new("path/to/my/socket");
52 /// let mut stream = UnixStream::connect(&server);
53 /// stream.write([1, 2, 3]);
55 pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
56 LocalIo::maybe_raise(|io| {
57 io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
58 }).map_err(IoError::from_rtio_error)
61 /// Connect to a pipe named by `path`, timing out if the specified number of
64 /// This function is similar to `connect`, except that if `timeout`
65 /// elapses the function will return an error of kind `TimedOut`.
67 /// If a `timeout` with zero or negative duration is specified then
68 /// the function returns `Err`, with the error kind set to `TimedOut`.
69 #[experimental = "the timeout argument is likely to change types"]
70 pub fn connect_timeout<P: ToCStr>(path: &P,
71 timeout: Duration) -> IoResult<UnixStream> {
72 if timeout <= Duration::milliseconds(0) {
73 return Err(standard_error(TimedOut));
76 LocalIo::maybe_raise(|io| {
77 let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64));
78 s.map(|p| UnixStream { obj: p })
79 }).map_err(IoError::from_rtio_error)
83 /// Closes the reading half of this connection.
85 /// This method will close the reading portion of this connection, causing
86 /// all pending and future reads to immediately return with an error.
88 /// Note that this method affects all cloned handles associated with this
89 /// stream, not just this one handle.
90 pub fn close_read(&mut self) -> IoResult<()> {
91 self.obj.close_read().map_err(IoError::from_rtio_error)
94 /// Closes the writing half of this connection.
96 /// This method will close the writing portion of this connection, causing
97 /// all pending and future writes to immediately return with an error.
99 /// Note that this method affects all cloned handles associated with this
100 /// stream, not just this one handle.
101 pub fn close_write(&mut self) -> IoResult<()> {
102 self.obj.close_write().map_err(IoError::from_rtio_error)
105 /// Sets the read/write timeout for this socket.
107 /// For more information, see `TcpStream::set_timeout`
108 #[experimental = "the timeout argument may change in type and value"]
109 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
110 self.obj.set_timeout(timeout_ms)
113 /// Sets the read timeout for this socket.
115 /// For more information, see `TcpStream::set_timeout`
116 #[experimental = "the timeout argument may change in type and value"]
117 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
118 self.obj.set_read_timeout(timeout_ms)
121 /// Sets the write timeout for this socket.
123 /// For more information, see `TcpStream::set_timeout`
124 #[experimental = "the timeout argument may change in type and value"]
125 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
126 self.obj.set_write_timeout(timeout_ms)
130 impl Clone for UnixStream {
131 fn clone(&self) -> UnixStream {
132 UnixStream { obj: self.obj.clone() }
136 impl Reader for UnixStream {
137 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
138 self.obj.read(buf).map_err(IoError::from_rtio_error)
142 impl Writer for UnixStream {
143 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
144 self.obj.write(buf).map_err(IoError::from_rtio_error)
148 /// A value that can listen for incoming named pipe connection requests.
149 pub struct UnixListener {
150 /// The internal, opaque runtime Unix listener.
151 obj: Box<RtioUnixListener + Send>,
156 /// Creates a new listener, ready to receive incoming connections on the
157 /// specified socket. The server will be named by `path`.
159 /// This listener will be closed when it falls out of scope.
166 /// # #![allow(unused_must_use)]
167 /// use std::io::net::unix::UnixListener;
168 /// use std::io::{Listener, Acceptor};
170 /// let server = Path::new("/path/to/my/socket");
171 /// let stream = UnixListener::bind(&server);
172 /// for mut client in stream.listen().incoming() {
173 /// client.write([1, 2, 3, 4]);
177 pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
178 LocalIo::maybe_raise(|io| {
179 io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
180 }).map_err(IoError::from_rtio_error)
184 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
185 fn listen(self) -> IoResult<UnixAcceptor> {
186 self.obj.listen().map(|obj| {
187 UnixAcceptor { obj: obj }
188 }).map_err(IoError::from_rtio_error)
192 /// A value that can accept named pipe connections, returned from `listen()`.
193 pub struct UnixAcceptor {
194 /// The internal, opaque runtime Unix acceptor.
195 obj: Box<RtioUnixAcceptor + Send>,
199 /// Sets a timeout for this acceptor, after which accept() will no longer
200 /// block indefinitely.
202 /// The argument specified is the amount of time, in milliseconds, into the
203 /// future after which all invocations of accept() will not block (and any
204 /// pending invocation will return). A value of `None` will clear any
205 /// existing timeout.
207 /// When using this method, it is likely necessary to reset the timeout as
208 /// appropriate, the timeout specified is specific to this object, not
209 /// specific to the next request.
210 #[experimental = "the name and arguments to this function are likely \
212 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
213 self.obj.set_timeout(timeout_ms)
216 /// Closes the accepting capabilities of this acceptor.
218 /// This function has the same semantics as `TcpAcceptor::close_accept`, and
219 /// more information can be found in that documentation.
221 pub fn close_accept(&mut self) -> IoResult<()> {
222 self.obj.close_accept().map_err(IoError::from_rtio_error)
226 impl Acceptor<UnixStream> for UnixAcceptor {
227 fn accept(&mut self) -> IoResult<UnixStream> {
228 self.obj.accept().map(|s| {
229 UnixStream { obj: s }
230 }).map_err(IoError::from_rtio_error)
234 impl Clone for UnixAcceptor {
235 /// Creates a new handle to this unix acceptor, allowing for simultaneous
238 /// The underlying unix acceptor will not be closed until all handles to the
239 /// acceptor have been deallocated. Incoming connections will be received on
240 /// at most once acceptor, the same connection will not be accepted twice.
242 /// The `close_accept` method will shut down *all* acceptors cloned from the
243 /// same original acceptor, whereas the `set_timeout` method only affects
244 /// the selector that it is called on.
246 /// This function is useful for creating a handle to invoke `close_accept`
247 /// on to wake up any other task blocked in `accept`.
248 fn clone(&self) -> UnixAcceptor {
249 UnixAcceptor { obj: self.obj.clone() }
254 #[allow(experimental)]
261 pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
262 let path1 = next_test_unix();
263 let path2 = path1.clone();
265 let mut acceptor = UnixListener::bind(&path1).listen();
268 match UnixStream::connect(&path2) {
270 Err(e) => fail!("failed connect: {}", e),
274 match acceptor.accept() {
276 Err(e) => fail!("failed accept: {}", e),
280 iotest!(fn bind_error() {
281 let path = "path/to/nowhere";
282 match UnixListener::bind(&path) {
285 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
286 e.kind == InvalidInput);
291 iotest!(fn connect_error() {
292 let path = if cfg!(windows) {
293 r"\\.\pipe\this_should_not_exist_ever"
297 match UnixStream::connect(&path) {
300 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
306 smalltest(proc(mut server) {
308 server.read(buf).unwrap();
309 assert!(buf[0] == 99);
310 }, proc(mut client) {
311 client.write([99]).unwrap();
315 iotest!(fn read_eof() {
316 smalltest(proc(mut server) {
318 assert!(server.read(buf).is_err());
319 assert!(server.read(buf).is_err());
323 } #[ignore(cfg(windows))]) // FIXME(#12516)
325 iotest!(fn write_begone() {
326 smalltest(proc(mut server) {
329 match server.write(buf) {
332 assert!(e.kind == BrokenPipe ||
333 e.kind == NotConnected ||
334 e.kind == ConnectionReset,
335 "unknown error {:?}", e);
345 iotest!(fn accept_lots() {
347 let path1 = next_test_unix();
348 let path2 = path1.clone();
350 let mut acceptor = match UnixListener::bind(&path1).listen() {
352 Err(e) => fail!("failed listen: {}", e),
356 for _ in range(0u, times) {
357 let mut stream = UnixStream::connect(&path2);
358 match stream.write([100]) {
360 Err(e) => fail!("failed write: {}", e)
365 for _ in range(0, times) {
366 let mut client = acceptor.accept();
368 match client.read(buf) {
370 Err(e) => fail!("failed read/accept: {}", e),
372 assert_eq!(buf[0], 100);
377 iotest!(fn path_exists() {
378 let path = next_test_unix();
379 let _acceptor = UnixListener::bind(&path).listen();
380 assert!(path.exists());
383 iotest!(fn unix_clone_smoke() {
384 let addr = next_test_unix();
385 let mut acceptor = UnixListener::bind(&addr).listen();
388 let mut s = UnixStream::connect(&addr);
389 let mut buf = [0, 0];
390 debug!("client reading");
391 assert_eq!(s.read(buf), Ok(1));
392 assert_eq!(buf[0], 1);
393 debug!("client writing");
394 s.write([2]).unwrap();
395 debug!("client dropping");
398 let mut s1 = acceptor.accept().unwrap();
401 let (tx1, rx1) = channel();
402 let (tx2, rx2) = channel();
406 debug!("writer writing");
407 s2.write([1]).unwrap();
408 debug!("writer done");
412 let mut buf = [0, 0];
413 debug!("reader reading");
414 assert_eq!(s1.read(buf), Ok(1));
415 debug!("reader done");
419 iotest!(fn unix_clone_two_read() {
420 let addr = next_test_unix();
421 let mut acceptor = UnixListener::bind(&addr).listen();
422 let (tx1, rx) = channel();
423 let tx2 = tx1.clone();
426 let mut s = UnixStream::connect(&addr);
427 s.write([1]).unwrap();
429 s.write([2]).unwrap();
433 let mut s1 = acceptor.accept().unwrap();
436 let (done, rx) = channel();
439 let mut buf = [0, 0];
440 s2.read(buf).unwrap();
444 let mut buf = [0, 0];
445 s1.read(buf).unwrap();
451 iotest!(fn unix_clone_two_write() {
452 let addr = next_test_unix();
453 let mut acceptor = UnixListener::bind(&addr).listen();
456 let mut s = UnixStream::connect(&addr);
457 let mut buf = [0, 1];
458 s.read(buf).unwrap();
459 s.read(buf).unwrap();
462 let mut s1 = acceptor.accept().unwrap();
465 let (tx, rx) = channel();
468 s2.write([1]).unwrap();
471 s1.write([2]).unwrap();
476 iotest!(fn drop_removes_listener_path() {
477 let path = next_test_unix();
478 let l = UnixListener::bind(&path).unwrap();
479 assert!(path.exists());
481 assert!(!path.exists());
482 } #[cfg(not(windows))])
484 iotest!(fn drop_removes_acceptor_path() {
485 let path = next_test_unix();
486 let l = UnixListener::bind(&path).unwrap();
487 assert!(path.exists());
488 drop(l.listen().unwrap());
489 assert!(!path.exists());
490 } #[cfg(not(windows))])
492 iotest!(fn accept_timeout() {
493 let addr = next_test_unix();
494 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
496 a.set_timeout(Some(10));
498 // Make sure we time out once and future invocations also time out
499 let err = a.accept().err().unwrap();
500 assert_eq!(err.kind, TimedOut);
501 let err = a.accept().err().unwrap();
502 assert_eq!(err.kind, TimedOut);
504 // Also make sure that even though the timeout is expired that we will
505 // continue to receive any pending connections.
506 let (tx, rx) = channel();
507 let addr2 = addr.clone();
509 tx.send(UnixStream::connect(&addr2).unwrap());
512 for i in range(0u, 1001) {
515 Err(ref e) if e.kind == TimedOut => {}
516 Err(e) => fail!("error: {}", e),
518 ::task::deschedule();
519 if i == 1000 { fail!("should have a pending connection") }
523 // Unset the timeout and make sure that this always blocks.
525 let addr2 = addr.clone();
527 drop(UnixStream::connect(&addr2).unwrap());
532 iotest!(fn connect_timeout_error() {
533 let addr = next_test_unix();
534 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_err());
537 iotest!(fn connect_timeout_success() {
538 let addr = next_test_unix();
539 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
540 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(100)).is_ok());
543 iotest!(fn connect_timeout_zero() {
544 let addr = next_test_unix();
545 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
546 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(0)).is_err());
549 iotest!(fn connect_timeout_negative() {
550 let addr = next_test_unix();
551 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
552 assert!(UnixStream::connect_timeout(&addr, Duration::milliseconds(-1)).is_err());
555 iotest!(fn close_readwrite_smoke() {
556 let addr = next_test_unix();
557 let a = UnixListener::bind(&addr).listen().unwrap();
558 let (_tx, rx) = channel::<()>();
561 let _s = a.accept().unwrap();
562 let _ = rx.recv_opt();
566 let mut s = UnixStream::connect(&addr).unwrap();
567 let mut s2 = s.clone();
569 // closing should prevent reads/writes
570 s.close_write().unwrap();
571 assert!(s.write([0]).is_err());
572 s.close_read().unwrap();
573 assert!(s.read(b).is_err());
575 // closing should affect previous handles
576 assert!(s2.write([0]).is_err());
577 assert!(s2.read(b).is_err());
579 // closing should affect new handles
580 let mut s3 = s.clone();
581 assert!(s3.write([0]).is_err());
582 assert!(s3.read(b).is_err());
584 // make sure these don't die
585 let _ = s2.close_read();
586 let _ = s2.close_write();
587 let _ = s3.close_read();
588 let _ = s3.close_write();
591 iotest!(fn close_read_wakes_up() {
592 let addr = next_test_unix();
593 let a = UnixListener::bind(&addr).listen().unwrap();
594 let (_tx, rx) = channel::<()>();
597 let _s = a.accept().unwrap();
598 let _ = rx.recv_opt();
601 let mut s = UnixStream::connect(&addr).unwrap();
603 let (tx, rx) = channel();
606 assert!(s2.read([0]).is_err());
609 // this should wake up the child task
610 s.close_read().unwrap();
612 // this test will never finish if the child doesn't wake up
616 iotest!(fn readwrite_timeouts() {
617 let addr = next_test_unix();
618 let mut a = UnixListener::bind(&addr).listen().unwrap();
619 let (tx, rx) = channel::<()>();
621 let mut s = UnixStream::connect(&addr).unwrap();
623 assert!(s.write([0]).is_ok());
624 let _ = rx.recv_opt();
627 let mut s = a.accept().unwrap();
628 s.set_timeout(Some(20));
629 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
630 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
632 s.set_timeout(Some(20));
633 for i in range(0u, 1001) {
634 match s.write([0, .. 128 * 1024]) {
635 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
636 Err(IoError { kind: TimedOut, .. }) => break,
637 Err(e) => fail!("{}", e),
639 if i == 1000 { fail!("should have filled up?!"); }
642 // I'm not sure as to why, but apparently the write on windows always
643 // succeeds after the previous timeout. Who knows?
645 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
650 assert_eq!(s.read([0, 0]), Ok(1));
653 iotest!(fn read_timeouts() {
654 let addr = next_test_unix();
655 let mut a = UnixListener::bind(&addr).listen().unwrap();
656 let (tx, rx) = channel::<()>();
658 let mut s = UnixStream::connect(&addr).unwrap();
661 while amt < 100 * 128 * 1024 {
662 match s.read([0, ..128 * 1024]) {
663 Ok(n) => { amt += n; }
664 Err(e) => fail!("{}", e),
667 let _ = rx.recv_opt();
670 let mut s = a.accept().unwrap();
671 s.set_read_timeout(Some(20));
672 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
673 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
676 for _ in range(0u, 100) {
677 assert!(s.write([0, ..128 * 1024]).is_ok());
681 iotest!(fn write_timeouts() {
682 let addr = next_test_unix();
683 let mut a = UnixListener::bind(&addr).listen().unwrap();
684 let (tx, rx) = channel::<()>();
686 let mut s = UnixStream::connect(&addr).unwrap();
688 assert!(s.write([0]).is_ok());
689 let _ = rx.recv_opt();
692 let mut s = a.accept().unwrap();
693 s.set_write_timeout(Some(20));
694 for i in range(0u, 1001) {
695 match s.write([0, .. 128 * 1024]) {
696 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
697 Err(IoError { kind: TimedOut, .. }) => break,
698 Err(e) => fail!("{}", e),
700 if i == 1000 { fail!("should have filled up?!"); }
704 assert!(s.read([0]).is_ok());
707 iotest!(fn timeout_concurrent_read() {
708 let addr = next_test_unix();
709 let mut a = UnixListener::bind(&addr).listen().unwrap();
710 let (tx, rx) = channel::<()>();
712 let mut s = UnixStream::connect(&addr).unwrap();
714 assert!(s.write([0]).is_ok());
715 let _ = rx.recv_opt();
718 let mut s = a.accept().unwrap();
720 let (tx2, rx2) = channel();
723 assert!(s2.read([0]).is_ok());
727 s.set_read_timeout(Some(20));
728 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
735 iotest!(fn clone_accept_smoke() {
736 let addr = next_test_unix();
737 let l = UnixListener::bind(&addr);
738 let mut a = l.listen().unwrap();
739 let mut a2 = a.clone();
741 let addr2 = addr.clone();
743 let _ = UnixStream::connect(&addr2);
746 let _ = UnixStream::connect(&addr);
749 assert!(a.accept().is_ok());
751 assert!(a2.accept().is_ok());
754 iotest!(fn clone_accept_concurrent() {
755 let addr = next_test_unix();
756 let l = UnixListener::bind(&addr);
757 let a = l.listen().unwrap();
760 let (tx, rx) = channel();
761 let tx2 = tx.clone();
763 spawn(proc() { let mut a = a; tx.send(a.accept()) });
764 spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
766 let addr2 = addr.clone();
768 let _ = UnixStream::connect(&addr2);
771 let _ = UnixStream::connect(&addr);
774 assert!(rx.recv().is_ok());
775 assert!(rx.recv().is_ok());
778 iotest!(fn close_accept_smoke() {
779 let addr = next_test_unix();
780 let l = UnixListener::bind(&addr);
781 let mut a = l.listen().unwrap();
783 a.close_accept().unwrap();
784 assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
787 iotest!(fn close_accept_concurrent() {
788 let addr = next_test_unix();
789 let l = UnixListener::bind(&addr);
790 let a = l.listen().unwrap();
791 let mut a2 = a.clone();
793 let (tx, rx) = channel();
798 a2.close_accept().unwrap();
800 assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);