]> git.lizzy.rs Git - rust.git/commitdiff
Expose whether event loops have active I/O
authorAlex Crichton <alex@alexcrichton.com>
Tue, 11 Feb 2014 03:59:35 +0000 (19:59 -0800)
committerAlex Crichton <alex@alexcrichton.com>
Wed, 12 Feb 2014 17:46:31 +0000 (09:46 -0800)
The green scheduler can optimize its runtime based on this by deciding to not go
to sleep in epoll() if there is no active I/O and there is a task to be stolen.

This is implemented for librustuv by keeping a count of the number of tasks
which are currently homed. If a task is homed, and then performs a blocking I/O
operation, the count will be nonzero while the task is blocked. The homing count
is intentionally 0 when there are I/O handles, but no handles currently blocked.
The reason for this is that epoll() would only be used to wake up the scheduler
anyway.

The crux of this change was to have a `HomingMissile` contain a mutable borrowed
reference back to the `HomeHandle`. The rest of the change was just dealing with
this fallout. This reference is used to decrement the homed handle count in a
HomingMissile's destructor.

Also note that the count maintained is not atomic because all of its
increments/decrements/reads are all on the same I/O thread.

src/libgreen/basic.rs
src/librustuv/addrinfo.rs
src/librustuv/file.rs
src/librustuv/lib.rs
src/librustuv/net.rs
src/librustuv/pipe.rs
src/librustuv/process.rs
src/librustuv/stream.rs
src/librustuv/timer.rs
src/librustuv/uvio.rs
src/libstd/rt/rtio.rs

