]> git.lizzy.rs Git - rust.git/blob - src/librustuv/uvio.rs
Register new snapshots
[rust.git] / src / librustuv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The implementation of `rtio` for libuv
12
13 use std::c_str::CString;
14 use std::mem;
15 use libc::c_int;
16 use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
17                 S_IWUSR};
18 use libc;
19 use std::rt::rtio;
20 use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop, IoResult};
21
22 #[cfg(test)] use std::rt::thread::Thread;
23
24 use super::{uv_error_to_io_error, Loop};
25
26 use addrinfo::GetAddrInfoRequest;
27 use async::AsyncWatcher;
28 use file::{FsRequest, FileWatcher};
29 use queue::QueuePool;
30 use homing::HomeHandle;
31 use idle::IdleWatcher;
32 use net::{TcpWatcher, TcpListener, UdpWatcher};
33 use pipe::{PipeWatcher, PipeListener};
34 use process::Process;
35 use signal::SignalWatcher;
36 use timer::TimerWatcher;
37 use tty::TtyWatcher;
38 use uvll;
39
40 // Obviously an Event Loop is always home.
41 pub struct UvEventLoop {
42     uvio: UvIoFactory
43 }
44
45 impl UvEventLoop {
46     pub fn new() -> UvEventLoop {
47         let mut loop_ = Loop::new();
48         let handle_pool = QueuePool::new(&mut loop_);
49         UvEventLoop {
50             uvio: UvIoFactory {
51                 loop_: loop_,
52                 handle_pool: Some(handle_pool),
53             }
54         }
55     }
56 }
57
58 impl Drop for UvEventLoop {
59     fn drop(&mut self) {
60         // Must first destroy the pool of handles before we destroy the loop
61         // because otherwise the contained async handle will be destroyed after
62         // the loop is free'd (use-after-free). We also must free the uv handle
63         // after the loop has been closed because during the closing of the loop
64         // the handle is required to be used apparently.
65         //
66         // Lastly, after we've closed the pool of handles we pump the event loop
67         // one last time to run any closing callbacks to make sure the loop
68         // shuts down cleanly.
69         let handle = self.uvio.handle_pool.get_ref().handle();
70         drop(self.uvio.handle_pool.take());
71         self.run();
72
73         self.uvio.loop_.close();
74         unsafe { uvll::free_handle(handle) }
75     }
76 }
77
78 impl EventLoop for UvEventLoop {
79     fn run(&mut self) {
80         self.uvio.loop_.run();
81     }
82
83     fn callback(&mut self, f: proc()) {
84         IdleWatcher::onetime(&mut self.uvio.loop_, f);
85     }
86
87     fn pausable_idle_callback(&mut self, cb: Box<rtio::Callback + Send>)
88                               -> Box<rtio::PausableIdleCallback + Send> {
89         IdleWatcher::new(&mut self.uvio.loop_, cb)
90                          as Box<rtio::PausableIdleCallback + Send>
91     }
92
93     fn remote_callback(&mut self, f: Box<rtio::Callback + Send>)
94                        -> Box<rtio::RemoteCallback + Send> {
95         box AsyncWatcher::new(&mut self.uvio.loop_, f) as
96             Box<rtio::RemoteCallback + Send>
97     }
98
99     fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
100         let factory = &mut self.uvio as &mut rtio::IoFactory;
101         Some(factory)
102     }
103
104     fn has_active_io(&self) -> bool {
105         self.uvio.loop_.get_blockers() > 0
106     }
107 }
108
109 #[test]
110 fn test_callback_run_once() {
111     Thread::start(proc() {
112         let mut event_loop = UvEventLoop::new();
113         let mut count = 0;
114         let count_ptr: *mut int = &mut count;
115         event_loop.callback(proc() {
116             unsafe { *count_ptr += 1 }
117         });
118         event_loop.run();
119         assert_eq!(count, 1);
120     }).join();
121 }
122
123 pub struct UvIoFactory {
124     pub loop_: Loop,
125     handle_pool: Option<Box<QueuePool>>,
126 }
127
128 impl UvIoFactory {
129     pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
130
131     pub fn make_handle(&mut self) -> HomeHandle {
132         // It's understood by the homing code that the "local id" is just the
133         // pointer of the local I/O factory cast to a uint.
134         let id: uint = unsafe { mem::transmute_copy(&self) };
135         HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
136     }
137 }
138
139 impl IoFactory for UvIoFactory {
140     // Connect to an address and return a new stream
141     // NB: This blocks the task waiting on the connection.
142     // It would probably be better to return a future
143     fn tcp_connect(&mut self, addr: rtio::SocketAddr, timeout: Option<u64>)
144                    -> IoResult<Box<rtio::RtioTcpStream + Send>> {
145         match TcpWatcher::connect(self, addr, timeout) {
146             Ok(t) => Ok(box t as Box<rtio::RtioTcpStream + Send>),
147             Err(e) => Err(uv_error_to_io_error(e)),
148         }
149     }
150
151     fn tcp_bind(&mut self, addr: rtio::SocketAddr)
152                 -> IoResult<Box<rtio::RtioTcpListener + Send>> {
153         match TcpListener::bind(self, addr) {
154             Ok(t) => Ok(t as Box<rtio::RtioTcpListener + Send>),
155             Err(e) => Err(uv_error_to_io_error(e)),
156         }
157     }
158
159     fn udp_bind(&mut self, addr: rtio::SocketAddr)
160                 -> IoResult<Box<rtio::RtioUdpSocket + Send>> {
161         match UdpWatcher::bind(self, addr) {
162             Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket + Send>),
163             Err(e) => Err(uv_error_to_io_error(e)),
164         }
165     }
166
167     fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer + Send>> {
168         Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer + Send>)
169     }
170
171     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
172                           hint: Option<rtio::AddrinfoHint>)
173         -> IoResult<Vec<rtio::AddrinfoInfo>>
174     {
175         let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
176         r.map_err(uv_error_to_io_error)
177     }
178
179     fn fs_from_raw_fd(&mut self, fd: c_int, close: rtio::CloseBehavior)
180                       -> Box<rtio::RtioFileStream + Send> {
181         box FileWatcher::new(self, fd, close) as
182             Box<rtio::RtioFileStream + Send>
183     }
184
185     fn fs_open(&mut self, path: &CString, fm: rtio::FileMode,
186                fa: rtio::FileAccess)
187         -> IoResult<Box<rtio::RtioFileStream + Send>>
188     {
189         let flags = match fm {
190             rtio::Open => 0,
191             rtio::Append => libc::O_APPEND,
192             rtio::Truncate => libc::O_TRUNC,
193         };
194         // Opening with a write permission must silently create the file.
195         let (flags, mode) = match fa {
196             rtio::Read => (flags | libc::O_RDONLY, 0),
197             rtio::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
198                             libc::S_IRUSR | libc::S_IWUSR),
199             rtio::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
200                                 libc::S_IRUSR | libc::S_IWUSR),
201         };
202
203         match FsRequest::open(self, path, flags as int, mode as int) {
204             Ok(fs) => Ok(box fs as Box<rtio::RtioFileStream + Send>),
205             Err(e) => Err(uv_error_to_io_error(e))
206         }
207     }
208
209     fn fs_unlink(&mut self, path: &CString) -> IoResult<()> {
210         let r = FsRequest::unlink(&self.loop_, path);
211         r.map_err(uv_error_to_io_error)
212     }
213     fn fs_lstat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
214         let r = FsRequest::lstat(&self.loop_, path);
215         r.map_err(uv_error_to_io_error)
216     }
217     fn fs_stat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
218         let r = FsRequest::stat(&self.loop_, path);
219         r.map_err(uv_error_to_io_error)
220     }
221     fn fs_mkdir(&mut self, path: &CString, perm: uint) -> IoResult<()> {
222         let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
223         r.map_err(uv_error_to_io_error)
224     }
225     fn fs_rmdir(&mut self, path: &CString) -> IoResult<()> {
226         let r = FsRequest::rmdir(&self.loop_, path);
227         r.map_err(uv_error_to_io_error)
228     }
229     fn fs_rename(&mut self, path: &CString, to: &CString) -> IoResult<()> {
230         let r = FsRequest::rename(&self.loop_, path, to);
231         r.map_err(uv_error_to_io_error)
232     }
233     fn fs_chmod(&mut self, path: &CString, perm: uint) -> IoResult<()> {
234         let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
235         r.map_err(uv_error_to_io_error)
236     }
237     fn fs_readdir(&mut self, path: &CString, flags: c_int)
238         -> IoResult<Vec<CString>>
239     {
240         let r = FsRequest::readdir(&self.loop_, path, flags);
241         r.map_err(uv_error_to_io_error)
242     }
243     fn fs_link(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
244         let r = FsRequest::link(&self.loop_, src, dst);
245         r.map_err(uv_error_to_io_error)
246     }
247     fn fs_symlink(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
248         let r = FsRequest::symlink(&self.loop_, src, dst);
249         r.map_err(uv_error_to_io_error)
250     }
251     fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> IoResult<()> {
252         let r = FsRequest::chown(&self.loop_, path, uid, gid);
253         r.map_err(uv_error_to_io_error)
254     }
255     fn fs_readlink(&mut self, path: &CString) -> IoResult<CString> {
256         let r = FsRequest::readlink(&self.loop_, path);
257         r.map_err(uv_error_to_io_error)
258     }
259     fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
260         -> IoResult<()>
261     {
262         let r = FsRequest::utime(&self.loop_, path, atime, mtime);
263         r.map_err(uv_error_to_io_error)
264     }
265
266     fn spawn(&mut self, cfg: ProcessConfig)
267             -> IoResult<(Box<rtio::RtioProcess + Send>,
268                          Vec<Option<Box<rtio::RtioPipe + Send>>>)>
269     {
270         match Process::spawn(self, cfg) {
271             Ok((p, io)) => {
272                 Ok((p as Box<rtio::RtioProcess + Send>,
273                     io.move_iter().map(|i| i.map(|p| {
274                         box p as Box<rtio::RtioPipe + Send>
275                     })).collect()))
276             }
277             Err(e) => Err(uv_error_to_io_error(e)),
278         }
279     }
280
281     fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> {
282         Process::kill(pid, signum).map_err(uv_error_to_io_error)
283     }
284
285     fn unix_bind(&mut self, path: &CString)
286                  -> IoResult<Box<rtio::RtioUnixListener + Send>> {
287         match PipeListener::bind(self, path) {
288             Ok(p) => Ok(p as Box<rtio::RtioUnixListener + Send>),
289             Err(e) => Err(uv_error_to_io_error(e)),
290         }
291     }
292
293     fn unix_connect(&mut self, path: &CString, timeout: Option<u64>)
294                     -> IoResult<Box<rtio::RtioPipe + Send>> {
295         match PipeWatcher::connect(self, path, timeout) {
296             Ok(p) => Ok(box p as Box<rtio::RtioPipe + Send>),
297             Err(e) => Err(uv_error_to_io_error(e)),
298         }
299     }
300
301     fn tty_open(&mut self, fd: c_int, readable: bool)
302             -> IoResult<Box<rtio::RtioTTY + Send>> {
303         match TtyWatcher::new(self, fd, readable) {
304             Ok(tty) => Ok(box tty as Box<rtio::RtioTTY + Send>),
305             Err(e) => Err(uv_error_to_io_error(e))
306         }
307     }
308
309     fn pipe_open(&mut self, fd: c_int)
310         -> IoResult<Box<rtio::RtioPipe + Send>>
311     {
312         match PipeWatcher::open(self, fd) {
313             Ok(s) => Ok(box s as Box<rtio::RtioPipe + Send>),
314             Err(e) => Err(uv_error_to_io_error(e))
315         }
316     }
317
318     fn signal(&mut self, signum: int, cb: Box<rtio::Callback + Send>)
319         -> IoResult<Box<rtio::RtioSignal + Send>>
320     {
321         match SignalWatcher::new(self, signum, cb) {
322             Ok(s) => Ok(s as Box<rtio::RtioSignal + Send>),
323             Err(e) => Err(uv_error_to_io_error(e)),
324         }
325     }
326 }