#[feature(macro_rules, globs)];
+use std::cast;
use std::str::raw::from_c_str;
use std::vec;
use std::ptr;
fn native_handle(&self) -> T;
}
+/// A type that wraps a uv handle
+pub trait UvHandle<T> {
+ fn uv_handle(&self) -> *T;
+
+ // FIXME(#8888) dummy self
+ fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
+ unsafe {
+ let handle = uvll::malloc_handle(ty);
+ assert!(!handle.is_null());
+ handle as *T
+ }
+ }
+
+ unsafe fn from_uv_handle<'a>(h: &'a *T) -> &'a mut Self {
+ cast::transmute(uvll::get_data_for_uv_handle(*h))
+ }
+
+ fn install(~self) -> ~Self {
+ unsafe {
+ let myptr = cast::transmute::<&~Self, *u8>(&self);
+ uvll::set_data_for_uv_handle(self.uv_handle(), myptr);
+ }
+ self
+ }
+
+ fn close_async_(&mut self) {
+ // we used malloc to allocate all handles, so we must always have at
+ // least a callback to free all the handles we allocated.
+ extern fn close_cb(handle: *uvll::uv_handle_t) {
+ unsafe { uvll::free_handle(handle) }
+ }
+
+ unsafe { uvll::close(self.uv_handle(), close_cb) }
+ }
+}
+
impl Loop {
pub fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
/// Borrow a slice to a Buf
pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
let data = vec::raw::to_ptr(v);
- unsafe { uvll::uv_buf_init(data as *c_char, v.len() as c_uint) }
+ uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
}
// XXX: Do these conversions without copying
let data = data as *mut u8;
ptr::copy_memory(data, b, l)
}
- uvll::uv_buf_init(data as *c_char, v.len() as c_uint)
+ uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
}
}
})
)
+// get a handle for the current scheduler
+macro_rules! get_handle_to_current_scheduler(
+ () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
+)
+
pub fn dumb_println(args: &fmt::Arguments) {
use std::rt::io::native::stdio::stderr;
use std::rt::io::Writer;
// option. This file may not be copied, modified, or distributed
// except according to those terms.
+use std::cell::Cell;
+use std::comm::{oneshot, stream, PortOne, ChanOne};
use std::libc::c_int;
+use std::rt::BlockedTask;
+use std::rt::local::Local;
+use std::rt::rtio::RtioTimer;
+use std::rt::sched::{Scheduler, SchedHandle};
use uvll;
-use super::{Watcher, Loop, NativeHandle, TimerCallback, status_to_maybe_uv_error};
+use super::{Loop, NativeHandle, UvHandle};
+use uvio::HomingIO;
-pub struct TimerWatcher(*uvll::uv_timer_t);
-impl Watcher for TimerWatcher { }
+pub struct TimerWatcher {
+ handle: *uvll::uv_timer_t,
+ home: SchedHandle,
+ action: Option<NextAction>,
+}
+
+pub enum NextAction {
+ WakeTask(BlockedTask),
+ SendOnce(ChanOne<()>),
+ SendMany(Chan<()>),
+}
impl TimerWatcher {
- pub fn new(loop_: &mut Loop) -> TimerWatcher {
- unsafe {
- let handle = uvll::malloc_handle(uvll::UV_TIMER);
- assert!(handle.is_not_null());
- assert!(0 == uvll::uv_timer_init(loop_.native_handle(), handle));
- let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
- watcher.install_watcher_data();
- return watcher;
- }
+ pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
+ let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
+ assert_eq!(unsafe {
+ uvll::timer_init(loop_.native_handle(), handle)
+ }, 0);
+ let me = ~TimerWatcher {
+ handle: handle,
+ action: None,
+ home: get_handle_to_current_scheduler!(),
+ };
+ return me.install();
}
- pub fn start(&mut self, timeout: u64, repeat: u64, cb: TimerCallback) {
- {
- let data = self.get_watcher_data();
- data.timer_cb = Some(cb);
- }
+ fn start(&mut self, msecs: u64, period: u64) {
+ assert_eq!(unsafe {
+ uvll::timer_start(self.handle, timer_cb, msecs, period)
+ }, 0)
+ }
+
+ fn stop(&mut self) {
+ assert_eq!(unsafe { uvll::timer_stop(self.handle) }, 0)
+ }
+}
+
+impl HomingIO for TimerWatcher {
+ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
+ fn uv_handle(&self) -> *uvll::uv_timer_t { self.handle }
+}
- unsafe {
- uvll::uv_timer_start(self.native_handle(), timer_cb, timeout, repeat);
+impl RtioTimer for TimerWatcher {
+ fn sleep(&mut self, msecs: u64) {
+ do self.home_for_io_with_sched |self_, scheduler| {
+ do scheduler.deschedule_running_task_and_then |_sched, task| {
+ self_.action = Some(WakeTask(task));
+ self_.start(msecs, 0);
+ }
+ self_.stop();
}
+ }
+
+ fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+ let (port, chan) = oneshot();
+ let chan = Cell::new(chan);
- extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
- let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
- let data = watcher.get_watcher_data();
- let cb = data.timer_cb.get_ref();
- let status = status_to_maybe_uv_error(status);
- (*cb)(watcher, status);
+ do self.home_for_io |self_| {
+ self_.action = Some(SendOnce(chan.take()));
+ self_.start(msecs, 0);
}
+
+ return port;
}
- pub fn stop(&mut self) {
- unsafe {
- uvll::uv_timer_stop(self.native_handle());
+ fn period(&mut self, msecs: u64) -> Port<()> {
+ let (port, chan) = stream();
+ let chan = Cell::new(chan);
+
+ do self.home_for_io |self_| {
+ self_.action = Some(SendMany(chan.take()));
+ self_.start(msecs, msecs);
}
+
+ return port;
}
}
-impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
- fn from_native_handle(handle: *uvll::uv_timer_t) -> TimerWatcher {
- TimerWatcher(handle)
+extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) {
+ let handle = handle as *uvll::uv_handle_t;
+ let foo: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
+
+ match foo.action.take_unwrap() {
+ WakeTask(task) => {
+ let sched: ~Scheduler = Local::take();
+ sched.resume_blocked_task_immediately(task);
+ }
+ SendOnce(chan) => chan.send(()),
+ SendMany(chan) => {
+ chan.send(());
+ foo.action = Some(SendMany(chan));
+ }
}
- fn native_handle(&self) -> *uvll::uv_idle_t {
- match self { &TimerWatcher(ptr) => ptr }
+}
+
+impl Drop for TimerWatcher {
+ fn drop(&mut self) {
+ do self.home_for_io |self_| {
+ self_.action = None;
+ self_.stop();
+ self_.close_async_();
+ }
}
}
use std::cast::transmute;
use std::cast;
use std::cell::Cell;
-use std::comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
-use std::libc;
+use std::clone::Clone;
+use std::comm::{SendDeferred, SharedChan, GenericChan};
use std::libc::{c_int, c_uint, c_void, pid_t};
use std::ptr;
use std::str;
// XXX we should not be calling uvll functions in here.
-trait HomingIO {
+pub trait HomingIO {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
}
}
-// get a handle for the current scheduler
-macro_rules! get_handle_to_current_scheduler(
- () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
-)
-
enum SocketNameKind {
TcpPeer,
Tcp,
}
fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
- let watcher = TimerWatcher::new(self.uv_loop());
- let home = get_handle_to_current_scheduler!();
- Ok(~UvTimer::new(watcher, home) as ~RtioTimer)
+ Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
}
}
-pub struct UvTimer {
- priv watcher: timer::TimerWatcher,
- priv home: SchedHandle,
-}
-
-impl HomingIO for UvTimer {
- fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl UvTimer {
- fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
- UvTimer { watcher: w, home: home }
- }
-}
-
-impl Drop for UvTimer {
- fn drop(&mut self) {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- uvdebug!("closing UvTimer");
- do scheduler.deschedule_running_task_and_then |_, task| {
- let task_cell = Cell::new(task);
- do self.watcher.close {
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- }
-}
-
-impl RtioTimer for UvTimer {
- fn sleep(&mut self, msecs: u64) {
- let (_m, scheduler) = self.fire_homing_missile_sched();
- do scheduler.deschedule_running_task_and_then |_sched, task| {
- uvdebug!("sleep: entered scheduler context");
- let task_cell = Cell::new(task);
- do self.watcher.start(msecs, 0) |_, status| {
- assert!(status.is_none());
- let scheduler: ~Scheduler = Local::take();
- scheduler.resume_blocked_task_immediately(task_cell.take());
- }
- }
- self.watcher.stop();
- }
-
- fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
- use std::comm::oneshot;
-
- let (port, chan) = oneshot();
- let chan = Cell::new(chan);
- let _m = self.fire_homing_missile();
- do self.watcher.start(msecs, 0) |_, status| {
- assert!(status.is_none());
- assert!(!chan.is_empty());
- chan.take().send_deferred(());
- }
-
- return port;
- }
-
- fn period(&mut self, msecs: u64) -> Port<()> {
- use std::comm::stream;
-
- let (port, chan) = stream();
- let chan = Cell::new(chan);
- let _m = self.fire_homing_missile();
- do self.watcher.start(msecs, msecs) |_, status| {
- assert!(status.is_none());
- do chan.with_ref |chan| {
- chan.send_deferred(());
- }
- }
-
- return port;
- }
-}
-
pub struct UvFileStream {
priv loop_: Loop,
priv fd: c_int,
pub static STDIO_READABLE_PIPE: c_int = 0x10;
pub static STDIO_WRITABLE_PIPE: c_int = 0x20;
+#[cfg(unix)]
+pub type uv_buf_len_t = libc::size_t;
+#[cfg(windows)]
+pub type uv_buf_len_t = u32;
+
// see libuv/include/uv-unix.h
#[cfg(unix)]
pub struct uv_buf_t {
return write_req->handle;
}
-extern "C" void
-rust_uv_buf_init(uv_buf_t* out_buf, char* base, size_t len) {
- *out_buf = uv_buf_init(base, len);
-}
-
extern "C" uv_loop_t*
rust_uv_get_loop_for_uv_handle(uv_handle_t* handle) {
return handle->loop;