index 10a56b2b2253a649a7bec0d3a0e670ad27141942..5bccf05f7b3f6ed4f661dd1173fcb433fdcc6c22 100644 (file)
@@ -158,6 +158,8 @@ fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
     }
 
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
+
+    fn has_active_io(&self) -> bool { false }
 }
 
 struct BasicRemote {
index 2740671c00d60489abb90b402fce4ddafa5ea4f8..5d6af2969b8b3ba21434ed72f589e26bd103e234 100644 (file)
@@ -86,7 +86,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
 
-                wait_until_woken_after(&mut cx.slot, || {
+                wait_until_woken_after(&mut cx.slot, loop_, || {
                     req.set_data(&cx);
                 });
 
index 2cef2664c2fc41caf59cb6a69ce6d7b624117e88..e66452041a5315fec507cbac63528d10a5e6a302 100644 (file)
@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
         0 => {
             req.fired = true;
             let mut slot = None;
-            wait_until_woken_after(&mut slot, || {
+            let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
+            wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
                 unsafe { uvll::set_data_for_req(req.req, &slot) }
             });
             match req.get_result() {
index b463bb7fd733db129ad429414e67b33bcdc9bb30..d5e643febe35a02611619907d31e89433c4a8f9c 100644 (file)
@@ -47,7 +47,7 @@
 use std::cast;
 use std::io;
 use std::io::IoError;
-use std::libc::c_int;
+use std::libc::{c_int, c_void};
 use std::ptr::null;
 use std::ptr;
 use std::rt::local::Local;
 pub trait UvHandle<T> {
     fn uv_handle(&self) -> *T;
 
+    fn uv_loop(&self) -> Loop {
+        Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
+    }
+
     // FIXME(#8888) dummy self
     fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
         unsafe {
@@ -136,7 +140,7 @@ fn close(&mut self) {
             uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
             uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
 
-            wait_until_woken_after(&mut slot, || {
+            wait_until_woken_after(&mut slot, &self.uv_loop(), || {
                 uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
             })
         }
@@ -195,16 +199,20 @@ fn drop(&mut self) {
     }
 }
 
-fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
+fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
+                          loop_: &Loop,
+                          f: ||) {
     let _f = ForbidUnwind::new("wait_until_woken_after");
     unsafe {
         assert!((*slot).is_none());
         let task: ~Task = Local::take();
+        loop_.modify_blockers(1);
         task.deschedule(1, |task| {
             *slot = Some(task);
             f();
             Ok(())
         });
+        loop_.modify_blockers(-1);
     }
 }
 
@@ -273,6 +281,7 @@ impl Loop {
     pub fn new() -> Loop {
         let handle = unsafe { uvll::loop_new() };
         assert!(handle.is_not_null());
+        unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
         Loop::wrap(handle)
     }
 
@@ -285,6 +294,19 @@ pub fn run(&mut self) {
     pub fn close(&mut self) {
         unsafe { uvll::uv_loop_delete(self.handle) };
     }
+
+    // The 'data' field of the uv_loop_t is used to count the number of tasks
+    // that are currently blocked waiting for I/O to complete.
+    fn modify_blockers(&self, amt: uint) {
+        unsafe {
+            let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
+            uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
+        }
+    }
+
+    fn get_blockers(&self) -> uint {
+        unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
+    }
 }
 
 // FIXME: Need to define the error constants like EOF so they can be
index 551e2c9faf74f47ef980cc2fa8e8cd780d93aacf..a091829f297e86b4435eb06c0683d5e2c2b4f6d1 100644 (file)
@@ -216,7 +216,7 @@ struct Ctx { status: c_int, task: Option<BlockedTask> }
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { status: 0, task: None };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &io.loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.status {
@@ -498,6 +498,7 @@ struct Ctx {
             buf: Option<Buf>,
             result: Option<(ssize_t, Option<ip::SocketAddr>)>,
         }
+        let loop_ = self.uv_loop();
         let m = self.fire_homing_missile();
         let _g = self.read_access.grant(m);
 
@@ -511,7 +512,7 @@ struct Ctx {
                     result: None,
                 };
                 let handle = self.handle;
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
                 });
                 match cx.result.take_unwrap() {
@@ -571,6 +572,7 @@ fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
         struct Ctx { task: Option<BlockedTask>, result: c_int }
 
         let m = self.fire_homing_missile();
+        let loop_ = self.uv_loop();
         let _g = self.write_access.grant(m);
 
         let mut req = Request::new(uvll::UV_UDP_SEND);
@@ -586,7 +588,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
             0 => {
                 req.defuse(); // uv callback now owns this request
                 let mut cx = Ctx { task: None, result: 0 };
-                wait_until_woken_after(&mut cx.task, || {
+                wait_until_woken_after(&mut cx.task, &loop_, || {
                     req.set_data(&cx);
                 });
                 match cx.result {
index c312f112d28b4caeb975d0fcd582ad32b48e8905..24ac17700cc2748148b2426964ee041367d6ecc2 100644 (file)
@@ -92,7 +92,7 @@ struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
         let mut req = Request::new(uvll::UV_CONNECT);
         let pipe = PipeWatcher::new(io, false);
 
-        wait_until_woken_after(&mut cx.task, || {
+        wait_until_woken_after(&mut cx.task, &io.loop_, || {
             unsafe {
                 uvll::uv_pipe_connect(req.handle,
                                       pipe.handle(),
index 7b7a16d7084e5aa8da28c5e64d246e369719f410..e1f94d8c4df5a2bd99576d93d9b58f4be7d8aecc 100644 (file)
@@ -211,7 +211,7 @@ fn wait(&mut self) -> process::ProcessExit {
                 // If there's no exit code previously listed, then the
                 // process's exit callback has yet to be invoked. We just
                 // need to deschedule ourselves and wait to be reawoken.
-                wait_until_woken_after(&mut self.to_wake, || {});
+                wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
                 assert!(self.exit_status.is_some());
             }
         }
index 262952f8890cb7be7ad314ce8fca68bce0489ef7..f7bf2f051eb90c5029176b0e29f40bc3ebad3169 100644 (file)
@@ -13,6 +13,7 @@
 use std::ptr;
 use std::rt::task::BlockedTask;
 
+use Loop;
 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
             ForbidUnwind, wakeup};
 use uvll;
@@ -87,7 +88,8 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
             uvll::uv_read_start(self.handle, alloc_cb, read_cb)
         } {
             0 => {
-                wait_until_woken_after(&mut rcx.task, || {});
+                let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+                wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
                 match rcx.result {
                     n if n < 0 => Err(UvError(n as c_int)),
                     n => Ok(n as uint),
@@ -121,7 +123,8 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
                 let mut wcx = WriteContext { result: 0, task: None, };
                 req.defuse(); // uv callback now owns this request
 
-                wait_until_woken_after(&mut wcx.task, || {
+                let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+                wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
                     req.set_data(&wcx);
                 });
                 self.last_write_req = Some(Request::wrap(req.handle));
index 0ce2501d2cc7f8fd65a37285ef3c10b5ec190997..8c80cc991450483d12653697db01562130ca7d00 100644 (file)
@@ -9,13 +9,12 @@
 // except according to those terms.
 
 use std::libc::c_int;
-use std::mem::replace;
-use std::rt::local::Local;
+use std::mem;
 use std::rt::rtio::RtioTimer;
-use std::rt::task::{BlockedTask, Task};
+use std::rt::task::BlockedTask;
 
 use homing::{HomeHandle, HomingIO};
-use super::{UvHandle, ForbidUnwind, ForbidSwitch};
+use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
 use uvio::UvIoFactory;
 use uvll;
 
@@ -23,11 +22,12 @@ pub struct TimerWatcher {
     handle: *uvll::uv_timer_t,
     home: HomeHandle,
     action: Option<NextAction>,
+    blocker: Option<BlockedTask>,
     id: uint, // see comments in timer_cb
 }
 
 pub enum NextAction {
-    WakeTask(BlockedTask),
+    WakeTask,
     SendOnce(Chan<()>),
     SendMany(Chan<()>, uint),
 }
@@ -41,6 +41,7 @@ pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
         let me = ~TimerWatcher {
             handle: handle,
             action: None,
+            blocker: None,
             home: io.make_handle(),
             id: 0,
         };
@@ -76,7 +77,7 @@ fn sleep(&mut self, msecs: u64) {
         let missile = self.fire_homing_missile();
         self.id += 1;
         self.stop();
-        let _missile = match replace(&mut self.action, None) {
+        let _missile = match mem::replace(&mut self.action, None) {
             None => missile, // no need to do a homing dance
             Some(action) => {
                 drop(missile);      // un-home ourself
@@ -89,11 +90,9 @@ fn sleep(&mut self, msecs: u64) {
         // started, then we need to call stop on the timer.
         let _f = ForbidUnwind::new("timer");
 
-        let task: ~Task = Local::take();
-        task.deschedule(1, |task| {
-            self.action = Some(WakeTask(task));
+        self.action = Some(WakeTask);
+        wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
             self.start(msecs, 0);
-            Ok(())
         });
         self.stop();
     }
@@ -108,7 +107,7 @@ fn oneshot(&mut self, msecs: u64) -> Port<()> {
             self.id += 1;
             self.stop();
             self.start(msecs, 0);
-            replace(&mut self.action, Some(SendOnce(chan)))
+            mem::replace(&mut self.action, Some(SendOnce(chan)))
         };
 
         return port;
@@ -124,7 +123,7 @@ fn period(&mut self, msecs: u64) -> Port<()> {
             self.id += 1;
             self.stop();
             self.start(msecs, msecs);
-            replace(&mut self.action, Some(SendMany(chan, self.id)))
+            mem::replace(&mut self.action, Some(SendMany(chan, self.id)))
         };
 
         return port;
@@ -137,7 +136,8 @@ fn period(&mut self, msecs: u64) -> Port<()> {
     let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
 
     match timer.action.take_unwrap() {
-        WakeTask(task) => {
+        WakeTask => {
+            let task = timer.blocker.take_unwrap();
             let _ = task.wake().map(|t| t.reawaken());
         }
         SendOnce(chan) => { let _ = chan.try_send(()); }
index 54db4b4d3d13f422e3cdf9974a55d2be09774901..14406cb2a6a0146b6f3d21d699fb0b739f615100 100644 (file)
@@ -99,6 +99,10 @@ fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
         let factory = &mut self.uvio as &mut rtio::IoFactory;
         Some(factory)
     }
+
+    fn has_active_io(&self) -> bool {
+        self.uvio.loop_.get_blockers() > 0
+    }
 }
 
 #[cfg(not(test))]
index 39623e329eae928095361fa01228b425a85378ea..5573f8ec02eb3c17e457a22cf57336958f1139cd 100644 (file)
@@ -41,6 +41,7 @@ pub trait EventLoop {
 
     /// The asynchronous I/O services. Not all event loops may provide one.
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
+    fn has_active_io(&self) -> bool;
 }
 
 pub trait RemoteCallback {