]> git.lizzy.rs Git - rust.git/blob - src/librustuv/uvio.rs
Ignore tests broken by failing on ICE
[rust.git] / src / librustuv / uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The implementation of `rtio` for libuv
12
13 use std::c_str::CString;
14 use std::cast;
15 use std::io::IoError;
16 use std::io::net::ip::SocketAddr;
17 use std::io::process::ProcessConfig;
18 use std::io::signal::Signum;
19 use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
20               ReadWrite, FileStat};
21 use std::io;
22 use libc::c_int;
23 use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
24                 S_IWUSR};
25 use libc;
26 use std::path::Path;
27 use std::rt::rtio;
28 use std::rt::rtio::{IoFactory, EventLoop};
29 use ai = std::io::net::addrinfo;
30
31 #[cfg(test)] use std::unstable::run_in_bare_thread;
32
33 use super::{uv_error_to_io_error, Loop};
34
35 use addrinfo::GetAddrInfoRequest;
36 use async::AsyncWatcher;
37 use file::{FsRequest, FileWatcher};
38 use queue::QueuePool;
39 use homing::HomeHandle;
40 use idle::IdleWatcher;
41 use net::{TcpWatcher, TcpListener, UdpWatcher};
42 use pipe::{PipeWatcher, PipeListener};
43 use process::Process;
44 use signal::SignalWatcher;
45 use timer::TimerWatcher;
46 use tty::TtyWatcher;
47 use uvll;
48
49 // Obviously an Event Loop is always home.
50 pub struct UvEventLoop {
51     uvio: UvIoFactory
52 }
53
54 impl UvEventLoop {
55     pub fn new() -> UvEventLoop {
56         let mut loop_ = Loop::new();
57         let handle_pool = QueuePool::new(&mut loop_);
58         UvEventLoop {
59             uvio: UvIoFactory {
60                 loop_: loop_,
61                 handle_pool: Some(handle_pool),
62             }
63         }
64     }
65 }
66
67 impl Drop for UvEventLoop {
68     fn drop(&mut self) {
69         // Must first destroy the pool of handles before we destroy the loop
70         // because otherwise the contained async handle will be destroyed after
71         // the loop is free'd (use-after-free). We also must free the uv handle
72         // after the loop has been closed because during the closing of the loop
73         // the handle is required to be used apparently.
74         //
75         // Lastly, after we've closed the pool of handles we pump the event loop
76         // one last time to run any closing callbacks to make sure the loop
77         // shuts down cleanly.
78         let handle = self.uvio.handle_pool.get_ref().handle();
79         drop(self.uvio.handle_pool.take());
80         self.run();
81
82         self.uvio.loop_.close();
83         unsafe { uvll::free_handle(handle) }
84     }
85 }
86
87 impl EventLoop for UvEventLoop {
88     fn run(&mut self) {
89         self.uvio.loop_.run();
90     }
91
92     fn callback(&mut self, f: proc()) {
93         IdleWatcher::onetime(&mut self.uvio.loop_, f);
94     }
95
96     fn pausable_idle_callback(&mut self, cb: ~rtio::Callback:Send)
97         -> ~rtio::PausableIdleCallback:Send
98     {
99         IdleWatcher::new(&mut self.uvio.loop_,
100                          cb) as ~rtio::PausableIdleCallback:Send
101     }
102
103     fn remote_callback(&mut self, f: ~rtio::Callback:Send)
104         -> ~rtio::RemoteCallback:Send
105     {
106         box AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback:Send
107     }
108
109     fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
110         let factory = &mut self.uvio as &mut rtio::IoFactory;
111         Some(factory)
112     }
113
114     fn has_active_io(&self) -> bool {
115         self.uvio.loop_.get_blockers() > 0
116     }
117 }
118
119 #[test]
120 fn test_callback_run_once() {
121     run_in_bare_thread(proc() {
122         let mut event_loop = UvEventLoop::new();
123         let mut count = 0;
124         let count_ptr: *mut int = &mut count;
125         event_loop.callback(proc() {
126             unsafe { *count_ptr += 1 }
127         });
128         event_loop.run();
129         assert_eq!(count, 1);
130     });
131 }
132
133 pub struct UvIoFactory {
134     pub loop_: Loop,
135     handle_pool: Option<~QueuePool>,
136 }
137
138 impl UvIoFactory {
139     pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
140
141     pub fn make_handle(&mut self) -> HomeHandle {
142         // It's understood by the homing code that the "local id" is just the
143         // pointer of the local I/O factory cast to a uint.
144         let id: uint = unsafe { cast::transmute_copy(&self) };
145         HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
146     }
147 }
148
149 impl IoFactory for UvIoFactory {
150     // Connect to an address and return a new stream
151     // NB: This blocks the task waiting on the connection.
152     // It would probably be better to return a future
153     fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
154         -> Result<~rtio::RtioTcpStream:Send, IoError>
155     {
156         match TcpWatcher::connect(self, addr, timeout) {
157             Ok(t) => Ok(box t as ~rtio::RtioTcpStream:Send),
158             Err(e) => Err(uv_error_to_io_error(e)),
159         }
160     }
161
162     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener:Send, IoError> {
163         match TcpListener::bind(self, addr) {
164             Ok(t) => Ok(t as ~rtio::RtioTcpListener:Send),
165             Err(e) => Err(uv_error_to_io_error(e)),
166         }
167     }
168
169     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket:Send, IoError> {
170         match UdpWatcher::bind(self, addr) {
171             Ok(u) => Ok(box u as ~rtio::RtioUdpSocket:Send),
172             Err(e) => Err(uv_error_to_io_error(e)),
173         }
174     }
175
176     fn timer_init(&mut self) -> Result<~rtio::RtioTimer:Send, IoError> {
177         Ok(TimerWatcher::new(self) as ~rtio::RtioTimer:Send)
178     }
179
180     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
181                           hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> {
182         let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
183         r.map_err(uv_error_to_io_error)
184     }
185
186     fn fs_from_raw_fd(&mut self, fd: c_int,
187                       close: rtio::CloseBehavior) -> ~rtio::RtioFileStream:Send {
188         box FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream:Send
189     }
190
191     fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
192         -> Result<~rtio::RtioFileStream:Send, IoError> {
193         let flags = match fm {
194             io::Open => 0,
195             io::Append => libc::O_APPEND,
196             io::Truncate => libc::O_TRUNC,
197         };
198         // Opening with a write permission must silently create the file.
199         let (flags, mode) = match fa {
200             io::Read => (flags | libc::O_RDONLY, 0),
201             io::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
202                           libc::S_IRUSR | libc::S_IWUSR),
203             io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
204                               libc::S_IRUSR | libc::S_IWUSR),
205         };
206
207         match FsRequest::open(self, path, flags as int, mode as int) {
208             Ok(fs) => Ok(box fs as ~rtio::RtioFileStream:Send),
209             Err(e) => Err(uv_error_to_io_error(e))
210         }
211     }
212
213     fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
214         let r = FsRequest::unlink(&self.loop_, path);
215         r.map_err(uv_error_to_io_error)
216     }
217     fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
218         let r = FsRequest::lstat(&self.loop_, path);
219         r.map_err(uv_error_to_io_error)
220     }
221     fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
222         let r = FsRequest::stat(&self.loop_, path);
223         r.map_err(uv_error_to_io_error)
224     }
225     fn fs_mkdir(&mut self, path: &CString,
226                 perm: io::FilePermission) -> Result<(), IoError> {
227         let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
228         r.map_err(uv_error_to_io_error)
229     }
230     fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
231         let r = FsRequest::rmdir(&self.loop_, path);
232         r.map_err(uv_error_to_io_error)
233     }
234     fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
235         let r = FsRequest::rename(&self.loop_, path, to);
236         r.map_err(uv_error_to_io_error)
237     }
238     fn fs_chmod(&mut self, path: &CString,
239                 perm: io::FilePermission) -> Result<(), IoError> {
240         let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
241         r.map_err(uv_error_to_io_error)
242     }
243     fn fs_readdir(&mut self, path: &CString, flags: c_int)
244         -> Result<Vec<Path>, IoError>
245     {
246         let r = FsRequest::readdir(&self.loop_, path, flags);
247         r.map_err(uv_error_to_io_error)
248     }
249     fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
250         let r = FsRequest::link(&self.loop_, src, dst);
251         r.map_err(uv_error_to_io_error)
252     }
253     fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
254         let r = FsRequest::symlink(&self.loop_, src, dst);
255         r.map_err(uv_error_to_io_error)
256     }
257     fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
258         let r = FsRequest::chown(&self.loop_, path, uid, gid);
259         r.map_err(uv_error_to_io_error)
260     }
261     fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
262         let r = FsRequest::readlink(&self.loop_, path);
263         r.map_err(uv_error_to_io_error)
264     }
265     fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
266         -> Result<(), IoError>
267     {
268         let r = FsRequest::utime(&self.loop_, path, atime, mtime);
269         r.map_err(uv_error_to_io_error)
270     }
271
272     fn spawn(&mut self, config: ProcessConfig)
273             -> Result<(~rtio::RtioProcess:Send, ~[Option<~rtio::RtioPipe:Send>]), IoError>
274     {
275         match Process::spawn(self, config) {
276             Ok((p, io)) => {
277                 Ok((p as ~rtio::RtioProcess:Send,
278                     io.move_iter().map(|i| i.map(|p| box p as ~rtio::RtioPipe:Send)).collect()))
279             }
280             Err(e) => Err(uv_error_to_io_error(e)),
281         }
282     }
283
284     fn kill(&mut self, pid: libc::pid_t, signum: int) -> Result<(), IoError> {
285         Process::kill(pid, signum).map_err(uv_error_to_io_error)
286     }
287
288     fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener:Send, IoError>
289     {
290         match PipeListener::bind(self, path) {
291             Ok(p) => Ok(p as ~rtio::RtioUnixListener:Send),
292             Err(e) => Err(uv_error_to_io_error(e)),
293         }
294     }
295
296     fn unix_connect(&mut self, path: &CString,
297                     timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
298         match PipeWatcher::connect(self, path, timeout) {
299             Ok(p) => Ok(box p as ~rtio::RtioPipe:Send),
300             Err(e) => Err(uv_error_to_io_error(e)),
301         }
302     }
303
304     fn tty_open(&mut self, fd: c_int, readable: bool)
305             -> Result<~rtio::RtioTTY:Send, IoError> {
306         match TtyWatcher::new(self, fd, readable) {
307             Ok(tty) => Ok(box tty as ~rtio::RtioTTY:Send),
308             Err(e) => Err(uv_error_to_io_error(e))
309         }
310     }
311
312     fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe:Send, IoError> {
313         match PipeWatcher::open(self, fd) {
314             Ok(s) => Ok(box s as ~rtio::RtioPipe:Send),
315             Err(e) => Err(uv_error_to_io_error(e))
316         }
317     }
318
319     fn signal(&mut self, signum: Signum, channel: Sender<Signum>)
320         -> Result<~rtio::RtioSignal:Send, IoError> {
321         match SignalWatcher::new(self, signum, channel) {
322             Ok(s) => Ok(s as ~rtio::RtioSignal:Send),
323             Err(e) => Err(uv_error_to_io_error(e)),
324         }
325     }
326 }