]> git.lizzy.rs Git - rust.git/commitdiff
Migrate uv timer bindings away from ~fn()
authorAlex Crichton <alex@alexcrichton.com>
Fri, 1 Nov 2013 16:36:21 +0000 (09:36 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Sun, 10 Nov 2013 09:37:10 +0000 (01:37 -0800)
src/librustuv/lib.rs
src/librustuv/macros.rs
src/librustuv/timer.rs
src/librustuv/uvio.rs
src/librustuv/uvll.rs
src/rt/rust_uv.cpp

index 6aa8723a4017dadd1a4d3aa26cc264354d306ac0..3d0ea4e6d1b89680a440f640b31fc63882b59023 100644 (file)
@@ -45,6 +45,7 @@
 
 #[feature(macro_rules, globs)];
 
+use std::cast;
 use std::str::raw::from_c_str;
 use std::vec;
 use std::ptr;
@@ -119,6 +120,42 @@ pub trait NativeHandle<T> {
     fn native_handle(&self) -> T;
 }
 
+/// A type that wraps a uv handle
+pub trait UvHandle<T> {
+    fn uv_handle(&self) -> *T;
+
+    // FIXME(#8888) dummy self
+    fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
+        unsafe {
+            let handle = uvll::malloc_handle(ty);
+            assert!(!handle.is_null());
+            handle as *T
+        }
+    }
+
+    unsafe fn from_uv_handle<'a>(h: &'a *T) -> &'a mut Self {
+        cast::transmute(uvll::get_data_for_uv_handle(*h))
+    }
+
+    fn install(~self) -> ~Self {
+        unsafe {
+            let myptr = cast::transmute::<&~Self, *u8>(&self);
+            uvll::set_data_for_uv_handle(self.uv_handle(), myptr);
+        }
+        self
+    }
+
+    fn close_async_(&mut self) {
+        // we used malloc to allocate all handles, so we must always have at
+        // least a callback to free all the handles we allocated.
+        extern fn close_cb(handle: *uvll::uv_handle_t) {
+            unsafe { uvll::free_handle(handle) }
+        }
+
+        unsafe { uvll::close(self.uv_handle(), close_cb) }
+    }
+}
+
 impl Loop {
     pub fn new() -> Loop {
         let handle = unsafe { uvll::loop_new() };
@@ -367,7 +404,7 @@ pub fn empty_buf() -> Buf {
 /// Borrow a slice to a Buf
 pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
     let data = vec::raw::to_ptr(v);
-    unsafe { uvll::uv_buf_init(data as *c_char, v.len() as c_uint) }
+    uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
 }
 
 // XXX: Do these conversions without copying
@@ -383,7 +420,7 @@ pub fn vec_to_uv_buf(v: ~[u8]) -> Buf {
             let data = data as *mut u8;
             ptr::copy_memory(data, b, l)
         }
-        uvll::uv_buf_init(data as *c_char, v.len() as c_uint)
+        uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
     }
 }
 
index cbbed316d83df571fa228e892a679db09c89bb95..90b8263da79fd326614f251e6e4af11b5253bd8c 100644 (file)
@@ -27,6 +27,11 @@ macro_rules! uvdebug (
     })
 )
 
