The green scheduler can optimize its runtime based on this by deciding not to
go to sleep in epoll() if there is no active I/O and there is a task to be stolen.
This is implemented for librustuv by keeping a count of the number of tasks
which are currently homed. If a task is homed, and then performs a blocking I/O
operation, the count will be nonzero while the task is blocked. The homing count
is intentionally 0 when there are I/O handles but none of them are currently blocked.
The reason for this is that epoll() would only be used to wake up the scheduler
anyway.
The crux of this change was to have a `HomingMissile` contain a mutable borrowed
reference back to the `HomeHandle`. The rest of the change was just dealing with
this fallout. This reference is used to decrement the homed handle count in a
HomingMissile's destructor.
Also note that the count maintained is not atomic, because its increments,
decrements, and reads all happen on the same I/O thread.
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
+
+ fn has_active_io(&self) -> bool { false }
}
struct BasicRemote {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
- wait_until_woken_after(&mut cx.slot, || {
+ wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx);
});
0 => {
req.fired = true;
let mut slot = None;
- wait_until_woken_after(&mut slot, || {
+ let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
+ wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
unsafe { uvll::set_data_for_req(req.req, &slot) }
});
match req.get_result() {
use std::cast;
use std::io;
use std::io::IoError;
-use std::libc::c_int;
+use std::libc::{c_int, c_void};
use std::ptr::null;
use std::ptr;
use std::rt::local::Local;
pub trait UvHandle<T> {
fn uv_handle(&self) -> *T;
+ fn uv_loop(&self) -> Loop {
+ Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
+ }
+
// FIXME(#8888) dummy self
fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
unsafe {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
- wait_until_woken_after(&mut slot, || {
+ wait_until_woken_after(&mut slot, &self.uv_loop(), || {
uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
})
}
}
}
-fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
+fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
+ loop_: &Loop,
+ f: ||) {
let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe {
assert!((*slot).is_none());
let task: ~Task = Local::take();
+ loop_.modify_blockers(1);
task.deschedule(1, |task| {
*slot = Some(task);
f();
Ok(())
});
+ loop_.modify_blockers(-1);
}
}
pub fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
assert!(handle.is_not_null());
+ unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
Loop::wrap(handle)
}
pub fn close(&mut self) {
unsafe { uvll::uv_loop_delete(self.handle) };
}
+
+ // The 'data' field of the uv_loop_t is used to count the number of tasks
+ // that are currently blocked waiting for I/O to complete.
+ fn modify_blockers(&self, amt: uint) {
+ unsafe {
+ let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
+ uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
+ }
+ }
+
+ fn get_blockers(&self) -> uint {
+ unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
+ }
}
// FIXME: Need to define the error constants like EOF so they can be
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &io.loop_, || {
req.set_data(&cx);
});
match cx.status {
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
+ let loop_ = self.uv_loop();
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
result: None,
};
let handle = self.handle;
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &loop_, || {
unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
});
match cx.result.take_unwrap() {
struct Ctx { task: Option<BlockedTask>, result: c_int }
let m = self.fire_homing_missile();
+ let loop_ = self.uv_loop();
let _g = self.write_access.grant(m);
let mut req = Request::new(uvll::UV_UDP_SEND);
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &loop_, || {
req.set_data(&cx);
});
match cx.result {
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(io, false);
- wait_until_woken_after(&mut cx.task, || {
+ wait_until_woken_after(&mut cx.task, &io.loop_, || {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
- wait_until_woken_after(&mut self.to_wake, || {});
+ wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
assert!(self.exit_status.is_some());
}
}
use std::ptr;
use std::rt::task::BlockedTask;
+use Loop;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind, wakeup};
use uvll;
uvll::uv_read_start(self.handle, alloc_cb, read_cb)
} {
0 => {
- wait_until_woken_after(&mut rcx.task, || {});
+ let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+ wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
match rcx.result {
n if n < 0 => Err(UvError(n as c_int)),
n => Ok(n as uint),
let mut wcx = WriteContext { result: 0, task: None, };
req.defuse(); // uv callback now owns this request
- wait_until_woken_after(&mut wcx.task, || {
+ let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
+ wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
req.set_data(&wcx);
});
self.last_write_req = Some(Request::wrap(req.handle));
// except according to those terms.
use std::libc::c_int;
-use std::mem::replace;
-use std::rt::local::Local;
+use std::mem;
use std::rt::rtio::RtioTimer;
-use std::rt::task::{BlockedTask, Task};
+use std::rt::task::BlockedTask;
use homing::{HomeHandle, HomingIO};
-use super::{UvHandle, ForbidUnwind, ForbidSwitch};
+use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
use uvio::UvIoFactory;
use uvll;
handle: *uvll::uv_timer_t,
home: HomeHandle,
action: Option<NextAction>,
+ blocker: Option<BlockedTask>,
id: uint, // see comments in timer_cb
}
pub enum NextAction {
- WakeTask(BlockedTask),
+ WakeTask,
SendOnce(Chan<()>),
SendMany(Chan<()>, uint),
}
let me = ~TimerWatcher {
handle: handle,
action: None,
+ blocker: None,
home: io.make_handle(),
id: 0,
};
let missile = self.fire_homing_missile();
self.id += 1;
self.stop();
- let _missile = match replace(&mut self.action, None) {
+ let _missile = match mem::replace(&mut self.action, None) {
None => missile, // no need to do a homing dance
Some(action) => {
drop(missile); // un-home ourself
// started, then we need to call stop on the timer.
let _f = ForbidUnwind::new("timer");
- let task: ~Task = Local::take();
- task.deschedule(1, |task| {
- self.action = Some(WakeTask(task));
+ self.action = Some(WakeTask);
+ wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
self.start(msecs, 0);
- Ok(())
});
self.stop();
}
self.id += 1;
self.stop();
self.start(msecs, 0);
- replace(&mut self.action, Some(SendOnce(chan)))
+ mem::replace(&mut self.action, Some(SendOnce(chan)))
};
return port;
self.id += 1;
self.stop();
self.start(msecs, msecs);
- replace(&mut self.action, Some(SendMany(chan, self.id)))
+ mem::replace(&mut self.action, Some(SendMany(chan, self.id)))
};
return port;
let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
match timer.action.take_unwrap() {
- WakeTask(task) => {
+ WakeTask => {
+ let task = timer.blocker.take_unwrap();
let _ = task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { let _ = chan.try_send(()); }
let factory = &mut self.uvio as &mut rtio::IoFactory;
Some(factory)
}
+
+ fn has_active_io(&self) -> bool {
+ self.uvio.loop_.get_blockers() > 0
+ }
}
#[cfg(not(test))]
/// The asynchronous I/O services. Not all event loops may provide one.
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
+ fn has_active_io(&self) -> bool;
}
pub trait RemoteCallback {