1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::c_str::CString;
15 use std::rt::rtio::IoResult;
16 use std::rt::task::BlockedTask;
18 use homing::{HomingIO, HomeHandle};
21 use stream::StreamWatcher;
22 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
23 use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
24 use uvio::UvIoFactory;
27 pub struct PipeWatcher {
28 stream: StreamWatcher,
33 // see comments in TcpWatcher for why these exist
34 write_access: AccessTimeout<()>,
35 read_access: AccessTimeout<()>,
38 pub struct PipeListener {
40 pipe: *mut uvll::uv_pipe_t,
43 pub struct PipeAcceptor {
45 handle: *mut uvll::uv_pipe_t,
46 access: AcceptTimeout<Box<rtio::RtioPipe + Send>>,
50 // PipeWatcher implementation and traits
53 // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
54 // get bound to some other source (this is normally a helper method paired
55 // with another call).
56 pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
57 let home = io.make_handle();
58 PipeWatcher::new_home(&io.loop_, home, ipc)
61 pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
63 let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
64 assert!(!handle.is_null());
65 let ipc = ipc as libc::c_int;
66 assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
70 stream: StreamWatcher::new(handle, true),
73 refcount: Refcount::new(),
74 read_access: AccessTimeout::new(()),
75 write_access: AccessTimeout::new(()),
79 pub fn open(io: &mut UvIoFactory, file: libc::c_int)
80 -> Result<PipeWatcher, UvError>
82 let pipe = PipeWatcher::new(io, false);
83 match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
89 pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
90 -> Result<PipeWatcher, UvError>
92 let pipe = PipeWatcher::new(io, false);
93 let cx = ConnectCtx { status: -1, task: None, timer: None };
94 cx.connect(pipe, timeout, io, |req, pipe, cb| {
96 uvll::uv_pipe_connect(req.handle, pipe.handle(),
103 pub fn handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
105 // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
106 // allows the pipe to get moved elsewhere
107 fn unwrap(mut self) -> *mut uvll::uv_pipe_t {
109 return self.stream.handle;
113 impl rtio::RtioPipe for PipeWatcher {
114 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
115 let m = self.fire_homing_missile();
116 let guard = try!(self.read_access.grant(m));
118 // see comments in close_read about this check
119 if guard.access.is_closed() {
120 return Err(uv_error_to_io_error(UvError(uvll::EOF)))
123 self.stream.read(buf).map_err(uv_error_to_io_error)
126 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
127 let m = self.fire_homing_missile();
128 let guard = try!(self.write_access.grant(m));
129 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
132 fn clone(&self) -> Box<rtio::RtioPipe + Send> {
134 stream: StreamWatcher::new(self.stream.handle, false),
136 home: self.home.clone(),
137 refcount: self.refcount.clone(),
138 read_access: self.read_access.clone(),
139 write_access: self.write_access.clone(),
140 } as Box<rtio::RtioPipe + Send>
143 fn close_read(&mut self) -> IoResult<()> {
144 // The current uv_shutdown method only shuts the writing half of the
145 // connection, and no method is provided to shut down the reading half
146 // of the connection. With a lack of method, we emulate shutting down
147 // the reading half of the connection by manually returning early from
148 // all future calls to `read`.
150 // Note that we must be careful to ensure that *all* cloned handles see
151 // the closing of the read half, so we stored the "is closed" bit in the
152 // Access struct, not in our own personal watcher. Additionally, the
153 // homing missile is used as a locking mechanism to ensure there is no
154 // contention over this bit.
156 // To shutdown the read half, we must first flag the access as being
157 // closed, and then afterwards we cease any pending read. Note that this
158 // ordering is crucial because we could in theory be rescheduled during
159 // the uv_read_stop which means that another read invocation could leak
160 // in before we set the flag.
162 let m = self.fire_homing_missile();
163 self.read_access.access.close(&m);
164 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
166 let _ = task.map(|t| t.reawaken());
170 fn close_write(&mut self) -> IoResult<()> {
171 let _m = self.fire_homing_missile();
172 net::shutdown(self.stream.handle, &self.uv_loop())
175 fn set_timeout(&mut self, timeout: Option<u64>) {
176 self.set_read_timeout(timeout);
177 self.set_write_timeout(timeout);
180 fn set_read_timeout(&mut self, ms: Option<u64>) {
181 let _m = self.fire_homing_missile();
182 let loop_ = self.uv_loop();
183 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
184 &self.stream as *const _ as uint);
186 fn cancel_read(stream: uint) -> Option<BlockedTask> {
187 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
188 stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
192 fn set_write_timeout(&mut self, ms: Option<u64>) {
193 let _m = self.fire_homing_missile();
194 let loop_ = self.uv_loop();
195 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
196 &self.stream as *const _ as uint);
198 fn cancel_write(stream: uint) -> Option<BlockedTask> {
199 let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
200 stream.cancel_write()
205 impl HomingIO for PipeWatcher {
206 fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
209 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
210 fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
213 impl Drop for PipeWatcher {
215 let _m = self.fire_homing_missile();
216 if !self.defused && self.refcount.decrement() {
222 // PipeListener implementation and traits
225 pub fn bind(io: &mut UvIoFactory, name: &CString)
226 -> Result<Box<PipeListener>, UvError>
228 let pipe = PipeWatcher::new(io, false);
230 uvll::uv_pipe_bind(pipe.handle(), name.as_ptr())
233 // If successful, unwrap the PipeWatcher because we control how
234 // we close the pipe differently. We can't rely on
235 // StreamWatcher's default close method.
236 let p = box PipeListener {
237 home: io.make_handle(),
247 impl rtio::RtioUnixListener for PipeListener {
248 fn listen(self: Box<PipeListener>)
249 -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
250 let _m = self.fire_homing_missile();
252 // create the acceptor object from ourselves
253 let acceptor = (box PipeAcceptor {
255 home: self.home.clone(),
256 access: AcceptTimeout::new(),
257 refcount: Refcount::new(),
259 self.pipe = 0 as *mut _;
261 // FIXME: the 128 backlog should be configurable
262 match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
263 0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
264 n => Err(uv_error_to_io_error(UvError(n))),
269 impl HomingIO for PipeListener {
270 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
273 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
274 fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.pipe }
277 extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
278 assert!(status != uvll::ECANCELED);
280 let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
281 let msg = match status {
283 let loop_ = Loop::wrap(unsafe {
284 uvll::get_loop_for_uv_handle(server)
286 let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
287 assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
288 Ok(box client as Box<rtio::RtioPipe + Send>)
290 n => Err(uv_error_to_io_error(UvError(n)))
293 // If we're running then we have exclusive access, so the unsafe_get() is ok
294 unsafe { pipe.access.push(msg); }
297 impl Drop for PipeListener {
299 if self.pipe.is_null() { return }
301 let _m = self.fire_homing_missile();
306 // PipeAcceptor implementation and traits
308 impl rtio::RtioUnixAcceptor for PipeAcceptor {
309 fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
310 let m = self.fire_homing_missile();
311 let loop_ = self.uv_loop();
312 self.access.accept(m, &loop_)
315 fn set_timeout(&mut self, ms: Option<u64>) {
316 let _m = self.fire_homing_missile();
317 let loop_ = self.uv_loop();
318 self.access.set_timeout(ms, &loop_, &self.home);
321 fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
323 refcount: self.refcount.clone(),
324 home: self.home.clone(),
326 access: self.access.clone(),
327 } as Box<rtio::RtioUnixAcceptor + Send>
330 fn close_accept(&mut self) -> IoResult<()> {
331 let m = self.fire_homing_missile();
332 self.access.close(m);
337 impl HomingIO for PipeAcceptor {
338 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
341 impl UvHandle<uvll::uv_pipe_t> for PipeAcceptor {
342 fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle }
345 impl Drop for PipeAcceptor {
347 let _m = self.fire_homing_missile();
348 if self.refcount.decrement() {
356 use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
357 use std::io::test::next_test_unix;
359 use super::{PipeWatcher, PipeListener};
360 use super::super::local_loop;
364 match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
373 match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
375 Err(e) => assert_eq!(e.name(), "EACCES".to_string()),
381 let p = next_test_unix().to_c_str();
382 match PipeListener::bind(local_loop(), &p) {
388 #[test] #[should_fail]
390 let p = next_test_unix().to_c_str();
391 let _w = PipeListener::bind(local_loop(), &p).unwrap();
397 let path = next_test_unix();
398 let path2 = path.clone();
399 let (tx, rx) = channel();
402 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
403 let mut p = p.listen().ok().unwrap();
405 let mut client = p.accept().ok().unwrap();
407 assert!(client.read(buf).ok().unwrap() == 1);
408 assert_eq!(buf[0], 1);
409 assert!(client.write([2]).is_ok());
412 let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
413 assert!(c.write([1]).is_ok());
415 assert!(c.read(buf).ok().unwrap() == 1);
416 assert_eq!(buf[0], 2);
419 #[test] #[should_fail]
421 let path = next_test_unix();
422 let path2 = path.clone();
423 let (tx, rx) = channel();
426 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
427 let mut p = p.listen().ok().unwrap();
429 drop(p.accept().ok().unwrap());
432 let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();