+// get a handle for the current scheduler
+macro_rules! get_handle_to_current_scheduler(
+    () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
+)
+
 pub fn dumb_println(args: &fmt::Arguments) {
     use std::rt::io::native::stdio::stderr;
     use std::rt::io::Writer;
index 4fc4934bf650af62e3361ce9c6337a5c100a6bf9..f89a6c5e5c57fa7b625b0170f087bb2c49711569 100644 (file)
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use std::cell::Cell;
+use std::comm::{oneshot, stream, PortOne, ChanOne};
 use std::libc::c_int;
+use std::rt::BlockedTask;
+use std::rt::local::Local;
+use std::rt::rtio::RtioTimer;
+use std::rt::sched::{Scheduler, SchedHandle};
 
 use uvll;
-use super::{Watcher, Loop, NativeHandle, TimerCallback, status_to_maybe_uv_error};
+use super::{Loop, NativeHandle, UvHandle};
+use uvio::HomingIO;
 
-pub struct TimerWatcher(*uvll::uv_timer_t);
-impl Watcher for TimerWatcher { }
+pub struct TimerWatcher {
+    handle: *uvll::uv_timer_t,
+    home: SchedHandle,
+    action: Option<NextAction>,
+}
+
+pub enum NextAction {
+    WakeTask(BlockedTask),
+    SendOnce(ChanOne<()>),
+    SendMany(Chan<()>),
+}
 
 impl TimerWatcher {
-    pub fn new(loop_: &mut Loop) -> TimerWatcher {
-        unsafe {
-            let handle = uvll::malloc_handle(uvll::UV_TIMER);
-            assert!(handle.is_not_null());
-            assert!(0 == uvll::uv_timer_init(loop_.native_handle(), handle));
-            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
-            watcher.install_watcher_data();
-            return watcher;
-        }
+    pub fn new(loop_: &mut Loop) -> ~TimerWatcher {
+        let handle = UvHandle::alloc(None::<TimerWatcher>, uvll::UV_TIMER);
+        assert_eq!(unsafe {
+            uvll::timer_init(loop_.native_handle(), handle)
+        }, 0);
+        let me = ~TimerWatcher {
+            handle: handle,
+            action: None,
+            home: get_handle_to_current_scheduler!(),
+        };
+        return me.install();
     }
 
-    pub fn start(&mut self, timeout: u64, repeat: u64, cb: TimerCallback) {
-        {
-            let data = self.get_watcher_data();
-            data.timer_cb = Some(cb);
-        }
+    fn start(&mut self, msecs: u64, period: u64) {
+        assert_eq!(unsafe {
+            uvll::timer_start(self.handle, timer_cb, msecs, period)
+        }, 0)
+    }
+
+    fn stop(&mut self) {
+        assert_eq!(unsafe { uvll::timer_stop(self.handle) }, 0)
+    }
+}
+
+impl HomingIO for TimerWatcher {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl UvHandle<uvll::uv_timer_t> for TimerWatcher {
+    fn uv_handle(&self) -> *uvll::uv_timer_t { self.handle }
+}
 
-        unsafe {
-            uvll::uv_timer_start(self.native_handle(), timer_cb, timeout, repeat);
+impl RtioTimer for TimerWatcher {
+    fn sleep(&mut self, msecs: u64) {
+        do self.home_for_io_with_sched |self_, scheduler| {
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                self_.action = Some(WakeTask(task));
+                self_.start(msecs, 0);
+            }
+            self_.stop();
         }
+    }
+
+    fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
 
-        extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
-            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
-            let data = watcher.get_watcher_data();
-            let cb = data.timer_cb.get_ref();
-            let status = status_to_maybe_uv_error(status);
-            (*cb)(watcher, status);
+        do self.home_for_io |self_| {
+            self_.action = Some(SendOnce(chan.take()));
+            self_.start(msecs, 0);
         }
+
+        return port;
     }
 
-    pub fn stop(&mut self) {
-        unsafe {
-            uvll::uv_timer_stop(self.native_handle());
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        let (port, chan) = stream();
+        let chan = Cell::new(chan);
+
+        do self.home_for_io |self_| {
+            self_.action = Some(SendMany(chan.take()));
+            self_.start(msecs, msecs);
         }
+
+        return port;
     }
 }
 
-impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
-    fn from_native_handle(handle: *uvll::uv_timer_t) -> TimerWatcher {
-        TimerWatcher(handle)
+extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) {
+    let handle = handle as *uvll::uv_handle_t;
+    let foo: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
+
+    match foo.action.take_unwrap() {
+        WakeTask(task) => {
+            let sched: ~Scheduler = Local::take();
+            sched.resume_blocked_task_immediately(task);
+        }
+        SendOnce(chan) => chan.send(()),
+        SendMany(chan) => {
+            chan.send(());
+            foo.action = Some(SendMany(chan));
+        }
     }
-    fn native_handle(&self) -> *uvll::uv_idle_t {
-        match self { &TimerWatcher(ptr) => ptr }
+}
+
+impl Drop for TimerWatcher {
+    fn drop(&mut self) {
+        do self.home_for_io |self_| {
+            self_.action = None;
+            self_.stop();
+            self_.close_async_();
+        }
     }
 }
 
index bf8358070dcdf9e7572f9a909fd59d2f2893fe91..5e67e79c020f6ab40e2236321a9d8a8337d68bde 100644 (file)
@@ -12,8 +12,8 @@
 use std::cast::transmute;
 use std::cast;
 use std::cell::Cell;
-use std::comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
-use std::libc;
+use std::clone::Clone;
+use std::comm::{SendDeferred, SharedChan, GenericChan};
 use std::libc::{c_int, c_uint, c_void, pid_t};
 use std::ptr;
 use std::str;
@@ -49,7 +49,7 @@
 
 // XXX we should not be calling uvll functions in here.
 
-trait HomingIO {
+pub trait HomingIO {
 
     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
 
@@ -135,11 +135,6 @@ fn drop(&mut self) {
     }
 }
 
-// get a handle for the current scheduler
-macro_rules! get_handle_to_current_scheduler(
-    () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
-)
-
 enum SocketNameKind {
     TcpPeer,
     Tcp,
@@ -581,9 +576,7 @@ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
     }
 
     fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
-        let watcher = TimerWatcher::new(self.uv_loop());
-        let home = get_handle_to_current_scheduler!();
-        Ok(~UvTimer::new(watcher, home) as ~RtioTimer)
+        Ok(TimerWatcher::new(self.uv_loop()) as ~RtioTimer)
     }
 
     fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
@@ -1365,82 +1358,6 @@ fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
     }
 }
 
-pub struct UvTimer {
-    priv watcher: timer::TimerWatcher,
-    priv home: SchedHandle,
-}
-
-impl HomingIO for UvTimer {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
-}
-
-impl UvTimer {
-    fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
-        UvTimer { watcher: w, home: home }
-    }
-}
-
-impl Drop for UvTimer {
-    fn drop(&mut self) {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        uvdebug!("closing UvTimer");
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.watcher.close {
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-    }
-}
-
-impl RtioTimer for UvTimer {
-    fn sleep(&mut self, msecs: u64) {
-        let (_m, scheduler) = self.fire_homing_missile_sched();
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            uvdebug!("sleep: entered scheduler context");
-            let task_cell = Cell::new(task);
-            do self.watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                let scheduler: ~Scheduler = Local::take();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
-            }
-        }
-        self.watcher.stop();
-    }
-
-    fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
-        use std::comm::oneshot;
-
-        let (port, chan) = oneshot();
-        let chan = Cell::new(chan);
-        let _m = self.fire_homing_missile();
-        do self.watcher.start(msecs, 0) |_, status| {
-            assert!(status.is_none());
-            assert!(!chan.is_empty());
-            chan.take().send_deferred(());
-        }
-
-        return port;
-    }
-
-    fn period(&mut self, msecs: u64) -> Port<()> {
-        use std::comm::stream;
-
-        let (port, chan) = stream();
-        let chan = Cell::new(chan);
-        let _m = self.fire_homing_missile();
-        do self.watcher.start(msecs, msecs) |_, status| {
-            assert!(status.is_none());
-            do chan.with_ref |chan| {
-                chan.send_deferred(());
-            }
-        }
-
-        return port;
-    }
-}
-
 pub struct UvFileStream {
     priv loop_: Loop,
     priv fd: c_int,
index 120a69fb244980f19c0a7879d85b603dd8d54ce5..3028546972fe625a7f84c56fd84d9ea07501cad1 100644 (file)
@@ -80,6 +80,11 @@ pub mod errors {
 pub static STDIO_READABLE_PIPE: c_int = 0x10;
 pub static STDIO_WRITABLE_PIPE: c_int = 0x20;
 
+#[cfg(unix)]
+pub type uv_buf_len_t = libc::size_t;
+#[cfg(windows)]
+pub type uv_buf_len_t = u32;
+
 // see libuv/include/uv-unix.h
 #[cfg(unix)]
 pub struct uv_buf_t {
index 09aa806891ace92abab8ee998ac5038b61d7cd8c..2745c6062e6e4b688da0703da95d2c4f2777f010 100644 (file)
@@ -135,11 +135,6 @@ rust_uv_get_stream_handle_from_write_req(uv_write_t* write_req) {
     return write_req->handle;
 }
 
-extern "C" void
-rust_uv_buf_init(uv_buf_t* out_buf, char* base, size_t len) {
-    *out_buf = uv_buf_init(base, len);
-}
-
 extern "C" uv_loop_t*
 rust_uv_get_loop_for_uv_handle(uv_handle_t* handle) {
     return handle->loop;