1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 use std::c_str::CString;
14 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
17 use homing::{HomingIO, HomeHandle};
20 use stream::StreamWatcher;
21 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
22 use uvio::UvIoFactory;
25 pub struct PipeWatcher {
26 stream: StreamWatcher,
31 // see comments in TcpWatcher for why these exist
36 pub struct PipeListener {
38 pipe: *uvll::uv_pipe_t,
39 outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>,
40 incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>,
43 pub struct PipeAcceptor {
44 listener: Box<PipeListener>,
45 timeout: net::AcceptTimeout,
48 // PipeWatcher implementation and traits
51 // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
52 // get bound to some other source (this is normally a helper method paired
53 // with another call).
54 pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
55 let home = io.make_handle();
56 PipeWatcher::new_home(&io.loop_, home, ipc)
59 pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
61 let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
62 assert!(!handle.is_null());
63 let ipc = ipc as libc::c_int;
64 assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
68 stream: StreamWatcher::new(handle),
71 refcount: Refcount::new(),
72 read_access: Access::new(),
73 write_access: Access::new(),
77 pub fn open(io: &mut UvIoFactory, file: libc::c_int)
78 -> Result<PipeWatcher, UvError>
80 let pipe = PipeWatcher::new(io, false);
81 match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
87 pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
88 -> Result<PipeWatcher, UvError>
90 let pipe = PipeWatcher::new(io, false);
91 let cx = net::ConnectCtx { status: -1, task: None, timer: None };
92 cx.connect(pipe, timeout, io, |req, pipe, cb| {
94 uvll::uv_pipe_connect(req.handle, pipe.handle(),
95 name.with_ref(|p| p), cb)
101 pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
103 // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
104 // allows the pipe to get moved elsewhere
105 fn unwrap(mut self) -> *uvll::uv_pipe_t {
107 return self.stream.handle;
111 impl RtioPipe for PipeWatcher {
112 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
113 let m = self.fire_homing_missile();
114 let _g = self.read_access.grant(m);
115 self.stream.read(buf).map_err(uv_error_to_io_error)
118 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
119 let m = self.fire_homing_missile();
120 let _g = self.write_access.grant(m);
121 self.stream.write(buf).map_err(uv_error_to_io_error)
124 fn clone(&self) -> Box<RtioPipe:Send> {
126 stream: StreamWatcher::new(self.stream.handle),
128 home: self.home.clone(),
129 refcount: self.refcount.clone(),
130 read_access: self.read_access.clone(),
131 write_access: self.write_access.clone(),
132 } as Box<RtioPipe:Send>
136 impl HomingIO for PipeWatcher {
137 fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
140 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
141 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
144 impl Drop for PipeWatcher {
146 let _m = self.fire_homing_missile();
147 if !self.defused && self.refcount.decrement() {
153 // PipeListener implementation and traits
156 pub fn bind(io: &mut UvIoFactory, name: &CString)
157 -> Result<Box<PipeListener>, UvError>
159 let pipe = PipeWatcher::new(io, false);
161 uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
164 // If successful, unwrap the PipeWatcher because we control how
165 // we close the pipe differently. We can't rely on
166 // StreamWatcher's default close method.
167 let (tx, rx) = channel();
168 let p = box PipeListener {
169 home: io.make_handle(),
181 impl RtioUnixListener for PipeListener {
182 fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> {
183 // create the acceptor object from ourselves
184 let mut acceptor = box PipeAcceptor {
186 timeout: net::AcceptTimeout::new(),
189 let _m = acceptor.fire_homing_missile();
190 // FIXME: the 128 backlog should be configurable
191 match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
192 0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>),
193 n => Err(uv_error_to_io_error(UvError(n))),
198 impl HomingIO for PipeListener {
199 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
202 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
203 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
206 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
207 assert!(status != uvll::ECANCELED);
209 let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
210 let msg = match status {
212 let loop_ = Loop::wrap(unsafe {
213 uvll::get_loop_for_uv_handle(server)
215 let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
216 assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
217 Ok(box client as Box<RtioPipe:Send>)
219 n => Err(uv_error_to_io_error(UvError(n)))
221 pipe.outgoing.send(msg);
224 impl Drop for PipeListener {
226 let _m = self.fire_homing_missile();
231 // PipeAcceptor implementation and traits
233 impl RtioUnixAcceptor for PipeAcceptor {
234 fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
235 self.timeout.accept(&self.listener.incoming)
238 fn set_timeout(&mut self, timeout_ms: Option<u64>) {
240 None => self.timeout.clear(),
241 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
246 impl HomingIO for PipeAcceptor {
247 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
252 use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
253 use std::io::test::next_test_unix;
255 use super::{PipeWatcher, PipeListener};
256 use super::super::local_loop;
260 match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
269 match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
271 Err(e) => assert_eq!(e.name(), "EACCES".to_owned()),
277 let p = next_test_unix().to_c_str();
278 match PipeListener::bind(local_loop(), &p) {
284 #[test] #[should_fail]
286 let p = next_test_unix().to_c_str();
287 let _w = PipeListener::bind(local_loop(), &p).unwrap();
293 let path = next_test_unix();
294 let path2 = path.clone();
295 let (tx, rx) = channel();
298 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
299 let mut p = p.listen().unwrap();
301 let mut client = p.accept().unwrap();
303 assert!(client.read(buf).unwrap() == 1);
304 assert_eq!(buf[0], 1);
305 assert!(client.write([2]).is_ok());
308 let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
309 assert!(c.write([1]).is_ok());
311 assert!(c.read(buf).unwrap() == 1);
312 assert_eq!(buf[0], 2);
315 #[test] #[should_fail]
317 let path = next_test_unix();
318 let path2 = path.clone();
319 let (tx, rx) = channel();
322 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
323 let mut p = p.listen().unwrap();
325 drop(p.accept().unwrap());
328 let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();