1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On Windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
25 #![allow(missing_doc)]
31 use io::{Listener, Acceptor, Reader, Writer, IoResult, IoError};
34 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
35 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
37 /// A stream which communicates over a named pipe.
38 pub struct UnixStream {
/// The internal, opaque runtime pipe object; the methods and trait impls of
/// this type forward their work to it.
39 obj: Box<RtioPipe:Send>,
43 /// Connect to a pipe named by `path`. This will attempt to open a
44 /// connection to the underlying socket.
46 /// The returned stream will be closed when the object falls out of scope.
51 /// # #![allow(unused_must_use)]
52 /// use std::io::net::unix::UnixStream;
54 /// let server = Path::new("path/to/my/socket");
55 /// let mut stream = UnixStream::connect(&server);
56 /// stream.write([1, 2, 3]);
58 pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
// The connect call runs inside the closure handed to `LocalIo::maybe_raise`.
// NOTE(review): presumably `maybe_raise` supplies the task-local I/O factory
// as `io` and reports an error when none is available -- confirm in rt::rtio.
59 LocalIo::maybe_raise(|io| {
// The path is converted to a C string; `None` in the second position means
// no connect timeout is supplied (`connect_timeout` passes `Some(..)` there).
// On success the runtime pipe is wrapped in the public `UnixStream` type.
60 io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
// Convert the rtio-level error into the public `IoError` type.
61 }).map_err(IoError::from_rtio_error)
64 /// Connect to a pipe named by `path`, timing out if the specified number of
67 /// This function is similar to `connect`, except that if `timeout_ms`
68 /// elapses the function will return an error of kind `TimedOut`.
69 #[experimental = "the timeout argument is likely to change types"]
70 pub fn connect_timeout<P: ToCStr>(path: &P,
71 timeout_ms: u64) -> IoResult<UnixStream> {
// The connect call runs inside the closure handed to `LocalIo::maybe_raise`.
72 LocalIo::maybe_raise(|io| {
// Same runtime call as `connect`, but with the timeout supplied as `Some(..)`.
73 let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
// On success, wrap the runtime pipe in the public `UnixStream` type.
74 s.map(|p| UnixStream { obj: p })
// Convert the rtio-level error into the public `IoError` type.
75 }).map_err(IoError::from_rtio_error)
79 /// Closes the reading half of this connection.
81 /// This method will close the reading portion of this connection, causing
82 /// all pending and future reads to immediately return with an error.
84 /// Note that this method affects all cloned handles associated with this
85 /// stream, not just this one handle.
86 pub fn close_read(&mut self) -> IoResult<()> {
// Delegate to the runtime pipe, converting its error type into `IoError`.
87 self.obj.close_read().map_err(IoError::from_rtio_error)
90 /// Closes the writing half of this connection.
92 /// This method will close the writing portion of this connection, causing
93 /// all pending and future writes to immediately return with an error.
95 /// Note that this method affects all cloned handles associated with this
96 /// stream, not just this one handle.
97 pub fn close_write(&mut self) -> IoResult<()> {
// Delegate to the runtime pipe, converting its error type into `IoError`.
98 self.obj.close_write().map_err(IoError::from_rtio_error)
101 /// Sets the read/write timeout for this socket.
103 /// For more information, see `TcpStream::set_timeout`.
104 #[experimental = "the timeout argument may change in type and value"]
105 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
// `timeout_ms` is handed to the runtime pipe unchanged; nothing is returned.
106 self.obj.set_timeout(timeout_ms)
109 /// Sets the read timeout for this socket.
111 /// For more information, see `TcpStream::set_timeout`.
112 #[experimental = "the timeout argument may change in type and value"]
113 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
// `timeout_ms` is handed to the runtime pipe unchanged; nothing is returned.
114 self.obj.set_read_timeout(timeout_ms)
117 /// Sets the write timeout for this socket.
119 /// For more information, see `TcpStream::set_timeout`.
120 #[experimental = "the timeout argument may change in type and value"]
121 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
// `timeout_ms` is handed to the runtime pipe unchanged; nothing is returned.
122 self.obj.set_write_timeout(timeout_ms)
// Cloning produces another `UnixStream` handle built from a clone of the boxed
// runtime pipe object. The `close_read`/`close_write` docs state that closing
// affects all cloned handles of a stream.
126 impl Clone for UnixStream {
127 fn clone(&self) -> UnixStream {
128 UnixStream { obj: self.obj.clone() }
// `Reader` implementation: reads are forwarded to the runtime pipe object.
132 impl Reader for UnixStream {
133 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
// Pass the caller's buffer straight through and convert the rtio-level
// error into the public `IoError` type.
134 self.obj.read(buf).map_err(IoError::from_rtio_error)
// `Writer` implementation: writes are forwarded to the runtime pipe object.
138 impl Writer for UnixStream {
139 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
// Pass the caller's bytes straight through and convert the rtio-level
// error into the public `IoError` type.
140 self.obj.write(buf).map_err(IoError::from_rtio_error)
144 /// A value that can listen for incoming named pipe connection requests.
/// Created with `UnixListener::bind` and converted into a `UnixAcceptor` by
/// calling `listen()`.
145 pub struct UnixListener {
146 /// The internal, opaque runtime Unix listener.
147 obj: Box<RtioUnixListener:Send>,
152 /// Creates a new listener, ready to receive incoming connections on the
153 /// specified socket. The server will be named by `path`.
155 /// This listener will be closed when it falls out of scope.
162 /// # #![allow(unused_must_use)]
163 /// use std::io::net::unix::UnixListener;
164 /// use std::io::{Listener, Acceptor};
166 /// let server = Path::new("/path/to/my/socket");
167 /// let stream = UnixListener::bind(&server);
168 /// for mut client in stream.listen().incoming() {
169 /// client.write([1, 2, 3, 4]);
173 pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
// The bind call runs inside the closure handed to `LocalIo::maybe_raise`.
// NOTE(review): presumably `maybe_raise` supplies the task-local I/O factory
// as `io` and reports an error when none is available -- confirm in rt::rtio.
174 LocalIo::maybe_raise(|io| {
// The path is converted to a C string; on success the runtime listener is
// wrapped in the public `UnixListener` type.
175 io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
// Convert the rtio-level error into the public `IoError` type.
176 }).map_err(IoError::from_rtio_error)
// `Listener` implementation: turns a bound listener into a `UnixAcceptor`.
180 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
// Takes `self` by value, so the listener is consumed by this call.
181 fn listen(self) -> IoResult<UnixAcceptor> {
// On success, wrap the runtime acceptor object in the public type.
182 self.obj.listen().map(|obj| {
183 UnixAcceptor { obj: obj }
// Convert the rtio-level error into the public `IoError` type.
184 }).map_err(IoError::from_rtio_error)
188 /// A value that can accept named pipe connections, returned from `listen()`.
/// Each successful `accept()` yields a `UnixStream`.
189 pub struct UnixAcceptor {
190 /// The internal, opaque runtime Unix acceptor.
191 obj: Box<RtioUnixAcceptor:Send>,
195 /// Sets a timeout for this acceptor, after which accept() will no longer
196 /// block indefinitely.
198 /// The argument specified is the amount of time, in milliseconds, into the
199 /// future after which all invocations of accept() will not block (and any
200 /// pending invocation will return). A value of `None` will clear any
201 /// existing timeout.
203 /// When using this method, it is likely necessary to reset the timeout as
204 /// appropriate; the timeout specified is specific to this object, not
205 /// specific to the next request.
206 #[experimental = "the name and arguments to this function are likely \
208 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
// `timeout_ms` is handed to the runtime acceptor unchanged; nothing is returned.
209 self.obj.set_timeout(timeout_ms)
// `Acceptor` implementation: each accepted connection becomes a `UnixStream`.
213 impl Acceptor<UnixStream> for UnixAcceptor {
214 fn accept(&mut self) -> IoResult<UnixStream> {
// On success, wrap the accepted runtime pipe in the public stream type.
215 self.obj.accept().map(|s| {
216 UnixStream { obj: s }
// Convert the rtio-level error into the public `IoError` type.
217 }).map_err(IoError::from_rtio_error)
222 #[allow(experimental)]
229 pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
230 let path1 = next_test_unix();
231 let path2 = path1.clone();
233 let mut acceptor = UnixListener::bind(&path1).listen();
236 match UnixStream::connect(&path2) {
238 Err(e) => fail!("failed connect: {}", e),
242 match acceptor.accept() {
244 Err(e) => fail!("failed accept: {}", e),
248 iotest!(fn bind_error() {
249 let path = "path/to/nowhere";
250 match UnixListener::bind(&path) {
253 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
254 e.kind == InvalidInput);
259 iotest!(fn connect_error() {
260 let path = if cfg!(windows) {
261 r"\\.\pipe\this_should_not_exist_ever"
265 match UnixStream::connect(&path) {
268 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
274 smalltest(proc(mut server) {
276 server.read(buf).unwrap();
277 assert!(buf[0] == 99);
278 }, proc(mut client) {
279 client.write([99]).unwrap();
283 iotest!(fn read_eof() {
284 smalltest(proc(mut server) {
286 assert!(server.read(buf).is_err());
287 assert!(server.read(buf).is_err());
291 } #[ignore(cfg(windows))]) // FIXME(#12516)
293 iotest!(fn write_begone() {
294 smalltest(proc(mut server) {
297 match server.write(buf) {
300 assert!(e.kind == BrokenPipe ||
301 e.kind == NotConnected ||
302 e.kind == ConnectionReset,
303 "unknown error {:?}", e);
313 iotest!(fn accept_lots() {
315 let path1 = next_test_unix();
316 let path2 = path1.clone();
318 let mut acceptor = match UnixListener::bind(&path1).listen() {
320 Err(e) => fail!("failed listen: {}", e),
324 for _ in range(0, times) {
325 let mut stream = UnixStream::connect(&path2);
326 match stream.write([100]) {
328 Err(e) => fail!("failed write: {}", e)
333 for _ in range(0, times) {
334 let mut client = acceptor.accept();
336 match client.read(buf) {
338 Err(e) => fail!("failed read/accept: {}", e),
340 assert_eq!(buf[0], 100);
// Checks that after binding and listening on a test path, that path exists.
345 iotest!(fn path_exists() {
// NOTE(review): `next_test_unix` is defined elsewhere; presumably it yields a
// fresh socket path for the test -- confirm.
346 let path = next_test_unix();
// The result is bound (not unwrapped) so it stays alive during the check below.
347 let _acceptor = UnixListener::bind(&path).listen();
348 assert!(path.exists());
351 iotest!(fn unix_clone_smoke() {
352 let addr = next_test_unix();
353 let mut acceptor = UnixListener::bind(&addr).listen();
356 let mut s = UnixStream::connect(&addr);
357 let mut buf = [0, 0];
358 debug!("client reading");
359 assert_eq!(s.read(buf), Ok(1));
360 assert_eq!(buf[0], 1);
361 debug!("client writing");
362 s.write([2]).unwrap();
363 debug!("client dropping");
366 let mut s1 = acceptor.accept().unwrap();
369 let (tx1, rx1) = channel();
370 let (tx2, rx2) = channel();
374 debug!("writer writing");
375 s2.write([1]).unwrap();
376 debug!("writer done");
380 let mut buf = [0, 0];
381 debug!("reader reading");
382 assert_eq!(s1.read(buf), Ok(1));
383 debug!("reader done");
387 iotest!(fn unix_clone_two_read() {
388 let addr = next_test_unix();
389 let mut acceptor = UnixListener::bind(&addr).listen();
390 let (tx1, rx) = channel();
391 let tx2 = tx1.clone();
394 let mut s = UnixStream::connect(&addr);
395 s.write([1]).unwrap();
397 s.write([2]).unwrap();
401 let mut s1 = acceptor.accept().unwrap();
404 let (done, rx) = channel();
407 let mut buf = [0, 0];
408 s2.read(buf).unwrap();
412 let mut buf = [0, 0];
413 s1.read(buf).unwrap();
419 iotest!(fn unix_clone_two_write() {
420 let addr = next_test_unix();
421 let mut acceptor = UnixListener::bind(&addr).listen();
424 let mut s = UnixStream::connect(&addr);
425 let mut buf = [0, 1];
426 s.read(buf).unwrap();
427 s.read(buf).unwrap();
430 let mut s1 = acceptor.accept().unwrap();
433 let (tx, rx) = channel();
436 s2.write([1]).unwrap();
439 s1.write([2]).unwrap();
444 iotest!(fn drop_removes_listener_path() {
445 let path = next_test_unix();
446 let l = UnixListener::bind(&path).unwrap();
447 assert!(path.exists());
449 assert!(!path.exists());
450 } #[cfg(not(windows))])
// Checks that the bound path exists after `bind` and no longer exists once the
// acceptor obtained from `listen()` has been dropped. Not compiled on Windows
// (see the trailing `cfg` attribute).
452 iotest!(fn drop_removes_acceptor_path() {
453 let path = next_test_unix();
454 let l = UnixListener::bind(&path).unwrap();
455 assert!(path.exists());
// `listen()` consumes the listener; the resulting acceptor is dropped at once.
456 drop(l.listen().unwrap());
457 assert!(!path.exists());
458 } #[cfg(not(windows))])
460 iotest!(fn accept_timeout() {
461 let addr = next_test_unix();
462 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
464 a.set_timeout(Some(10));
466 // Make sure we time out once and future invocations also time out
467 let err = a.accept().err().unwrap();
468 assert_eq!(err.kind, TimedOut);
469 let err = a.accept().err().unwrap();
470 assert_eq!(err.kind, TimedOut);
472 // Also make sure that even though the timeout is expired that we will
473 // continue to receive any pending connections.
474 let (tx, rx) = channel();
475 let addr2 = addr.clone();
477 tx.send(UnixStream::connect(&addr2).unwrap());
480 for i in range(0, 1001) {
483 Err(ref e) if e.kind == TimedOut => {}
484 Err(e) => fail!("error: {}", e),
486 ::task::deschedule();
487 if i == 1000 { fail!("should have a pending connection") }
491 // Unset the timeout and make sure that this always blocks.
493 let addr2 = addr.clone();
495 drop(UnixStream::connect(&addr2).unwrap());
// Checks that `connect_timeout` returns an error for a path on which this test
// never binds a listener.
500 iotest!(fn connect_timeout_error() {
501 let addr = next_test_unix();
502 assert!(UnixStream::connect_timeout(&addr, 100).is_err());
// Checks that `connect_timeout` succeeds when an acceptor is already listening
// on the path.
505 iotest!(fn connect_timeout_success() {
506 let addr = next_test_unix();
// Bound to `_a` so the acceptor stays alive while the connect is attempted.
507 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
508 assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
511 iotest!(fn close_readwrite_smoke() {
512 let addr = next_test_unix();
513 let a = UnixListener::bind(&addr).listen().unwrap();
514 let (_tx, rx) = channel::<()>();
517 let _s = a.accept().unwrap();
518 let _ = rx.recv_opt();
522 let mut s = UnixStream::connect(&addr).unwrap();
523 let mut s2 = s.clone();
525 // closing should prevent reads/writes
526 s.close_write().unwrap();
527 assert!(s.write([0]).is_err());
528 s.close_read().unwrap();
529 assert!(s.read(b).is_err());
531 // closing should affect previous handles
532 assert!(s2.write([0]).is_err());
533 assert!(s2.read(b).is_err());
535 // closing should affect new handles
536 let mut s3 = s.clone();
537 assert!(s3.write([0]).is_err());
538 assert!(s3.read(b).is_err());
540 // make sure these don't die
541 let _ = s2.close_read();
542 let _ = s2.close_write();
543 let _ = s3.close_read();
544 let _ = s3.close_write();
547 iotest!(fn close_read_wakes_up() {
548 let addr = next_test_unix();
549 let a = UnixListener::bind(&addr).listen().unwrap();
550 let (_tx, rx) = channel::<()>();
553 let _s = a.accept().unwrap();
554 let _ = rx.recv_opt();
557 let mut s = UnixStream::connect(&addr).unwrap();
559 let (tx, rx) = channel();
562 assert!(s2.read([0]).is_err());
565 // this should wake up the child task
566 s.close_read().unwrap();
568 // this test will never finish if the child doesn't wake up
572 iotest!(fn readwrite_timeouts() {
573 let addr = next_test_unix();
574 let mut a = UnixListener::bind(&addr).listen().unwrap();
575 let (tx, rx) = channel::<()>();
577 let mut s = UnixStream::connect(&addr).unwrap();
579 assert!(s.write([0]).is_ok());
580 let _ = rx.recv_opt();
583 let mut s = a.accept().unwrap();
584 s.set_timeout(Some(20));
585 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
586 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
588 s.set_timeout(Some(20));
589 for i in range(0, 1001) {
590 match s.write([0, .. 128 * 1024]) {
591 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
592 Err(IoError { kind: TimedOut, .. }) => break,
593 Err(e) => fail!("{}", e),
595 if i == 1000 { fail!("should have filled up?!"); }
598 // I'm not sure as to why, but apparently the write on windows always
599 // succeeds after the previous timeout. Who knows?
601 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
606 assert_eq!(s.read([0, 0]), Ok(1));
609 iotest!(fn read_timeouts() {
610 let addr = next_test_unix();
611 let mut a = UnixListener::bind(&addr).listen().unwrap();
612 let (tx, rx) = channel::<()>();
614 let mut s = UnixStream::connect(&addr).unwrap();
617 while amt < 100 * 128 * 1024 {
618 match s.read([0, ..128 * 1024]) {
619 Ok(n) => { amt += n; }
620 Err(e) => fail!("{}", e),
623 let _ = rx.recv_opt();
626 let mut s = a.accept().unwrap();
627 s.set_read_timeout(Some(20));
628 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
629 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
632 for _ in range(0, 100) {
633 assert!(s.write([0, ..128 * 1024]).is_ok());
637 iotest!(fn write_timeouts() {
638 let addr = next_test_unix();
639 let mut a = UnixListener::bind(&addr).listen().unwrap();
640 let (tx, rx) = channel::<()>();
642 let mut s = UnixStream::connect(&addr).unwrap();
644 assert!(s.write([0]).is_ok());
645 let _ = rx.recv_opt();
648 let mut s = a.accept().unwrap();
649 s.set_write_timeout(Some(20));
650 for i in range(0, 1001) {
651 match s.write([0, .. 128 * 1024]) {
652 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
653 Err(IoError { kind: TimedOut, .. }) => break,
654 Err(e) => fail!("{}", e),
656 if i == 1000 { fail!("should have filled up?!"); }
660 assert!(s.read([0]).is_ok());
663 iotest!(fn timeout_concurrent_read() {
664 let addr = next_test_unix();
665 let mut a = UnixListener::bind(&addr).listen().unwrap();
666 let (tx, rx) = channel::<()>();
668 let mut s = UnixStream::connect(&addr).unwrap();
670 assert!(s.write([0]).is_ok());
671 let _ = rx.recv_opt();
674 let mut s = a.accept().unwrap();
676 let (tx2, rx2) = channel();
679 assert!(s2.read([0]).is_ok());
683 s.set_read_timeout(Some(20));
684 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);