1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On Windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
25 #![allow(missing_doc)]
31 use io::pipe::PipeStream;
32 use io::{Listener, Acceptor, Reader, Writer, IoResult};
34 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
35 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
37 /// A stream which communicates over a named pipe.
///
/// Values of this type are produced by `UnixStream::connect` and by
/// `UnixAcceptor::accept`. The type implements `Clone`, `Reader` and `Writer`.
// NOTE(review): the field list is not visible in this excerpt; the other
// blocks in this file read and write a single `obj` field built with
// `PipeStream::new` -- confirm against the full definition.
38 pub struct UnixStream {
// Private constructor: takes ownership of a runtime pipe object, wraps it with
// `PipeStream::new`, and stores the result in the `obj` field.
43 fn new(obj: ~RtioPipe:Send) -> UnixStream {
44 UnixStream { obj: PipeStream::new(obj) }
47 /// Connect to a pipe named by `path`. This will attempt to open a
48 /// connection to the underlying socket.
50 /// The returned stream will be closed when the object falls out of scope.
///
/// `path` may be any value implementing `ToCStr`; it is converted with
/// `to_c_str()` before being handed to the runtime's `unix_connect`.
55 /// # #[allow(unused_must_use)];
56 /// use std::io::net::unix::UnixStream;
58 /// let server = Path::new("path/to/my/socket");
59 /// let mut stream = UnixStream::connect(&server);
60 /// stream.write([1, 2, 3]);
62 pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
// The closure receives the I/O object supplied by `LocalIo::maybe_raise`,
// asks it to connect, and on success wraps the resulting pipe via
// `UnixStream::new`.
63 LocalIo::maybe_raise(|io| {
64 io.unix_connect(&path.to_c_str()).map(UnixStream::new)
// Cloning builds a new `UnixStream` whose `obj` is a clone of this stream's
// inner `obj`.
// NOTE(review): whether the two handles refer to the same underlying
// connection is decided by the inner type's `clone`; the `unix_clone_*` tests
// in this file suggest they do -- confirm.
69 impl Clone for UnixStream {
70 fn clone(&self) -> UnixStream {
71 UnixStream { obj: self.obj.clone() }
// `Reader` implementation: `read` forwards directly to the inner `obj` and
// returns its result unchanged.
75 impl Reader for UnixStream {
76 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
// `Writer` implementation: `write` forwards directly to the inner `obj` and
// returns its result unchanged.
79 impl Writer for UnixStream {
80 fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
83 /// A value that can listen for incoming named pipe connection requests.
///
/// Created with `UnixListener::bind`; calling `listen()` consumes the listener
/// and yields a `UnixAcceptor`.
84 pub struct UnixListener {
85 /// The internal, opaque runtime Unix listener.
86 priv obj: ~RtioUnixListener:Send,
91 /// Creates a new listener, ready to receive incoming connections on the
92 /// specified socket. The server will be named by `path`.
94 /// This listener will be closed when it falls out of scope.
///
/// `path` may be any value implementing `ToCStr`; it is converted with
/// `to_c_str()` before being handed to the runtime's `unix_bind`.
101 /// # #[allow(unused_must_use)];
102 /// use std::io::net::unix::UnixListener;
103 /// use std::io::{Listener, Acceptor};
105 /// let server = Path::new("/path/to/my/socket");
106 /// let stream = UnixListener::bind(&server);
107 /// for mut client in stream.listen().incoming() {
108 /// client.write([1, 2, 3, 4]);
112 pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
// The closure receives the I/O object supplied by `LocalIo::maybe_raise`,
// asks it to bind, and on success stores the runtime listener in `obj`.
113 LocalIo::maybe_raise(|io| {
114 io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
// `Listener` implementation: `listen` takes `self` by value, calls `listen()`
// on the runtime listener, and on success wraps the returned runtime acceptor
// in a `UnixAcceptor`.
119 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
120 fn listen(self) -> IoResult<UnixAcceptor> {
121 self.obj.listen().map(|obj| UnixAcceptor { obj: obj })
125 /// A value that can accept named pipe connections, returned from `listen()`.
///
/// Each successful `accept()` yields a `UnixStream`.
126 pub struct UnixAcceptor {
127 /// The internal, opaque runtime Unix acceptor.
128 priv obj: ~RtioUnixAcceptor:Send,
// `Acceptor` implementation: `accept` calls `accept()` on the runtime acceptor
// and on success wraps the returned pipe via `UnixStream::new`.
131 impl Acceptor<UnixStream> for UnixAcceptor {
132 fn accept(&mut self) -> IoResult<UnixStream> {
133 self.obj.accept().map(UnixStream::new)
// Test helper shared by the small client/server tests. It obtains a fresh test
// path, binds and listens on it, connects a client to the same path, and
// accepts one connection; a failed connect or accept aborts with `fail!`.
// NOTE(review): the lines that invoke the `server` and `client` procs (and any
// task spawn around the connect) are not visible in this excerpt --
// presumably `client` gets the connected stream and `server` the accepted
// one; confirm.
144 pub fn smalltest(server: proc:Send(UnixStream), client: proc:Send(UnixStream)) {
145 let path1 = next_test_unix();
// Second copy of the path, used for the connecting side.
146 let path2 = path1.clone();
148 let mut acceptor = UnixListener::bind(&path1).listen();
151 match UnixStream::connect(&path2) {
153 Err(e) => fail!("failed connect: {}", e),
157 match acceptor.accept() {
159 Err(e) => fail!("failed accept: {}", e),
// Attempts to bind to the relative path "path/to/nowhere" and checks that the
// reported error kind is one of PermissionDenied, FileNotFound or InvalidInput.
// NOTE(review): the `Ok` arm is not visible here; presumably a successful bind
// fails the test -- confirm.
163 iotest!(fn bind_error() {
164 let path = "path/to/nowhere";
165 match UnixListener::bind(&path) {
168 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
169 e.kind == InvalidInput);
// Attempts to connect to a path chosen per platform (a named-pipe path on
// Windows; the non-Windows branch is not visible in this excerpt) and checks
// that the reported error kind is FileNotFound or OtherIoError.
174 iotest!(fn connect_error() {
175 let path = if cfg!(windows) {
176 r"\\.\pipe\this_should_not_exist_ever"
180 match UnixStream::connect(&path) {
183 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
// Basic round trip through `smalltest`: the client proc writes the single
// byte 99 and the server proc reads into `buf` and checks the first byte is 99.
// NOTE(review): the enclosing test header and the `buf` declaration are not
// visible in this excerpt.
189 smalltest(proc(mut server) {
191 server.read(buf).unwrap();
192 assert!(buf[0] == 99);
193 }, proc(mut client) {
194 client.write([99]).unwrap();
// Checks that two consecutive reads on the server side both return an error.
// NOTE(review): the client proc is not visible here; presumably it drops the
// stream without writing -- confirm. The test is ignored on Windows (see the
// FIXME on the closing line).
198 iotest!(fn read_eof() {
199 smalltest(proc(mut server) {
201 assert!(server.read(buf).is_err());
202 assert!(server.read(buf).is_err());
206 } #[ignore(cfg(windows))]) // FIXME(#12516)
// The server side writes `buf` to the stream; when a write fails, the error
// kind must be BrokenPipe, NotConnected or ConnectionReset, otherwise the
// assertion reports "unknown error".
// NOTE(review): the surrounding loop, the `Ok` arm and the client proc are not
// visible in this excerpt.
208 iotest!(fn write_begone() {
209 smalltest(proc(mut server) {
212 match server.write(buf) {
215 assert!(e.kind == BrokenPipe ||
216 e.kind == NotConnected ||
217 e.kind == ConnectionReset,
218 "unknown error {:?}", e);
// Exercises many sequential connections against one acceptor: `times`
// connections each write the byte 100, and the accepting side accepts `times`
// clients, reads from each, and checks the first byte read is 100.
// NOTE(review): the definition of `times`, the `buf` declaration and any task
// spawn separating the two loops are not visible in this excerpt.
228 iotest!(fn accept_lots() {
230 let path1 = next_test_unix();
231 let path2 = path1.clone();
// A failed bind/listen aborts the test immediately.
233 let mut acceptor = match UnixListener::bind(&path1).listen() {
235 Err(e) => fail!("failed listen: {}", e),
239 for _ in range(0, times) {
// `write` is called on the value returned by `connect` without unwrapping it
// first; presumably a failed connect surfaces as the write error -- confirm.
240 let mut stream = UnixStream::connect(&path2);
241 match stream.write([100]) {
243 Err(e) => fail!("failed write: {}", e)
248 for _ in range(0, times) {
// Likewise `read` is called on the un-unwrapped `accept` result, which
// matches the combined "read/accept" failure message below.
249 let mut client = acceptor.accept();
251 match client.read(buf) {
253 Err(e) => fail!("failed read/accept: {}", e),
255 assert_eq!(buf[0], 100);
// After binding and listening on a fresh test path, the path must exist on
// the filesystem. `_acceptor` is bound to a name so it stays alive until the
// assertion runs.
260 iotest!(fn path_exists() {
261 let path = next_test_unix();
262 let _acceptor = UnixListener::bind(&path).listen();
263 assert!(path.exists());
// Smoke test for using two handles to one accepted connection: `s2` writes
// while `s1` reads.
// NOTE(review): the task spawns, the definition of `s2` (presumably
// `s1.clone()`) and the uses of the two channels are not visible in this
// excerpt -- confirm.
266 iotest!(fn unix_clone_smoke() {
267 let addr = next_test_unix();
268 let mut acceptor = UnixListener::bind(&addr).listen();
// Client side: expects to read exactly one byte with value 1, then writes
// the byte 2.
271 let mut s = UnixStream::connect(&addr);
272 let mut buf = [0, 0];
273 debug!("client reading");
274 assert_eq!(s.read(buf), Ok(1));
275 assert_eq!(buf[0], 1);
276 debug!("client writing");
277 s.write([2]).unwrap();
278 debug!("client dropping");
// Server side: accept the single connection.
281 let mut s1 = acceptor.accept().unwrap();
284 let (tx1, rx1) = channel();
285 let (tx2, rx2) = channel();
// Writer half: sends the byte 1 through `s2`.
289 debug!("writer writing");
290 s2.write([1]).unwrap();
291 debug!("writer done");
// Reader half: expects exactly one byte to arrive on `s1`.
295 let mut buf = [0, 0];
296 debug!("reader reading");
297 assert_eq!(s1.read(buf), Ok(1));
298 debug!("reader done");
// Two handles to one accepted connection each perform a read: the client
// writes the byte 1 and later the byte 2, while `s2` and `s1` each read once
// into their own buffer.
// NOTE(review): the task spawns, the definition of `s2` (presumably
// `s1.clone()`) and the channel sends/receives that order the steps are not
// visible in this excerpt -- confirm.
302 iotest!(fn unix_clone_two_read() {
303 let addr = next_test_unix();
304 let mut acceptor = UnixListener::bind(&addr).listen();
305 let (tx1, rx) = channel();
306 let tx2 = tx1.clone();
// Client side: two separate one-byte writes.
309 let mut s = UnixStream::connect(&addr);
310 s.write([1]).unwrap();
312 s.write([2]).unwrap();
316 let mut s1 = acceptor.accept().unwrap();
// This `rx` shadows the receiver created above.
319 let (done, rx) = channel();
322 let mut buf = [0, 0];
323 s2.read(buf).unwrap();
327 let mut buf = [0, 0];
328 s1.read(buf).unwrap();
// Two handles to one accepted connection each perform a write: `s2` writes the
// byte 1 and `s1` writes the byte 2, while the client performs two reads.
// NOTE(review): the task spawns, the definition of `s2` (presumably
// `s1.clone()`), the channel usage and the end of the test are not visible in
// this excerpt -- confirm.
334 iotest!(fn unix_clone_two_write() {
335 let addr = next_test_unix();
336 let mut acceptor = UnixListener::bind(&addr).listen();
// Client side: two reads into the same two-element buffer.
339 let mut s = UnixStream::connect(&addr);
340 let mut buf = [0, 1];
341 s.read(buf).unwrap();
342 s.read(buf).unwrap();
345 let mut s1 = acceptor.accept().unwrap();
348 let (tx, rx) = channel();
351 s2.write([1]).unwrap();
354 s1.write([2]).unwrap();