/// no longer try to go to sleep, but exit instead.
no_sleep: bool,
stack_pool: StackPool,
- /// The event loop used to drive the scheduler and perform I/O
- event_loop: ~EventLoop,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
priv sched_task: Option<~Task>,
priv yield_check_count: uint,
/// A flag to tell the scheduler loop it needs to do some stealing
/// in order to introduce randomness as part of a yield
- priv steal_for_yield: bool
+ priv steal_for_yield: bool,
+
+ // n.b. currently destructors of an object are run top-to-bottom in order
+ // of field declaration. Due to its nature, the pausible idle callback
+ // must have some sort of handle to the event loop, so it needs to get
+ // destroyed before the event loop itself. For this reason, we destroy
+ // the event loop last to ensure that any unsafe references to it are
+ // destroyed before it's actually destroyed.
+
+ /// The event loop used to drive the scheduler and perform I/O
+ event_loop: ~EventLoop,
}
/// An indication of how hard to work on a given operation, the difference
this.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
+ Some(RunOnce(task)) => {
+ // bypass the process_task logic to force running this task once
+ // on this home scheduler. This is often used for I/O (homing).
+ Scheduler::resume_task_immediately_cl(this, task);
+ return None;
+ }
Some(Wake) => {
this.sleepy = false;
Local::put(this);
Wake,
Shutdown,
PinnedTask(~Task),
- TaskFromFriend(~Task)
+ TaskFromFriend(~Task),
+ RunOnce(~Task),
}
pub struct SchedHandle {
use cell::Cell;
use rt::thread::Thread;
use rt::task::{Task, Sched};
- use rt::rtio::EventLoop;
+ use rt::basic;
use rt::util;
use option::{Some};
#[test]
fn test_schedule_home_states() {
- use rt::uv::uvio::UvEventLoop;
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
use rt::sched::Shutdown;
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
normal_queue,
queues.clone(),
sleepers.clone());
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
special_queue.clone(),
queues.clone(),
sleepers.clone(),
// NOTE(review): this hunk replaces the hand-rolled event-loop dance
// (deschedule, event_loop.callback_ms(10), re-enqueue the blocked task)
// with a single timer::sleep(10), and switches the harness from
// run_in_newsched_task to run_in_uv_task so a uv event loop is available
// for the timer. The intent of the regression test is unchanged: pending
// I/O must keep the scheduler alive until the work queue drains.
#[test]
fn test_io_callback() {
+ use rt::io::timer;
+
// This is a regression test that when there are no schedulable tasks
// in the work queue, but we are performing I/O, that once we do put
// something in the work queue again the scheduler picks it up and doesn't
// exit before emptying the work queue
- do run_in_newsched_task {
+ do run_in_uv_task {
do spawntask {
- let sched: ~Scheduler = Local::take();
- do sched.deschedule_running_task_and_then |sched, task| {
- let task = Cell::new(task);
- do sched.event_loop.callback_ms(10) {
- rtdebug!("in callback");
- let mut sched: ~Scheduler = Local::take();
- sched.enqueue_blocked_task(task.take());
- Local::put(sched);
- }
- }
+ timer::sleep(10);
}
}
}
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
use rt::stack::StackPool;
- use rt::uv::uvio::UvEventLoop;
use rt::sched::{Shutdown, TaskFromFriend};
use util;
let queues = ~[queue.clone()];
let mut sched = ~Scheduler::new(
- ~UvEventLoop::new() as ~EventLoop,
+ basic::event_loop(),
queue,
queues.clone(),
sleepers.clone());
use rt::rtio::*;
use rt::sched::{Scheduler, SchedHandle};
use rt::tube::Tube;
- use rt::task::SchedHome;
+ use rt::task::Task;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
// NOTE(review): this hunk changes go_to_IO_home's contract. It now returns
// the home scheduler's id (uint) instead of the task's previous SchedHome,
// and it sends the new RunOnce message — which runs the task once on the
// home scheduler without pinning it there — instead of PinnedTask. It also
// adds a fast path: if we are already on the home scheduler (sched_id
// matches), no context switch is performed at all. The trailing removed
// lines (L164-166 region) are the old restore_original_home header, deleted
// here and reintroduced in the following hunk.
- /* XXX This will move pinned tasks to do IO on the proper scheduler
- * and then move them back to their home.
- */
- fn go_to_IO_home(&mut self) -> SchedHome {
- use rt::sched::PinnedTask;
+ /// This function will move tasks to run on their home I/O scheduler. Note
+ /// that this function does *not* pin the task to the I/O scheduler, but
+ /// rather it simply moves it to running on the I/O scheduler.
+ fn go_to_IO_home(&mut self) -> uint {
+ use rt::sched::RunOnce;
- do task::unkillable { // FIXME(#8674)
- let mut old = None;
- {
- let ptr = &mut old;
+ let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
+ sched.sched_id()
+ };
+
+ // Only need to invoke a context switch if we're not on the right
+ // scheduler.
+ if current_sched_id != self.home().sched_id {
+ do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
/* FIXME(#8674) if the task was already killed then wake
- * will return None. In that case, the home pointer will never be set.
+ * will return None. In that case, the home pointer will
+ * never be set.
*
- * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
+ * RESOLUTION IDEA: Since the task is dead, we should
+ * just abort the IO action.
*/
- do task.wake().map |mut task| {
- *ptr = Some(task.take_unwrap_home());
- self.home().send(PinnedTask(task));
+ do task.wake().map |task| {
+ self.home().send(RunOnce(task));
};
}
}
- old.expect("No old home because task had already been killed.")
}
- }
- // XXX dummy self param
- fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
- use rt::sched::TaskFromFriend;
+ self.home().sched_id
+ }
// NOTE(review): restore_original_home is rewritten to match the new
// go_to_IO_home contract. The old version gave the task back its saved
// SchedHome and shipped it via TaskFromFriend. The new version takes the
// I/O scheduler's id (uint), asserts we are still on that scheduler, and
// only performs a deschedule/reschedule round trip when
// Task::on_appropriate_sched() says we are on the wrong scheduler —
// otherwise it returns without a context switch.
- let old = Cell::new(old);
- do task::unkillable { // FIXME(#8674)
- let scheduler: ~Scheduler = Local::take();
- do scheduler.deschedule_running_task_and_then |scheduler, task| {
- /* FIXME(#8674) if the task was already killed then wake
- * will return None. In that case, the home pointer will never be restored.
- *
- * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
- */
- do task.wake().map |mut task| {
- task.give_home(old.take());
- scheduler.make_handle().send(TaskFromFriend(task));
- };
+ // XXX: dummy self parameter
+ fn restore_original_home(_: Option<Self>, io_home: uint) {
+ // It would truly be a sad day if we had moved off the home I/O
+ // scheduler while we were doing I/O.
+ assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
+ io_home);
+
+ // If we were a homed task, then we must send ourselves back to the
+ // original scheduler. Otherwise, we can just return and keep running
+ if !Task::on_appropriate_sched() {
+ do task::unkillable { // FIXME(#8674)
+ let scheduler: ~Scheduler = Local::take();
+ do scheduler.deschedule_running_task_and_then |_, task| {
+ do task.wake().map |task| {
+ Scheduler::run_task(task);
+ };
+ }
+ }
}
}
}
// Runs the closure `io` on this object's home I/O scheduler:
// go_to_IO_home migrates the task there (returning the home scheduler id),
// the closure executes, and restore_original_home sends the task back to
// its original scheduler if it is no longer on an appropriate one.
// This hunk only drops the now-stale "XXX dummy self" comment at the call.
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
let home = self.go_to_IO_home();
let a = io(self); // do IO
- HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
+ HomingIO::restore_original_home(None::<Self>, home);
a // return the result of the IO
}
let mut this = self;
let home = this.go_to_IO_home();
let a = io(this); // do IO
- HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
+ HomingIO::restore_original_home(None::<Self>, home);
a // return the result of the IO
}
let scheduler: ~Scheduler = Local::take();
io_sched(self, scheduler) // do IO and scheduling action
};
- HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
+ HomingIO::restore_original_home(None::<Self>, home);
a // return result of IO
}
}
} as ~PausibleIdleCallback
}
- fn callback_ms(&mut self, ms: u64, f: ~fn()) {
- let mut timer = TimerWatcher::new(self.uvio.uv_loop());
- do timer.start(ms, 0) |timer, status| {
- assert!(status.is_none());
- timer.close(||());
- f();
- }
- }
-
// Wraps `f` in a UvRemoteCallback bound to this event loop's uv loop and
// returns it as a ~RemoteCallback trait object. Presumably this is backed
// by a uv_async handle so the callback can be fired from another thread —
// TODO(review): confirm against UvRemoteCallback's implementation.
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback
}
}
// Asks libuv to classify the underlying file descriptor; true when libuv
// reports it as a TTY. The hunk adds an `as c_int` cast so the UV_TTY
// constant compares against guess_handle's C return type.
fn isatty(&self) -> bool {
- unsafe { uvll::guess_handle(self.fd) == uvll::UV_TTY }
+ unsafe { uvll::guess_handle(self.fd) == uvll::UV_TTY as c_int }
}
}