1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::c_str::CString;
15 use std::rt::rtio::IoResult;
16 use std::rt::task::BlockedTask;
18 use homing::{HomingIO, HomeHandle};
21 use stream::StreamWatcher;
22 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
23 use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
24 use uvio::UvIoFactory;
27 pub struct PipeWatcher {
28 stream: StreamWatcher,
33 // see comments in TcpWatcher for why these exist
34 write_access: AccessTimeout,
35 read_access: AccessTimeout,
38 pub struct PipeListener {
40 pipe: *uvll::uv_pipe_t,
41 outgoing: Sender<IoResult<Box<rtio::RtioPipe + Send>>>,
42 incoming: Receiver<IoResult<Box<rtio::RtioPipe + Send>>>,
45 pub struct PipeAcceptor {
46 listener: Box<PipeListener>,
47 timeout: AcceptTimeout,
50 // PipeWatcher implementation and traits
53 // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
54 // get bound to some other source (this is normally a helper method paired
55 // with another call).
56 pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
57 let home = io.make_handle();
58 PipeWatcher::new_home(&io.loop_, home, ipc)
61 pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
63 let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
64 assert!(!handle.is_null());
65 let ipc = ipc as libc::c_int;
66 assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
70 stream: StreamWatcher::new(handle),
73 refcount: Refcount::new(),
74 read_access: AccessTimeout::new(),
75 write_access: AccessTimeout::new(),
79 pub fn open(io: &mut UvIoFactory, file: libc::c_int)
80 -> Result<PipeWatcher, UvError>
82 let pipe = PipeWatcher::new(io, false);
83 match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
89 pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
90 -> Result<PipeWatcher, UvError>
92 let pipe = PipeWatcher::new(io, false);
93 let cx = ConnectCtx { status: -1, task: None, timer: None };
94 cx.connect(pipe, timeout, io, |req, pipe, cb| {
96 uvll::uv_pipe_connect(req.handle, pipe.handle(),
97 name.with_ref(|p| p), cb)
103 pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
105 // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
106 // allows the pipe to get moved elsewhere
107 fn unwrap(mut self) -> *uvll::uv_pipe_t {
109 return self.stream.handle;
113 impl rtio::RtioPipe for PipeWatcher {
114 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
115 let m = self.fire_homing_missile();
116 let guard = try!(self.read_access.grant(m));
118 // see comments in close_read about this check
119 if guard.access.is_closed() {
120 return Err(uv_error_to_io_error(UvError(uvll::EOF)))
123 self.stream.read(buf).map_err(uv_error_to_io_error)
126 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
127 let m = self.fire_homing_missile();
128 let guard = try!(self.write_access.grant(m));
129 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
132 fn clone(&self) -> Box<rtio::RtioPipe + Send> {
134 stream: StreamWatcher::new(self.stream.handle),
136 home: self.home.clone(),
137 refcount: self.refcount.clone(),
138 read_access: self.read_access.clone(),
139 write_access: self.write_access.clone(),
140 } as Box<rtio::RtioPipe + Send>
143 fn close_read(&mut self) -> IoResult<()> {
144 // The current uv_shutdown method only shuts the writing half of the
145 // connection, and no method is provided to shut down the reading half
146 // of the connection. With a lack of method, we emulate shutting down
147 // the reading half of the connection by manually returning early from
148 // all future calls to `read`.
150 // Note that we must be careful to ensure that *all* cloned handles see
151 // the closing of the read half, so we stored the "is closed" bit in the
152 // Access struct, not in our own personal watcher. Additionally, the
153 // homing missile is used as a locking mechanism to ensure there is no
154 // contention over this bit.
156 // To shutdown the read half, we must first flag the access as being
157 // closed, and then afterwards we cease any pending read. Note that this
158 // ordering is crucial because we could in theory be rescheduled during
159 // the uv_read_stop which means that another read invocation could leak
160 // in before we set the flag.
162 let m = self.fire_homing_missile();
163 self.read_access.access.close(&m);
164 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
166 let _ = task.map(|t| t.reawaken());
170 fn close_write(&mut self) -> IoResult<()> {
171 let _m = self.fire_homing_missile();
172 net::shutdown(self.stream.handle, &self.uv_loop())
175 fn set_timeout(&mut self, timeout: Option<u64>) {
176 self.set_read_timeout(timeout);
177 self.set_write_timeout(timeout);
180 fn set_read_timeout(&mut self, ms: Option<u64>) {
181 let _m = self.fire_homing_missile();
182 let loop_ = self.uv_loop();
183 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
184 &self.stream as *_ as uint);
186 fn cancel_read(stream: uint) -> Option<BlockedTask> {
187 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
188 stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
192 fn set_write_timeout(&mut self, ms: Option<u64>) {
193 let _m = self.fire_homing_missile();
194 let loop_ = self.uv_loop();
195 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
196 &self.stream as *_ as uint);
198 fn cancel_write(stream: uint) -> Option<BlockedTask> {
199 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
200 stream.cancel_write()
205 impl HomingIO for PipeWatcher {
206 fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
209 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
210 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
213 impl Drop for PipeWatcher {
215 let _m = self.fire_homing_missile();
216 if !self.defused && self.refcount.decrement() {
222 // PipeListener implementation and traits
225 pub fn bind(io: &mut UvIoFactory, name: &CString)
226 -> Result<Box<PipeListener>, UvError>
228 let pipe = PipeWatcher::new(io, false);
230 uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
233 // If successful, unwrap the PipeWatcher because we control how
234 // we close the pipe differently. We can't rely on
235 // StreamWatcher's default close method.
236 let (tx, rx) = channel();
237 let p = box PipeListener {
238 home: io.make_handle(),
250 impl rtio::RtioUnixListener for PipeListener {
251 fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
252 // create the acceptor object from ourselves
253 let mut acceptor = box PipeAcceptor {
255 timeout: AcceptTimeout::new(),
258 let _m = acceptor.fire_homing_missile();
259 // FIXME: the 128 backlog should be configurable
260 match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
261 0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
262 n => Err(uv_error_to_io_error(UvError(n))),
267 impl HomingIO for PipeListener {
268 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
271 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
272 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
275 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
276 assert!(status != uvll::ECANCELED);
278 let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
279 let msg = match status {
281 let loop_ = Loop::wrap(unsafe {
282 uvll::get_loop_for_uv_handle(server)
284 let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
285 assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
286 Ok(box client as Box<rtio::RtioPipe + Send>)
288 n => Err(uv_error_to_io_error(UvError(n)))
290 pipe.outgoing.send(msg);
293 impl Drop for PipeListener {
295 let _m = self.fire_homing_missile();
300 // PipeAcceptor implementation and traits
302 impl rtio::RtioUnixAcceptor for PipeAcceptor {
303 fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
304 self.timeout.accept(&self.listener.incoming)
307 fn set_timeout(&mut self, timeout_ms: Option<u64>) {
309 None => self.timeout.clear(),
310 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
315 impl HomingIO for PipeAcceptor {
316 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
321 use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
322 use std::io::test::next_test_unix;
324 use super::{PipeWatcher, PipeListener};
325 use super::super::local_loop;
329 match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
338 match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
340 Err(e) => assert_eq!(e.name(), "EACCES".to_string()),
346 let p = next_test_unix().to_c_str();
347 match PipeListener::bind(local_loop(), &p) {
353 #[test] #[should_fail]
355 let p = next_test_unix().to_c_str();
356 let _w = PipeListener::bind(local_loop(), &p).unwrap();
362 let path = next_test_unix();
363 let path2 = path.clone();
364 let (tx, rx) = channel();
367 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
368 let mut p = p.listen().ok().unwrap();
370 let mut client = p.accept().ok().unwrap();
372 assert!(client.read(buf).ok().unwrap() == 1);
373 assert_eq!(buf[0], 1);
374 assert!(client.write([2]).is_ok());
377 let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
378 assert!(c.write([1]).is_ok());
380 assert!(c.read(buf).ok().unwrap() == 1);
381 assert_eq!(buf[0], 2);
384 #[test] #[should_fail]
386 let path = next_test_unix();
387 let path2 = path.clone();
388 let (tx, rx) = channel();
391 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
392 let mut p = p.listen().ok().unwrap();
394 drop(p.accept().ok().unwrap());
397 let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();