]> git.lizzy.rs Git - rust.git/blob - src/librustuv/pipe.rs
Rename all raw pointers as necessary
[rust.git] / src / librustuv / pipe.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc;
12 use std::c_str::CString;
13 use std::mem;
14 use std::rt::rtio;
15 use std::rt::rtio::IoResult;
16 use std::rt::task::BlockedTask;
17
18 use homing::{HomingIO, HomeHandle};
19 use net;
20 use rc::Refcount;
21 use stream::StreamWatcher;
22 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
23 use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
24 use uvio::UvIoFactory;
25 use uvll;
26
27 pub struct PipeWatcher {
28     stream: StreamWatcher,
29     home: HomeHandle,
30     defused: bool,
31     refcount: Refcount,
32
33     // see comments in TcpWatcher for why these exist
34     write_access: AccessTimeout,
35     read_access: AccessTimeout,
36 }
37
38 pub struct PipeListener {
39     home: HomeHandle,
40     pipe: *mut uvll::uv_pipe_t,
41     outgoing: Sender<IoResult<Box<rtio::RtioPipe + Send>>>,
42     incoming: Receiver<IoResult<Box<rtio::RtioPipe + Send>>>,
43 }
44
45 pub struct PipeAcceptor {
46     listener: Box<PipeListener>,
47     timeout: AcceptTimeout,
48 }
49
50 // PipeWatcher implementation and traits
51
52 impl PipeWatcher {
53     // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
54     // get bound to some other source (this is normally a helper method paired
55     // with another call).
56     pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
57         let home = io.make_handle();
58         PipeWatcher::new_home(&io.loop_, home, ipc)
59     }
60
61     pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
62         let handle = unsafe {
63             let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
64             assert!(!handle.is_null());
65             let ipc = ipc as libc::c_int;
66             assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
67             handle
68         };
69         PipeWatcher {
70             stream: StreamWatcher::new(handle),
71             home: home,
72             defused: false,
73             refcount: Refcount::new(),
74             read_access: AccessTimeout::new(),
75             write_access: AccessTimeout::new(),
76         }
77     }
78
79     pub fn open(io: &mut UvIoFactory, file: libc::c_int)
80         -> Result<PipeWatcher, UvError>
81     {
82         let pipe = PipeWatcher::new(io, false);
83         match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
84             0 => Ok(pipe),
85             n => Err(UvError(n))
86         }
87     }
88
89     pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
90         -> Result<PipeWatcher, UvError>
91     {
92         let pipe = PipeWatcher::new(io, false);
93         let cx = ConnectCtx { status: -1, task: None, timer: None };
94         cx.connect(pipe, timeout, io, |req, pipe, cb| {
95             unsafe {
96                 uvll::uv_pipe_connect(req.handle, pipe.handle(),
97                                       name.with_ref(|p| p), cb)
98             }
99             0
100         })
101     }
102
103     pub fn handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
104
105     // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
106     // allows the pipe to get moved elsewhere
107     fn unwrap(mut self) -> *mut uvll::uv_pipe_t {
108         self.defused = true;
109         return self.stream.handle;
110     }
111 }
112
113 impl rtio::RtioPipe for PipeWatcher {
114     fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
115         let m = self.fire_homing_missile();
116         let guard = try!(self.read_access.grant(m));
117
118         // see comments in close_read about this check
119         if guard.access.is_closed() {
120             return Err(uv_error_to_io_error(UvError(uvll::EOF)))
121         }
122
123         self.stream.read(buf).map_err(uv_error_to_io_error)
124     }
125
126     fn write(&mut self, buf: &[u8]) -> IoResult<()> {
127         let m = self.fire_homing_missile();
128         let guard = try!(self.write_access.grant(m));
129         self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
130     }
131
132     fn clone(&self) -> Box<rtio::RtioPipe + Send> {
133         box PipeWatcher {
134             stream: StreamWatcher::new(self.stream.handle),
135             defused: false,
136             home: self.home.clone(),
137             refcount: self.refcount.clone(),
138             read_access: self.read_access.clone(),
139             write_access: self.write_access.clone(),
140         } as Box<rtio::RtioPipe + Send>
141     }
142
143     fn close_read(&mut self) -> IoResult<()> {
144         // The current uv_shutdown method only shuts the writing half of the
145         // connection, and no method is provided to shut down the reading half
146         // of the connection. With a lack of method, we emulate shutting down
147         // the reading half of the connection by manually returning early from
148         // all future calls to `read`.
149         //
150         // Note that we must be careful to ensure that *all* cloned handles see
151         // the closing of the read half, so we stored the "is closed" bit in the
152         // Access struct, not in our own personal watcher. Additionally, the
153         // homing missile is used as a locking mechanism to ensure there is no
154         // contention over this bit.
155         //
156         // To shutdown the read half, we must first flag the access as being
157         // closed, and then afterwards we cease any pending read. Note that this
158         // ordering is crucial because we could in theory be rescheduled during
159         // the uv_read_stop which means that another read invocation could leak
160         // in before we set the flag.
161         let task = {
162             let m = self.fire_homing_missile();
163             self.read_access.access.close(&m);
164             self.stream.cancel_read(uvll::EOF as libc::ssize_t)
165         };
166         let _ = task.map(|t| t.reawaken());
167         Ok(())
168     }
169
170     fn close_write(&mut self) -> IoResult<()> {
171         let _m = self.fire_homing_missile();
172         net::shutdown(self.stream.handle, &self.uv_loop())
173     }
174
175     fn set_timeout(&mut self, timeout: Option<u64>) {
176         self.set_read_timeout(timeout);
177         self.set_write_timeout(timeout);
178     }
179
180     fn set_read_timeout(&mut self, ms: Option<u64>) {
181         let _m = self.fire_homing_missile();
182         let loop_ = self.uv_loop();
183         self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
184                                      &self.stream as *const _ as uint);
185
186         fn cancel_read(stream: uint) -> Option<BlockedTask> {
187             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
188             stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
189         }
190     }
191
192     fn set_write_timeout(&mut self, ms: Option<u64>) {
193         let _m = self.fire_homing_missile();
194         let loop_ = self.uv_loop();
195         self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
196                                       &self.stream as *const _ as uint);
197
198         fn cancel_write(stream: uint) -> Option<BlockedTask> {
199             let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
200             stream.cancel_write()
201         }
202     }
203 }
204
205 impl HomingIO for PipeWatcher {
206     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
207 }
208
209 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
210     fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.stream.handle }
211 }
212
213 impl Drop for PipeWatcher {
214     fn drop(&mut self) {
215         let _m = self.fire_homing_missile();
216         if !self.defused && self.refcount.decrement() {
217             self.close();
218         }
219     }
220 }
221
222 // PipeListener implementation and traits
223
224 impl PipeListener {
225     pub fn bind(io: &mut UvIoFactory, name: &CString)
226         -> Result<Box<PipeListener>, UvError>
227     {
228         let pipe = PipeWatcher::new(io, false);
229         match unsafe {
230             uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
231         } {
232             0 => {
233                 // If successful, unwrap the PipeWatcher because we control how
234                 // we close the pipe differently. We can't rely on
235                 // StreamWatcher's default close method.
236                 let (tx, rx) = channel();
237                 let p = box PipeListener {
238                     home: io.make_handle(),
239                     pipe: pipe.unwrap(),
240                     incoming: rx,
241                     outgoing: tx,
242                 };
243                 Ok(p.install())
244             }
245             n => Err(UvError(n))
246         }
247     }
248 }
249
250 impl rtio::RtioUnixListener for PipeListener {
251     fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
252         // create the acceptor object from ourselves
253         let mut acceptor = box PipeAcceptor {
254             listener: self,
255             timeout: AcceptTimeout::new(),
256         };
257
258         let _m = acceptor.fire_homing_missile();
259         // FIXME: the 128 backlog should be configurable
260         match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
261             0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor + Send>),
262             n => Err(uv_error_to_io_error(UvError(n))),
263         }
264     }
265 }
266
267 impl HomingIO for PipeListener {
268     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
269 }
270
271 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
272     fn uv_handle(&self) -> *mut uvll::uv_pipe_t { self.pipe }
273 }
274
275 extern fn listen_cb(server: *mut uvll::uv_stream_t, status: libc::c_int) {
276     assert!(status != uvll::ECANCELED);
277
278     let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
279     let msg = match status {
280         0 => {
281             let loop_ = Loop::wrap(unsafe {
282                 uvll::get_loop_for_uv_handle(server)
283             });
284             let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
285             assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
286             Ok(box client as Box<rtio::RtioPipe + Send>)
287         }
288         n => Err(uv_error_to_io_error(UvError(n)))
289     };
290     pipe.outgoing.send(msg);
291 }
292
293 impl Drop for PipeListener {
294     fn drop(&mut self) {
295         let _m = self.fire_homing_missile();
296         self.close();
297     }
298 }
299
300 // PipeAcceptor implementation and traits
301
302 impl rtio::RtioUnixAcceptor for PipeAcceptor {
303     fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
304         self.timeout.accept(&self.listener.incoming)
305     }
306
307     fn set_timeout(&mut self, timeout_ms: Option<u64>) {
308         match timeout_ms {
309             None => self.timeout.clear(),
310             Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
311         }
312     }
313 }
314
315 impl HomingIO for PipeAcceptor {
316     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
317 }
318
319 #[cfg(test)]
320 mod tests {
321     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
322     use std::io::test::next_test_unix;
323
324     use super::{PipeWatcher, PipeListener};
325     use super::super::local_loop;
326
327     #[test]
328     fn connect_err() {
329         match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
330                                    None) {
331             Ok(..) => fail!(),
332             Err(..) => {}
333         }
334     }
335
336     #[test]
337     fn bind_err() {
338         match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
339             Ok(..) => fail!(),
340             Err(e) => assert_eq!(e.name(), "EACCES".to_string()),
341         }
342     }
343
344     #[test]
345     fn bind() {
346         let p = next_test_unix().to_c_str();
347         match PipeListener::bind(local_loop(), &p) {
348             Ok(..) => {}
349             Err(..) => fail!(),
350         }
351     }
352
353     #[test] #[should_fail]
354     fn bind_fail() {
355         let p = next_test_unix().to_c_str();
356         let _w = PipeListener::bind(local_loop(), &p).unwrap();
357         fail!();
358     }
359
360     #[test]
361     fn connect() {
362         let path = next_test_unix();
363         let path2 = path.clone();
364         let (tx, rx) = channel();
365
366         spawn(proc() {
367             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
368             let mut p = p.listen().ok().unwrap();
369             tx.send(());
370             let mut client = p.accept().ok().unwrap();
371             let mut buf = [0];
372             assert!(client.read(buf).ok().unwrap() == 1);
373             assert_eq!(buf[0], 1);
374             assert!(client.write([2]).is_ok());
375         });
376         rx.recv();
377         let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
378         assert!(c.write([1]).is_ok());
379         let mut buf = [0];
380         assert!(c.read(buf).ok().unwrap() == 1);
381         assert_eq!(buf[0], 2);
382     }
383
384     #[test] #[should_fail]
385     fn connect_fail() {
386         let path = next_test_unix();
387         let path2 = path.clone();
388         let (tx, rx) = channel();
389
390         spawn(proc() {
391             let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
392             let mut p = p.listen().ok().unwrap();
393             tx.send(());
394             drop(p.accept().ok().unwrap());
395         });
396         rx.recv();
397         let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
398         fail!()
399
400     }
401 }