]> git.lizzy.rs Git - rust.git/blob - src/librustuv/pipe.rs
aa89e5e5f034e24cdafaa8758760a03241e7796b
[rust.git] / src / librustuv / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc;
12 use std::c_str::CString;
13 use std::mem;
14 use std::rt::rtio;
15 use std::rt::rtio::IoResult;
16 use std::rt::task::BlockedTask;
17
18 use homing::{HomingIO, HomeHandle};
19 use net;
20 use rc::Refcount;
21 use stream::StreamWatcher;
22 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
23 use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
24 use uvio::UvIoFactory;
25 use uvll;
26
27 pub struct PipeWatcher {
28     stream: StreamWatcher,
29     home: HomeHandle,
30     defused: bool,
31     refcount: Refcount,
32
33     // see comments in TcpWatcher for why these exist
34     write_access: AccessTimeout<()>,
35     read_access: AccessTimeout<()>,
36 }
37
38 pub struct PipeListener {
39     home: HomeHandle,
40     pipe: *mut uvll::uv_pipe_t,
41 }
42
43 pub struct PipeAcceptor {
44     home: HomeHandle,
45     handle: *mut uvll::uv_pipe_t,
46     access: AcceptTimeout<Box<rtio::RtioPipe + Send>>,
47     refcount: Refcount,
48 }
49
50 // PipeWatcher implementation and traits
51
52 impl PipeWatcher {
53     // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
54     // get bound to some other source (this is normally a helper method paired
55     // with another call).
56     pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
57         let home = io.make_handle();
58         PipeWatcher::new_home(&io.loop_, home, ipc)
59     }
60
61     pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
62         let handle = unsafe {
63             let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
64             assert!(!handle.is_null());
65             let ipc = ipc as libc::c_int;
66             assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
67             handle
68         };
69         PipeWatcher {
70             stream: StreamWatcher::new(handle, true),
71             home: home,
72             defused: false,
73             refcount: Refcount::new(),
74             read_access: AccessTimeout::new(()),
75             write_access: AccessTimeout::new(()),
76         }
77     }
78
79     pub fn open(io: &mut UvIoFactory, file: libc::c_int)
80         -> Result<PipeWatcher, UvError>
81     {
82         let pipe = PipeWatcher::new(io, false);
83         match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
84             0 => Ok(pipe),
85             n => Err(UvError(n))
86         }
87     }
88
89     pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
90         -> Result<PipeWatcher, UvError>
91     {
92         let pipe = PipeWatcher::new(io, false);
93         let cx = ConnectCtx { status: -1, task: None, timer: None };
94         cx.connect(pipe, timeout, io, |req, pipe, cb| {
95             unsafe {
96                 uvll::uv_pipe_connect(req.handle, pipe.handle(),
97                                       name.as_ptr(), cb)
98             }
99             0
100         })
101     }
102
103     pub fn handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
104
105     // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
106     // allows the pipe to get moved elsewhere
107     fn unwrap(mut self) -> *mut uvll::uv_pipe_t {
108         self.defused = true;
109         return self.stream.handle;
110     }
111 }
112
113 impl rtio::RtioPipe for PipeWatcher {
114     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
115         let m = self.fire_homing_missile();
116         let guard = try!(self.read_access.grant(m));
117
118         // see comments in close_read about this check
119         if guard.access.is_closed() {
120             return Err(uv_error_to_io_error(UvError(uvll::EOF)))
121         }
122
123         self.stream.read(buf).map_err(uv_error_to_io_error)
124     }
125
126     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
127         let m = self.fire_homing_missile();
128         let guard = try!(self.write_access.grant(m));
129         self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
130     }
131
132     fn clone(&self) -> Box<rtio::RtioPipe + Send> {
133         box PipeWatcher {
134             stream: StreamWatcher::new(self.stream.handle, false),
135             defused: false,
136             home: self.home.clone(),
137             refcount: self.refcount.clone(),
138             read_access: self.read_access.clone(),
139             write_access: self.write_access.clone(),
140         } as Box<rtio::RtioPipe + Send>
141     }
142
143     fn close_read(&mut self) -> IoResult<()> {
144         // The current uv_shutdown method only shuts the writing half of the
145         // connection, and no method is provided to shut down the reading half
146         // of the connection. With a lack of method, we emulate shutting down
147         // the reading half of the connection by manually returning early from
148         // all future calls to `read`.
149         //
150         // Note that we must be careful to ensure that *all* cloned handles see
151         // the closing of the read half, so we stored the "is closed" bit in the
152         // Access struct, not in our own personal watcher. Additionally, the
153         // homing missile is used as a locking mechanism to ensure there is no
154         // contention over this bit.
155         //
156         // To shutdown the read half, we must first flag the access as being
157         // closed, and then afterwards we cease any pending read. Note that this
158         // ordering is crucial because we could in theory be rescheduled during
159         // the uv_read_stop which means that another read invocation could leak
160         // in before we set the flag.
161         let task = {
162             let m = self.fire_homing_missile();
163             self.read_access.access.close(&m);
164             self.stream.cancel_read(uvll::EOF as libc::ssize_t)
165         };
166         let _ = task.map(|t| t.reawaken());
167         Ok(())
168     }
169
170     fn close_write(&mut self) -> IoResult<()> {
171         let _m = self.fire_homing_missile();
172         net::shutdown(self.stream.handle, &self.uv_loop())
173     }
174
175     fn set_timeout(&mut self, timeout: Option<u64>) {
176         self.set_read_timeout(timeout);
177         self.set_write_timeout(timeout);
178     }
179
180     fn set_read_timeout(&mut self, ms: Option<u64>) {
181         let _m = self.fire_homing_missile();
182         let loop_ = self.uv_loop();
183         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
184                                      &self.stream as *const _ as uint);
185
186         fn cancel_read(stream: uint) -> Option<BlockedTask> {
187             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
188             stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
189         }
190     }
191
192     fn set_write_timeout(&mut self, ms: Option<u64>) {
193         let _m = self.fire_homing_missile();
194         let loop_ = self.uv_loop();
195         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
196                                       &self.stream as *const _ as uint);
197
198         fn cancel_write(stream: uint) -> Option<BlockedTask> {
199             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
200             stream.cancel_write()
201         }
202     }
203 }
204
205 impl HomingIO for PipeWatcher {
206     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
207 }
208
209 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
210     fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
211 }
212
213 impl Drop for PipeWatcher {
214     fn drop(&mut self) {
215         let _m = self.fire_homing_missile();
216         if !self.defused && self.refcount.decrement() {
217             self.close();
218         }
219     }
220 }
221
222 // PipeListener implementation and traits
223
224 impl PipeListener {
225     pub fn bind(io: &mut UvIoFactory, name: &CString)
226         -> Result<Box<PipeListener>, UvError>
227     {
228         let pipe = PipeWatcher::new(io, false);
229         match unsafe {
230             uvll::uv_pipe_bind(pipe.handle(), name.as_ptr())
231         } {
232             0 => {
233                 // If successful, unwrap the PipeWatcher because we control how
234                 // we close the pipe differently. We can't rely on
235                 // StreamWatcher's default close method.
236                 let p = box PipeListener {
237                     home: io.make_handle(),
238                     pipe: pipe.unwrap(),
239                 };
240                 Ok(p.install())
241             }
242             n => Err(UvError(n))
243         }
244     }
245 }
246
247 impl rtio::RtioUnixListener for PipeListener {
248     fn listen(self: Box<PipeListener>)
249               -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
250         let _m = self.fire_homing_missile();
251
252         // create the acceptor object from ourselves
253         let acceptor = (box PipeAcceptor {
254             handle: self.pipe,
255             home: self.home.clone(),
256             access: AcceptTimeout::new(),
257             refcount: Refcount::new(),
258         }).install();
259         self.pipe = 0 as *mut _;
260
261         // FIXME: the 128 backlog should be configurable
262         match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
263             0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
264             n => Err(uv_error_to_io_error(UvError(n))),
265         }
266     }
267 }
268
269 impl HomingIO for PipeListener {
270     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
271 }
272
273 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
274     fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.pipe }
275 }
276
277 extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
278     assert!(status != uvll::ECANCELED);
279
280     let pipe: &mut PipeAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
281     let msg = match status {
282         0 => {
283             let loop_ = Loop::wrap(unsafe {
284                 uvll::get_loop_for_uv_handle(server)
285             });
286             let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
287             assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
288             Ok(box client as Box<rtio::RtioPipe + Send>)
289         }
290         n => Err(uv_error_to_io_error(UvError(n)))
291     };
292
293     // If we're running then we have exclusive access, so the unsafe_get() is ok
294     unsafe { pipe.access.push(msg); }
295 }
296
297 impl Drop for PipeListener {
298     fn drop(&mut self) {
299         if self.pipe.is_null() { return }
300
301         let _m = self.fire_homing_missile();
302         self.close();
303     }
304 }
305
306 // PipeAcceptor implementation and traits
307
308 impl rtio::RtioUnixAcceptor for PipeAcceptor {
309     fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
310         let m = self.fire_homing_missile();
311         let loop_ = self.uv_loop();
312         self.access.accept(m, &loop_)
313     }
314
315     fn set_timeout(&mut self, ms: Option<u64>) {
316         let _m = self.fire_homing_missile();
317         let loop_ = self.uv_loop();
318         self.access.set_timeout(ms, &loop_, &self.home);
319     }
320
321     fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
322         box PipeAcceptor {
323             refcount: self.refcount.clone(),
324             home: self.home.clone(),
325             handle: self.handle,
326             access: self.access.clone(),
327         } as Box<rtio::RtioUnixAcceptor + Send>
328     }
329
330     fn close_accept(&mut self) -> IoResult<()> {
331         let m = self.fire_homing_missile();
332         self.access.close(m);
333         Ok(())
334     }
335 }
336
337 impl HomingIO for PipeAcceptor {
338     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
339 }
340
341 impl UvHandle<uvll::uv_pipe_t> for PipeAcceptor {
342     fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.handle }
343 }
344
345 impl Drop for PipeAcceptor {
346     fn drop(&mut self) {
347         let _m = self.fire_homing_missile();
348         if self.refcount.decrement() {
349             self.close();
350         }
351     }
352 }
353
354 #[cfg(test)]
355 mod tests {
356     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
357     use std::io::test::next_test_unix;
358
359     use super::{PipeWatcher, PipeListener};
360     use super::super::local_loop;
361
362     #[test]
363     fn connect_err() {
364         match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
365                                    None) {
366             Ok(..) => fail!(),
367             Err(..) => {}
368         }
369     }
370
371     #[test]
372     fn bind_err() {
373         match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
374             Ok(..) => fail!(),
375             Err(e) => assert_eq!(e.name(), "EACCES".to_string()),
376         }
377     }
378
379     #[test]
380     fn bind() {
381         let p = next_test_unix().to_c_str();
382         match PipeListener::bind(local_loop(), &p) {
383             Ok(..) => {}
384             Err(..) => fail!(),
385         }
386     }
387
388     #[test] #[should_fail]
389     fn bind_fail() {
390         let p = next_test_unix().to_c_str();
391         let _w = PipeListener::bind(local_loop(), &p).unwrap();
392         fail!();
393     }
394
395     #[test]
396     fn connect() {
397         let path = next_test_unix();
398         let path2 = path.clone();
399         let (tx, rx) = channel();
400
401         spawn(proc() {
402             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
403             let mut p = p.listen().ok().unwrap();
404             tx.send(());
405             let mut client = p.accept().ok().unwrap();
406             let mut buf = [0];
407             assert!(client.read(buf).ok().unwrap() == 1);
408             assert_eq!(buf[0], 1);
409             assert!(client.write([2]).is_ok());
410         });
411         rx.recv();
412         let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
413         assert!(c.write([1]).is_ok());
414         let mut buf = [0];
415         assert!(c.read(buf).ok().unwrap() == 1);
416         assert_eq!(buf[0], 2);
417     }
418
419     #[test] #[should_fail]
420     fn connect_fail() {
421         let path = next_test_unix();
422         let path2 = path.clone();
423         let (tx, rx) = channel();
424
425         spawn(proc() {
426             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
427             let mut p = p.listen().ok().unwrap();
428             tx.send(());
429             drop(p.accept().ok().unwrap());
430         });
431         rx.recv();
432         let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
433         fail!()
434
435     }
436 }