1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! The implementation of `rtio` for libuv
13 use std::c_str::CString;
16 use std::io::net::ip::SocketAddr;
17 use std::io::process::ProcessConfig;
18 use std::io::signal::Signum;
19 use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
23 use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
28 use std::rt::rtio::{IoFactory, EventLoop};
29 use ai = std::io::net::addrinfo;
31 #[cfg(test)] use std::unstable::run_in_bare_thread;
33 use super::{uv_error_to_io_error, Loop};
35 use addrinfo::GetAddrInfoRequest;
36 use async::AsyncWatcher;
37 use file::{FsRequest, FileWatcher};
39 use homing::HomeHandle;
40 use idle::IdleWatcher;
41 use net::{TcpWatcher, TcpListener, UdpWatcher};
42 use pipe::{PipeWatcher, PipeListener};
44 use signal::SignalWatcher;
45 use timer::TimerWatcher;
49 // Obviously an Event Loop is always home.
50 pub struct UvEventLoop {
55 pub fn new() -> UvEventLoop {
56 let mut loop_ = Loop::new();
57 let handle_pool = QueuePool::new(&mut loop_);
61 handle_pool: Some(handle_pool),
67 impl Drop for UvEventLoop {
69 // Must first destroy the pool of handles before we destroy the loop
70 // because otherwise the contained async handle will be destroyed after
71 // the loop is free'd (use-after-free). We also must free the uv handle
72 // after the loop has been closed because during the closing of the loop
73 // the handle is required to be used apparently.
75 // Lastly, after we've closed the pool of handles we pump the event loop
76 // one last time to run any closing callbacks to make sure the loop
77 // shuts down cleanly.
78 let handle = self.uvio.handle_pool.get_ref().handle();
79 drop(self.uvio.handle_pool.take());
82 self.uvio.loop_.close();
83 unsafe { uvll::free_handle(handle) }
87 impl EventLoop for UvEventLoop {
89 self.uvio.loop_.run();
92 fn callback(&mut self, f: proc()) {
93 IdleWatcher::onetime(&mut self.uvio.loop_, f);
96 fn pausable_idle_callback(&mut self, cb: ~rtio::Callback:Send)
97 -> ~rtio::PausableIdleCallback:Send
99 IdleWatcher::new(&mut self.uvio.loop_,
100 cb) as ~rtio::PausableIdleCallback:Send
103 fn remote_callback(&mut self, f: ~rtio::Callback:Send)
104 -> ~rtio::RemoteCallback:Send
106 box AsyncWatcher::new(&mut self.uvio.loop_, f) as ~rtio::RemoteCallback:Send
109 fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
110 let factory = &mut self.uvio as &mut rtio::IoFactory;
114 fn has_active_io(&self) -> bool {
115 self.uvio.loop_.get_blockers() > 0
120 fn test_callback_run_once() {
121 run_in_bare_thread(proc() {
122 let mut event_loop = UvEventLoop::new();
124 let count_ptr: *mut int = &mut count;
125 event_loop.callback(proc() {
126 unsafe { *count_ptr += 1 }
129 assert_eq!(count, 1);
133 pub struct UvIoFactory {
135 handle_pool: Option<~QueuePool>,
139 pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
141 pub fn make_handle(&mut self) -> HomeHandle {
142 // It's understood by the homing code that the "local id" is just the
143 // pointer of the local I/O factory cast to a uint.
144 let id: uint = unsafe { cast::transmute_copy(&self) };
145 HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
149 impl IoFactory for UvIoFactory {
150 // Connect to an address and return a new stream
151 // NB: This blocks the task waiting on the connection.
152 // It would probably be better to return a future
153 fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
154 -> Result<~rtio::RtioTcpStream:Send, IoError>
156 match TcpWatcher::connect(self, addr, timeout) {
157 Ok(t) => Ok(box t as ~rtio::RtioTcpStream:Send),
158 Err(e) => Err(uv_error_to_io_error(e)),
162 fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioTcpListener:Send, IoError> {
163 match TcpListener::bind(self, addr) {
164 Ok(t) => Ok(t as ~rtio::RtioTcpListener:Send),
165 Err(e) => Err(uv_error_to_io_error(e)),
169 fn udp_bind(&mut self, addr: SocketAddr) -> Result<~rtio::RtioUdpSocket:Send, IoError> {
170 match UdpWatcher::bind(self, addr) {
171 Ok(u) => Ok(box u as ~rtio::RtioUdpSocket:Send),
172 Err(e) => Err(uv_error_to_io_error(e)),
176 fn timer_init(&mut self) -> Result<~rtio::RtioTimer:Send, IoError> {
177 Ok(TimerWatcher::new(self) as ~rtio::RtioTimer:Send)
180 fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
181 hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> {
182 let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
183 r.map_err(uv_error_to_io_error)
186 fn fs_from_raw_fd(&mut self, fd: c_int,
187 close: rtio::CloseBehavior) -> ~rtio::RtioFileStream:Send {
188 box FileWatcher::new(self, fd, close) as ~rtio::RtioFileStream:Send
191 fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
192 -> Result<~rtio::RtioFileStream:Send, IoError> {
193 let flags = match fm {
195 io::Append => libc::O_APPEND,
196 io::Truncate => libc::O_TRUNC,
198 // Opening with a write permission must silently create the file.
199 let (flags, mode) = match fa {
200 io::Read => (flags | libc::O_RDONLY, 0),
201 io::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
202 libc::S_IRUSR | libc::S_IWUSR),
203 io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
204 libc::S_IRUSR | libc::S_IWUSR),
207 match FsRequest::open(self, path, flags as int, mode as int) {
208 Ok(fs) => Ok(box fs as ~rtio::RtioFileStream:Send),
209 Err(e) => Err(uv_error_to_io_error(e))
213 fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
214 let r = FsRequest::unlink(&self.loop_, path);
215 r.map_err(uv_error_to_io_error)
217 fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
218 let r = FsRequest::lstat(&self.loop_, path);
219 r.map_err(uv_error_to_io_error)
221 fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
222 let r = FsRequest::stat(&self.loop_, path);
223 r.map_err(uv_error_to_io_error)
225 fn fs_mkdir(&mut self, path: &CString,
226 perm: io::FilePermission) -> Result<(), IoError> {
227 let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
228 r.map_err(uv_error_to_io_error)
230 fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
231 let r = FsRequest::rmdir(&self.loop_, path);
232 r.map_err(uv_error_to_io_error)
234 fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
235 let r = FsRequest::rename(&self.loop_, path, to);
236 r.map_err(uv_error_to_io_error)
238 fn fs_chmod(&mut self, path: &CString,
239 perm: io::FilePermission) -> Result<(), IoError> {
240 let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
241 r.map_err(uv_error_to_io_error)
243 fn fs_readdir(&mut self, path: &CString, flags: c_int)
244 -> Result<Vec<Path>, IoError>
246 let r = FsRequest::readdir(&self.loop_, path, flags);
247 r.map_err(uv_error_to_io_error)
249 fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
250 let r = FsRequest::link(&self.loop_, src, dst);
251 r.map_err(uv_error_to_io_error)
253 fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
254 let r = FsRequest::symlink(&self.loop_, src, dst);
255 r.map_err(uv_error_to_io_error)
257 fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
258 let r = FsRequest::chown(&self.loop_, path, uid, gid);
259 r.map_err(uv_error_to_io_error)
261 fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
262 let r = FsRequest::readlink(&self.loop_, path);
263 r.map_err(uv_error_to_io_error)
265 fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
266 -> Result<(), IoError>
268 let r = FsRequest::utime(&self.loop_, path, atime, mtime);
269 r.map_err(uv_error_to_io_error)
272 fn spawn(&mut self, config: ProcessConfig)
273 -> Result<(~rtio::RtioProcess:Send, ~[Option<~rtio::RtioPipe:Send>]), IoError>
275 match Process::spawn(self, config) {
277 Ok((p as ~rtio::RtioProcess:Send,
278 io.move_iter().map(|i| i.map(|p| box p as ~rtio::RtioPipe:Send)).collect()))
280 Err(e) => Err(uv_error_to_io_error(e)),
284 fn kill(&mut self, pid: libc::pid_t, signum: int) -> Result<(), IoError> {
285 Process::kill(pid, signum).map_err(uv_error_to_io_error)
288 fn unix_bind(&mut self, path: &CString) -> Result<~rtio::RtioUnixListener:Send, IoError>
290 match PipeListener::bind(self, path) {
291 Ok(p) => Ok(p as ~rtio::RtioUnixListener:Send),
292 Err(e) => Err(uv_error_to_io_error(e)),
296 fn unix_connect(&mut self, path: &CString,
297 timeout: Option<u64>) -> Result<~rtio::RtioPipe:Send, IoError> {
298 match PipeWatcher::connect(self, path, timeout) {
299 Ok(p) => Ok(box p as ~rtio::RtioPipe:Send),
300 Err(e) => Err(uv_error_to_io_error(e)),
304 fn tty_open(&mut self, fd: c_int, readable: bool)
305 -> Result<~rtio::RtioTTY:Send, IoError> {
306 match TtyWatcher::new(self, fd, readable) {
307 Ok(tty) => Ok(box tty as ~rtio::RtioTTY:Send),
308 Err(e) => Err(uv_error_to_io_error(e))
312 fn pipe_open(&mut self, fd: c_int) -> Result<~rtio::RtioPipe:Send, IoError> {
313 match PipeWatcher::open(self, fd) {
314 Ok(s) => Ok(box s as ~rtio::RtioPipe:Send),
315 Err(e) => Err(uv_error_to_io_error(e))
319 fn signal(&mut self, signum: Signum, channel: Sender<Signum>)
320 -> Result<~rtio::RtioSignal:Send, IoError> {
321 match SignalWatcher::new(self, signum, channel) {
322 Ok(s) => Ok(s as ~rtio::RtioSignal:Send),
323 Err(e) => Err(uv_error_to_io_error(e)),