]> git.lizzy.rs Git - rust.git/blob - src/librustuv/pipe.rs
auto merge of #13967 : richo/rust/features/ICE-fails, r=alexcrichton
[rust.git] / src / librustuv / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc;
12 use std::c_str::CString;
13 use std::io::IoError;
14 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
15
16 use access::Access;
17 use homing::{HomingIO, HomeHandle};
18 use net;
19 use rc::Refcount;
20 use stream::StreamWatcher;
21 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
22 use uvio::UvIoFactory;
23 use uvll;
24
25 pub struct PipeWatcher {
26     stream: StreamWatcher,
27     home: HomeHandle,
28     defused: bool,
29     refcount: Refcount,
30
31     // see comments in TcpWatcher for why these exist
32     write_access: Access,
33     read_access: Access,
34 }
35
36 pub struct PipeListener {
37     home: HomeHandle,
38     pipe: *uvll::uv_pipe_t,
39     outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>,
40     incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>,
41 }
42
43 pub struct PipeAcceptor {
44     listener: Box<PipeListener>,
45     timeout: net::AcceptTimeout,
46 }
47
48 // PipeWatcher implementation and traits
49
50 impl PipeWatcher {
51     // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
52     // get bound to some other source (this is normally a helper method paired
53     // with another call).
54     pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
55         let home = io.make_handle();
56         PipeWatcher::new_home(&io.loop_, home, ipc)
57     }
58
59     pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
60         let handle = unsafe {
61             let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
62             assert!(!handle.is_null());
63             let ipc = ipc as libc::c_int;
64             assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
65             handle
66         };
67         PipeWatcher {
68             stream: StreamWatcher::new(handle),
69             home: home,
70             defused: false,
71             refcount: Refcount::new(),
72             read_access: Access::new(),
73             write_access: Access::new(),
74         }
75     }
76
77     pub fn open(io: &mut UvIoFactory, file: libc::c_int)
78         -> Result<PipeWatcher, UvError>
79     {
80         let pipe = PipeWatcher::new(io, false);
81         match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
82             0 => Ok(pipe),
83             n => Err(UvError(n))
84         }
85     }
86
87     pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
88         -> Result<PipeWatcher, UvError>
89     {
90         let pipe = PipeWatcher::new(io, false);
91         let cx = net::ConnectCtx { status: -1, task: None, timer: None };
92         cx.connect(pipe, timeout, io, |req, pipe, cb| {
93             unsafe {
94                 uvll::uv_pipe_connect(req.handle, pipe.handle(),
95                                       name.with_ref(|p| p), cb)
96             }
97             0
98         })
99     }
100
101     pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
102
103     // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
104     // allows the pipe to get moved elsewhere
105     fn unwrap(mut self) -> *uvll::uv_pipe_t {
106         self.defused = true;
107         return self.stream.handle;
108     }
109 }
110
111 impl RtioPipe for PipeWatcher {
112     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
113         let m = self.fire_homing_missile();
114         let _g = self.read_access.grant(m);
115         self.stream.read(buf).map_err(uv_error_to_io_error)
116     }
117
118     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
119         let m = self.fire_homing_missile();
120         let _g = self.write_access.grant(m);
121         self.stream.write(buf).map_err(uv_error_to_io_error)
122     }
123
124     fn clone(&self) -> Box<RtioPipe:Send> {
125         box PipeWatcher {
126             stream: StreamWatcher::new(self.stream.handle),
127             defused: false,
128             home: self.home.clone(),
129             refcount: self.refcount.clone(),
130             read_access: self.read_access.clone(),
131             write_access: self.write_access.clone(),
132         } as Box<RtioPipe:Send>
133     }
134 }
135
136 impl HomingIO for PipeWatcher {
137     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
138 }
139
140 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
141     fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
142 }
143
144 impl Drop for PipeWatcher {
145     fn drop(&mut self) {
146         let _m = self.fire_homing_missile();
147         if !self.defused && self.refcount.decrement() {
148             self.close();
149         }
150     }
151 }
152
153 // PipeListener implementation and traits
154
155 impl PipeListener {
156     pub fn bind(io: &mut UvIoFactory, name: &CString)
157         -> Result<Box<PipeListener>, UvError>
158     {
159         let pipe = PipeWatcher::new(io, false);
160         match unsafe {
161             uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
162         } {
163             0 => {
164                 // If successful, unwrap the PipeWatcher because we control how
165                 // we close the pipe differently. We can't rely on
166                 // StreamWatcher's default close method.
167                 let (tx, rx) = channel();
168                 let p = box PipeListener {
169                     home: io.make_handle(),
170                     pipe: pipe.unwrap(),
171                     incoming: rx,
172                     outgoing: tx,
173                 };
174                 Ok(p.install())
175             }
176             n => Err(UvError(n))
177         }
178     }
179 }
180
181 impl RtioUnixListener for PipeListener {
182     fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> {
183         // create the acceptor object from ourselves
184         let mut acceptor = box PipeAcceptor {
185             listener: self,
186             timeout: net::AcceptTimeout::new(),
187         };
188
189         let _m = acceptor.fire_homing_missile();
190         // FIXME: the 128 backlog should be configurable
191         match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
192             0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>),
193             n => Err(uv_error_to_io_error(UvError(n))),
194         }
195     }
196 }
197
198 impl HomingIO for PipeListener {
199     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
200 }
201
202 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
203     fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
204 }
205
206 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
207     assert!(status != uvll::ECANCELED);
208
209     let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
210     let msg = match status {
211         0 => {
212             let loop_ = Loop::wrap(unsafe {
213                 uvll::get_loop_for_uv_handle(server)
214             });
215             let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
216             assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
217             Ok(box client as Box<RtioPipe:Send>)
218         }
219         n => Err(uv_error_to_io_error(UvError(n)))
220     };
221     pipe.outgoing.send(msg);
222 }
223
224 impl Drop for PipeListener {
225     fn drop(&mut self) {
226         let _m = self.fire_homing_missile();
227         self.close();
228     }
229 }
230
231 // PipeAcceptor implementation and traits
232
233 impl RtioUnixAcceptor for PipeAcceptor {
234     fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
235         self.timeout.accept(&self.listener.incoming)
236     }
237
238     fn set_timeout(&mut self, timeout_ms: Option<u64>) {
239         match timeout_ms {
240             None => self.timeout.clear(),
241             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
242         }
243     }
244 }
245
246 impl HomingIO for PipeAcceptor {
247     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
248 }
249
250 #[cfg(test)]
251 mod tests {
252     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
253     use std::io::test::next_test_unix;
254
255     use super::{PipeWatcher, PipeListener};
256     use super::super::local_loop;
257
258     #[test]
259     fn connect_err() {
260         match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
261                                    None) {
262             Ok(..) => fail!(),
263             Err(..) => {}
264         }
265     }
266
267     #[test]
268     fn bind_err() {
269         match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
270             Ok(..) => fail!(),
271             Err(e) => assert_eq!(e.name(), "EACCES".to_owned()),
272         }
273     }
274
275     #[test]
276     fn bind() {
277         let p = next_test_unix().to_c_str();
278         match PipeListener::bind(local_loop(), &p) {
279             Ok(..) => {}
280             Err(..) => fail!(),
281         }
282     }
283
284     #[test] #[should_fail]
285     fn bind_fail() {
286         let p = next_test_unix().to_c_str();
287         let _w = PipeListener::bind(local_loop(), &p).unwrap();
288         fail!();
289     }
290
291     #[test]
292     fn connect() {
293         let path = next_test_unix();
294         let path2 = path.clone();
295         let (tx, rx) = channel();
296
297         spawn(proc() {
298             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
299             let mut p = p.listen().unwrap();
300             tx.send(());
301             let mut client = p.accept().unwrap();
302             let mut buf = [0];
303             assert!(client.read(buf).unwrap() == 1);
304             assert_eq!(buf[0], 1);
305             assert!(client.write([2]).is_ok());
306         });
307         rx.recv();
308         let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
309         assert!(c.write([1]).is_ok());
310         let mut buf = [0];
311         assert!(c.read(buf).unwrap() == 1);
312         assert_eq!(buf[0], 2);
313     }
314
315     #[test] #[should_fail]
316     fn connect_fail() {
317         let path = next_test_unix();
318         let path2 = path.clone();
319         let (tx, rx) = channel();
320
321         spawn(proc() {
322             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
323             let mut p = p.listen().unwrap();
324             tx.send(());
325             drop(p.accept().unwrap());
326         });
327         rx.recv();
328         let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
329         fail!()
330
331     }
332 }