1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! The implementation of `rtio` for libuv
13 use std::c_str::CString;
16 use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
20 use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop, IoResult};
22 #[cfg(test)] use std::rt::thread::Thread;
24 use super::{uv_error_to_io_error, Loop};
26 use addrinfo::GetAddrInfoRequest;
27 use async::AsyncWatcher;
28 use file::{FsRequest, FileWatcher};
30 use homing::HomeHandle;
31 use idle::IdleWatcher;
32 use net::{TcpWatcher, TcpListener, UdpWatcher};
33 use pipe::{PipeWatcher, PipeListener};
35 use signal::SignalWatcher;
36 use timer::TimerWatcher;
40 // Obviously an Event Loop is always home.
/// Event loop backed by libuv. Its state is reached through the `uvio` field:
/// the `EventLoop` impl drives `uvio.loop_` and hands `uvio` out via `io()`.
41 pub struct UvEventLoop {
/// Creates the event loop: builds a fresh `Loop` and a `QueuePool` bound to
/// it. The pool is stored as `Some(..)` so that `Drop` can `take()` it later.
46 pub fn new() -> UvEventLoop {
47 let mut loop_ = Loop::new();
48 let handle_pool = QueuePool::new(&mut loop_);
52 handle_pool: Some(handle_pool),
// Teardown for the event loop: drop the handle pool, close the loop, then
// free the pool's raw uv handle -- in that order, as explained below.
58 impl Drop for UvEventLoop {
60 // Must first destroy the pool of handles before we destroy the loop
61 // because otherwise the contained async handle will be destroyed after
62 // the loop is free'd (use-after-free). We also must free the uv handle
63 // after the loop has been closed because during the closing of the loop
64 // the handle is required to be used apparently.
66 // Lastly, after we've closed the pool of handles we pump the event loop
67 // one last time to run any closing callbacks to make sure the loop
68 // shuts down cleanly.
// Capture the raw handle before the pool is dropped on the next line; it is
// only freed after `loop_.close()` at the end.
69 let handle = self.uvio.handle_pool.get_ref().handle();
70 drop(self.uvio.handle_pool.take());
73 self.uvio.loop_.close();
74 unsafe { uvll::free_handle(handle) }
// `EventLoop` implementation: the methods operate on `self.uvio`, mostly on
// its `loop_` field.
78 impl EventLoop for UvEventLoop {
// Runs the underlying loop.
80 self.uvio.loop_.run();
/// Registers `f` on the loop through `IdleWatcher::onetime`.
83 fn callback(&mut self, f: proc()) {
84 IdleWatcher::onetime(&mut self.uvio.loop_, f);
/// Creates an `IdleWatcher` for `cb` on this loop and returns it as a
/// `PausableIdleCallback` trait object.
87 fn pausable_idle_callback(&mut self, cb: Box<rtio::Callback:Send>)
88 -> Box<rtio::PausableIdleCallback:Send> {
89 IdleWatcher::new(&mut self.uvio.loop_, cb)
90 as Box<rtio::PausableIdleCallback:Send>
/// Boxes a new `AsyncWatcher` for `f` on this loop and returns it as a
/// `RemoteCallback` trait object.
93 fn remote_callback(&mut self, f: Box<rtio::Callback:Send>)
94 -> Box<rtio::RemoteCallback:Send> {
95 box AsyncWatcher::new(&mut self.uvio.loop_, f) as
96 Box<rtio::RemoteCallback:Send>
/// Gives access to the I/O factory: borrows `self.uvio` as a
/// `&mut rtio::IoFactory` trait object.
99 fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
100 let factory = &mut self.uvio as &mut rtio::IoFactory;
/// Returns `true` when the loop reports at least one blocker
/// (`get_blockers() > 0`).
104 fn has_active_io(&self) -> bool {
105 self.uvio.loop_.get_blockers() > 0
// Checks that a proc registered with `callback` has run exactly once by the
// time of the assertion: the proc increments `count` and the test expects 1.
110 fn test_callback_run_once() {
111 Thread::start(proc() {
112 let mut event_loop = UvEventLoop::new();
// The proc writes to `count` through this raw pointer (hence the `unsafe`).
114 let count_ptr: *mut int = &mut count;
115 event_loop.callback(proc() {
116 unsafe { *count_ptr += 1 }
119 assert_eq!(count, 1);
/// I/O factory tied to one libuv loop; it implements `IoFactory`.
123 pub struct UvIoFactory {
// Held in an `Option` so `Drop for UvEventLoop` can `take()` the pool and
// drop it before closing the loop.
125 handle_pool: Option<Box<QueuePool>>,
/// Returns the raw `uv_loop_t` pointer stored in `self.loop_.handle`.
// NOTE(review): the lifetime parameter `'a` is declared but never used.
129 pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
/// Builds a `HomeHandle` for this factory from its address-derived id and its
/// handle pool.
131 pub fn make_handle(&mut self) -> HomeHandle {
132 // It's understood by the homing code that the "local id" is just the
133 // pointer of the local I/O factory cast to a uint.
// `&self` here is a reference to the `&mut Self`, so `transmute_copy` reads
// the pointer value itself out as a `uint`.
134 let id: uint = unsafe { mem::transmute_copy(&self) };
// NOTE(review): `get_mut_ref()` presumably fails if the pool has already been
// taken (as `Drop for UvEventLoop` does) -- confirm.
135 HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
// `IoFactory` implementation: each method forwards to the corresponding
// watcher/request type; fallible ones convert the error with
// `uv_error_to_io_error`.
139 impl IoFactory for UvIoFactory {
140 // Connect to an address and return a new stream
141 // NB: This blocks the task waiting on the connection.
142 // It would probably be better to return a future
// Implemented with `TcpWatcher::connect`; on success the watcher is boxed as
// an `RtioTcpStream` trait object.
143 fn tcp_connect(&mut self, addr: rtio::SocketAddr, timeout: Option<u64>)
144 -> IoResult<Box<rtio::RtioTcpStream:Send>> {
145 match TcpWatcher::connect(self, addr, timeout) {
146 Ok(t) => Ok(box t as Box<rtio::RtioTcpStream:Send>),
147 Err(e) => Err(uv_error_to_io_error(e)),
/// Binds a TCP listener via `TcpListener::bind`. The success value is cast
/// directly to the `RtioTcpListener` trait object (no extra `box` here).
151 fn tcp_bind(&mut self, addr: rtio::SocketAddr)
152 -> IoResult<Box<rtio::RtioTcpListener:Send>> {
153 match TcpListener::bind(self, addr) {
154 Ok(t) => Ok(t as Box<rtio::RtioTcpListener:Send>),
155 Err(e) => Err(uv_error_to_io_error(e)),
/// Binds a UDP socket via `UdpWatcher::bind`, boxing the watcher as an
/// `RtioUdpSocket` trait object on success.
159 fn udp_bind(&mut self, addr: rtio::SocketAddr)
160 -> IoResult<Box<rtio::RtioUdpSocket:Send>> {
161 match UdpWatcher::bind(self, addr) {
162 Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket:Send>),
163 Err(e) => Err(uv_error_to_io_error(e)),
/// Creates a timer with `TimerWatcher::new`; always returns `Ok`, with the
/// result cast directly to the `RtioTimer` trait object.
167 fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer:Send>> {
168 Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer:Send>)
/// Runs a `GetAddrInfoRequest` on this factory's loop for the given `host`,
/// `servname` and `hint`, converting any error with `uv_error_to_io_error`.
171 fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
172 hint: Option<rtio::AddrinfoHint>)
173 -> IoResult<Vec<rtio::AddrinfoInfo>>
175 let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
176 r.map_err(uv_error_to_io_error)
/// Wraps an existing descriptor `fd` in a boxed `FileWatcher`, passing the
/// `close` behavior through, and returns it as an `RtioFileStream` trait object.
179 fn fs_from_raw_fd(&mut self, fd: c_int, close: rtio::CloseBehavior)
180 -> Box<rtio::RtioFileStream:Send> {
181 box FileWatcher::new(self, fd, close) as
182 Box<rtio::RtioFileStream:Send>
/// Opens `path` via `FsRequest::open`. `fm` contributes extra open flags
/// (`Append` -> `O_APPEND`, `Truncate` -> `O_TRUNC`); `fa` selects the access
/// flag and the creation mode. On success the request is boxed as an
/// `RtioFileStream` trait object.
185 fn fs_open(&mut self, path: &CString, fm: rtio::FileMode,
186 fa: rtio::FileAccess)
187 -> IoResult<Box<rtio::RtioFileStream:Send>>
189 let flags = match fm {
191 rtio::Append => libc::O_APPEND,
192 rtio::Truncate => libc::O_TRUNC,
194 // Opening with a write permission must silently create the file.
// `Read` adds no `O_CREAT` and uses mode 0; `Write` and `ReadWrite` add
// `O_CREAT` with owner read/write permission bits.
195 let (flags, mode) = match fa {
196 rtio::Read => (flags | libc::O_RDONLY, 0),
197 rtio::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
198 libc::S_IRUSR | libc::S_IWUSR),
199 rtio::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
200 libc::S_IRUSR | libc::S_IWUSR),
// Both values are cast to `int` for the request.
203 match FsRequest::open(self, path, flags as int, mode as int) {
204 Ok(fs) => Ok(box fs as Box<rtio::RtioFileStream:Send>),
205 Err(e) => Err(uv_error_to_io_error(e))
/// Delegates to `FsRequest::unlink` on this factory's loop, converting any
/// error with `uv_error_to_io_error`.
209 fn fs_unlink(&mut self, path: &CString) -> IoResult<()> {
210 let r = FsRequest::unlink(&self.loop_, path);
211 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::lstat` on this factory's loop, converting any
/// error with `uv_error_to_io_error`.
213 fn fs_lstat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
214 let r = FsRequest::lstat(&self.loop_, path);
215 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::stat` on this factory's loop, converting any
/// error with `uv_error_to_io_error`.
217 fn fs_stat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
218 let r = FsRequest::stat(&self.loop_, path);
219 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::mkdir` on this factory's loop; `perm` is cast to
/// `c_int`. Errors are converted with `uv_error_to_io_error`.
221 fn fs_mkdir(&mut self, path: &CString, perm: uint) -> IoResult<()> {
222 let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
223 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::rmdir` on this factory's loop, converting any
/// error with `uv_error_to_io_error`.
225 fn fs_rmdir(&mut self, path: &CString) -> IoResult<()> {
226 let r = FsRequest::rmdir(&self.loop_, path);
227 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::rename` with `path` and `to` on this factory's
/// loop, converting any error with `uv_error_to_io_error`.
229 fn fs_rename(&mut self, path: &CString, to: &CString) -> IoResult<()> {
230 let r = FsRequest::rename(&self.loop_, path, to);
231 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::chmod` on this factory's loop; `perm` is cast to
/// `c_int`. Errors are converted with `uv_error_to_io_error`.
233 fn fs_chmod(&mut self, path: &CString, perm: uint) -> IoResult<()> {
234 let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
235 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::readdir` with `path` and `flags` on this factory's
/// loop, converting any error with `uv_error_to_io_error`.
237 fn fs_readdir(&mut self, path: &CString, flags: c_int)
238 -> IoResult<Vec<CString>>
240 let r = FsRequest::readdir(&self.loop_, path, flags);
241 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::link` with `src` and `dst` on this factory's loop,
/// converting any error with `uv_error_to_io_error`.
243 fn fs_link(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
244 let r = FsRequest::link(&self.loop_, src, dst);
245 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::symlink` with `src` and `dst` on this factory's
/// loop, converting any error with `uv_error_to_io_error`.
247 fn fs_symlink(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
248 let r = FsRequest::symlink(&self.loop_, src, dst);
249 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::chown` with `uid` and `gid` on this factory's
/// loop, converting any error with `uv_error_to_io_error`.
251 fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> IoResult<()> {
252 let r = FsRequest::chown(&self.loop_, path, uid, gid);
253 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::readlink` on this factory's loop, converting any
/// error with `uv_error_to_io_error`.
255 fn fs_readlink(&mut self, path: &CString) -> IoResult<CString> {
256 let r = FsRequest::readlink(&self.loop_, path);
257 r.map_err(uv_error_to_io_error)
/// Delegates to `FsRequest::utime` with `atime` and `mtime` on this factory's
/// loop, converting any error with `uv_error_to_io_error`.
// NOTE(review): units of `atime`/`mtime` are not established here -- confirm
// against `FsRequest::utime`.
259 fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
262 let r = FsRequest::utime(&self.loop_, path, atime, mtime);
263 r.map_err(uv_error_to_io_error)
/// Spawns a process via `Process::spawn`. On success the process is cast to an
/// `RtioProcess` trait object and every present pipe in `io` is boxed as an
/// `RtioPipe`; absent entries stay `None` because the boxing goes through
/// `Option::map`.
266 fn spawn(&mut self, cfg: ProcessConfig)
267 -> IoResult<(Box<rtio::RtioProcess:Send>,
268 Vec<Option<Box<rtio::RtioPipe:Send>>>)>
270 match Process::spawn(self, cfg) {
272 Ok((p as Box<rtio::RtioProcess:Send>,
273 io.move_iter().map(|i| i.map(|p| {
274 box p as Box<rtio::RtioPipe:Send>
277 Err(e) => Err(uv_error_to_io_error(e)),
/// Forwards `pid` and `signum` to `Process::kill`, converting any error with
/// `uv_error_to_io_error`. Does not use the factory's loop.
281 fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> {
282 Process::kill(pid, signum).map_err(uv_error_to_io_error)
/// Binds a listener at `path` via `PipeListener::bind`. The success value is
/// cast directly to the `RtioUnixListener` trait object (no extra `box` here).
285 fn unix_bind(&mut self, path: &CString)
286 -> IoResult<Box<rtio::RtioUnixListener:Send>> {
287 match PipeListener::bind(self, path) {
288 Ok(p) => Ok(p as Box<rtio::RtioUnixListener:Send>),
289 Err(e) => Err(uv_error_to_io_error(e)),
/// Connects to `path` via `PipeWatcher::connect` with the optional `timeout`,
/// boxing the watcher as an `RtioPipe` trait object on success.
293 fn unix_connect(&mut self, path: &CString, timeout: Option<u64>)
294 -> IoResult<Box<rtio::RtioPipe:Send>> {
295 match PipeWatcher::connect(self, path, timeout) {
296 Ok(p) => Ok(box p as Box<rtio::RtioPipe:Send>),
297 Err(e) => Err(uv_error_to_io_error(e)),
/// Creates a `TtyWatcher` for `fd` (passing `readable` through) and boxes it
/// as an `RtioTTY` trait object on success.
301 fn tty_open(&mut self, fd: c_int, readable: bool)
302 -> IoResult<Box<rtio::RtioTTY:Send>> {
303 match TtyWatcher::new(self, fd, readable) {
304 Ok(tty) => Ok(box tty as Box<rtio::RtioTTY:Send>),
305 Err(e) => Err(uv_error_to_io_error(e))
/// Opens `fd` via `PipeWatcher::open`, boxing the watcher as an `RtioPipe`
/// trait object on success.
309 fn pipe_open(&mut self, fd: c_int)
310 -> IoResult<Box<rtio::RtioPipe:Send>>
312 match PipeWatcher::open(self, fd) {
313 Ok(s) => Ok(box s as Box<rtio::RtioPipe:Send>),
314 Err(e) => Err(uv_error_to_io_error(e))
/// Creates a `SignalWatcher` for `signum` with callback `cb`. The success
/// value is cast directly to the `RtioSignal` trait object (no extra `box`).
318 fn signal(&mut self, signum: int, cb: Box<rtio::Callback:Send>)
319 -> IoResult<Box<rtio::RtioSignal:Send>>
321 match SignalWatcher::new(self, signum, cb) {
322 Ok(s) => Ok(s as Box<rtio::RtioSignal:Send>),
323 Err(e) => Err(uv_error_to_io_error(e)),