fn optimistic_check(&mut self) -> bool {
// The optimistic check is never necessary for correctness. For testing
// purposes, making it randomly return false simulates a racing sender.
- use rand::{Rand, rng};
- let mut rng = rng();
- let actually_check = Rand::rand(&mut rng);
+ use rand::{Rand};
+ let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
+ Rand::rand(&mut sched.rng)
+ };
if actually_check {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
} else {
use cell::Cell;
use clone::Clone;
use container::Container;
-use iter::Times;
-use iterator::{Iterator, IteratorUtil};
+use iterator::{Iterator, IteratorUtil, range};
use option::{Some, None};
use ptr::RawPtr;
use rt::local::Local;
let main = Cell::new(main);
- // The shared list of sleeping schedulers. Schedulers wake each other
- // occassionally to do new work.
+ // The shared list of sleeping schedulers.
let sleepers = SleeperList::new();
- // The shared work queue. Temporary until work stealing is implemented.
- let work_queue = WorkQueue::new();
+
+ // Create a work queue for each scheduler, ntimes. Create an extra
+ // for the main thread if that flag is set. We won't steal from it.
+ let mut work_queues = ~[];
+ for _ in range(0u, nscheds) {
+ let work_queue: WorkQueue<~Task> = WorkQueue::new();
+ work_queues.push(work_queue);
+ }
// The schedulers.
let mut scheds = ~[];
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];
- do nscheds.times {
+ for i in range(0u, nscheds) {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
- let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+ let mut sched = ~Scheduler::new(loop_,
+ work_queues[i].clone(),
+ work_queues.clone(),
+ sleepers.clone());
let handle = sched.make_handle();
scheds.push(sched);
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);
+ // This scheduler needs a queue that isn't part of the stealee
+ // set.
+ let work_queue = WorkQueue::new();
+
let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
- work_queue.clone(),
+ work_queue,
+ work_queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
- rtdebug!("boostrapping main_task");
+ rtdebug!("bootstrapping main_task");
main_sched.bootstrap(main_task);
}
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
-
use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
use rt::metrics::SchedMetrics;
use borrow::{to_uint};
use cell::Cell;
+use rand::{XorShiftRng, RngUtil};
+use iterator::{range};
+use vec::{OwnedVector};
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
pub struct Scheduler {
- /// A queue of available work. Under a work-stealing policy there
- /// is one per Scheduler.
- work_queue: WorkQueue<~Task>,
+ /// There are N work queues, one per scheduler.
+ priv work_queue: WorkQueue<~Task>,
+ /// Work queues for the other schedulers. These are created by
+ /// cloning the core work queues.
+ work_queues: ~[WorkQueue<~Task>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
- friend_handle: Option<SchedHandle>
+ friend_handle: Option<SchedHandle>,
+ /// A fast XorShift rng for scheduler use
+ rng: XorShiftRng
+
}
pub struct SchedHandle {
pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
+ work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList)
-> Scheduler {
- Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
+ Scheduler::new_special(event_loop, work_queue,
+ work_queues,
+ sleeper_list, true, None)
}
// task field is None.
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
+ work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
no_sleep: false,
event_loop: event_loop,
work_queue: work_queue,
+ work_queues: work_queues,
stack_pool: StackPool::new(),
sched_task: None,
cleanup_job: None,
metrics: SchedMetrics::new(),
run_anything: run_anything,
- friend_handle: friend
+ friend_handle: friend,
+ rng: XorShiftRng::new()
}
}
// Second activity is to try resuming a task from the queue.
- let result = sched.resume_task_from_queue();
+ let result = sched.do_work();
let mut sched = match result {
Some(sched) => {
// Failed to dequeue a task, so we return.
}
}
- // Resume a task from the queue - but also take into account that
- // it might not belong here.
+ // Workstealing: In this iteration of the runtime each scheduler
+ // thread has a distinct work queue. When no work is available
+ // locally, make a few attempts to steal work from the queues of
+ // other scheduler threads. If a few steals fail we end up in the
+ // old "no work" path which is fine.
+
+ // First step in the process is to find a task. This function does
+ // that by first checking the local queue, and if there is no work
+ // there, trying to steal from the remote work queues.
+ fn find_work(&mut self) -> Option<~Task> {
+ rtdebug!("scheduler looking for work");
+ match self.work_queue.pop() {
+ Some(task) => {
+ rtdebug!("found a task locally");
+ return Some(task)
+ }
+ None => {
+ // Our naive stealing, try kinda hard.
+ rtdebug!("scheduler trying to steal");
+ let _len = self.work_queues.len();
+ return self.try_steals(2);
+ }
+ }
+ }
+
+ // With no backoff try stealing n times from the queues the
+ // scheduler knows about. This naive implementation can steal from
+ // our own queue or from other special schedulers.
+ fn try_steals(&mut self, n: uint) -> Option<~Task> {
+ for _ in range(0, n) {
+ let index = self.rng.gen_uint_range(0, self.work_queues.len());
+ let work_queues = &mut self.work_queues;
+ match work_queues[index].steal() {
+ Some(task) => {
+ rtdebug!("found task by stealing"); return Some(task)
+ }
+ None => ()
+ }
+ };
+ rtdebug!("giving up on stealing");
+ return None;
+ }
- // If we perform a scheduler action we give away the scheduler ~
- // pointer, if it is still available we return it.
+ // Given a task, execute it correctly.
+ fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+ let mut this = self;
+ let mut task = task;
- fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+ rtdebug!("processing a task");
+ let home = task.take_unwrap_home();
+ match home {
+ Sched(home_handle) => {
+ if home_handle.sched_id != this.sched_id() {
+ rtdebug!("sending task home");
+ task.give_home(Sched(home_handle));
+ Scheduler::send_task_home(task);
+ return Some(this);
+ } else {
+ rtdebug!("running task here");
+ task.give_home(Sched(home_handle));
+ this.resume_task_immediately(task);
+ return None;
+ }
+ }
+ AnySched if this.run_anything => {
+ rtdebug!("running anysched task here");
+ task.give_home(AnySched);
+ this.resume_task_immediately(task);
+ return None;
+ }
+ AnySched => {
+ rtdebug!("sending task to friend");
+ task.give_home(AnySched);
+ this.send_to_friend(task);
+ return Some(this);
+ }
+ }
+ }
+
+ // Bundle the helpers together.
+ fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;
- match this.work_queue.pop() {
+ rtdebug!("scheduler calling do work");
+ match this.find_work() {
Some(task) => {
- let mut task = task;
- let home = task.take_unwrap_home();
- match home {
- Sched(home_handle) => {
- if home_handle.sched_id != this.sched_id() {
- task.give_home(Sched(home_handle));
- Scheduler::send_task_home(task);
- return Some(this);
- } else {
- this.event_loop.callback(Scheduler::run_sched_once);
- task.give_home(Sched(home_handle));
- this.resume_task_immediately(task);
- return None;
- }
- }
- AnySched if this.run_anything => {
- this.event_loop.callback(Scheduler::run_sched_once);
- task.give_home(AnySched);
- this.resume_task_immediately(task);
- return None;
- }
- AnySched => {
- task.give_home(AnySched);
- this.send_to_friend(task);
- return Some(this);
- }
- }
+ rtdebug!("found some work! processing the task");
+ return this.process_task(task);
}
None => {
+ rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
}
}
GiveTask(task, f) => f.to_fn()(self, task)
}
}
-
}
// The cases for the below function.
#[cfg(test)]
mod test {
+ extern mod extra;
+
use prelude::*;
use rt::test::*;
use unstable::run_in_bare_thread;
do run_in_bare_thread {
let sleepers = SleeperList::new();
- let work_queue = WorkQueue::new();
+ let normal_queue = WorkQueue::new();
+ let special_queue = WorkQueue::new();
+ let queues = ~[normal_queue.clone(), special_queue.clone()];
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
~UvEventLoop::new(),
- work_queue.clone(),
+ normal_queue,
+ queues.clone(),
sleepers.clone());
let normal_handle = Cell::new(normal_sched.make_handle());
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
~UvEventLoop::new(),
- work_queue.clone(),
+ special_queue.clone(),
+ queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
fn select_stream() {
use util;
use comm::GenericChan;
+ use iter::Times;
// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
+ use iter::Times;
do run_in_newsched_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
use clone::Clone;
use container::Container;
use iterator::{Iterator, range};
-use vec::{OwnedVector, MutableVector};
use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use vec::{OwnedVector, MutableVector, ImmutableVector};
use rt::sched::Scheduler;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
pub fn new_test_uv_sched() -> Scheduler {
+ let queue = WorkQueue::new();
+ let queues = ~[queue.clone()];
+
let mut sched = Scheduler::new(~UvEventLoop::new(),
- WorkQueue::new(),
+ queue,
+ queues,
SleeperList::new());
// Don't wait for the Shutdown message
};
let sleepers = SleeperList::new();
- let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
+ let mut work_queues = ~[];
for _ in range(0u, nthreads) {
+ let work_queue = WorkQueue::new();
+ work_queues.push(work_queue);
+ }
+
+ for i in range(0u, nthreads) {
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_,
- work_queue.clone(),
+ work_queues[i].clone(),
+ work_queues.clone(),
sleepers.clone());
let handle = sched.make_handle();
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use rt::thread::Thread;
+use rt::work_queue::WorkQueue;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
let sched = Local::unsafe_borrow::<Scheduler>();
let sched_handle = (*sched).make_handle();
+ // Since this is a 1:1 scheduler we create a queue not in
+ // the stealee set. The run_anything flag is set false
+ // which will disable stealing.
+ let work_queue = WorkQueue::new();
+
// Create a new scheduler to hold the new task
let new_loop = ~UvEventLoop::new();
let mut new_sched = ~Scheduler::new_special(new_loop,
- (*sched).work_queue.clone(),
+ work_queue,
+ (*sched).work_queues.clone(),
(*sched).sleeper_list.clone(),
false,
Some(sched_handle));
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+use std::rt::test::spawntask_later;
+use std::cell::Cell;
+
+// This is a simple bench that creates M pairs of of tasks. These
+// tasks ping-pong back and forth over a pair of streams. This is a
+// cannonical message-passing benchmark as it heavily strains message
+// passing and almost nothing else.
+
+fn ping_pong_bench(n: uint, m: uint) {
+
+ // Create pairs of tasks that pingpong back and forth.
+ fn run_pair(n: uint) {
+ // Create a stream A->B
+ let (pa,ca) = stream::<()>();
+ // Create a stream B->A
+ let (pb,cb) = stream::<()>();
+
+ let pa = Cell::new(pa);
+ let ca = Cell::new(ca);
+ let pb = Cell::new(pb);
+ let cb = Cell::new(cb);
+
+ do spawntask_later() || {
+ let chan = ca.take();
+ let port = pb.take();
+ do n.times {
+ chan.send(());
+ port.recv();
+ }
+ }
+
+ do spawntask_later() || {
+ let chan = cb.take();
+ let port = pa.take();
+ do n.times {
+ port.recv();
+ chan.send(());
+ }
+ }
+ }
+
+ do m.times {
+ run_pair(n)
+ }
+
+}
+
+
+
+fn main() {
+
+ let args = os::args();
+ let n = if args.len() == 3 {
+ uint::from_str(args[1]).unwrap()
+ } else {
+ 10000
+ };
+
+ let m = if args.len() == 3 {
+ uint::from_str(args[2]).unwrap()
+ } else {
+ 4
+ };
+
+ ping_pong_bench(n, m);
+
+}
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+use std::rt::test::spawntask_later;
+use std::cell::Cell;
+use std::comm::*;
+
+// A simple implementation of parfib. One subtree is found in a new
+// task and communicated over a oneshot pipe, the other is found
+// locally. There is no sequential-mode threshold.
+
+fn parfib(n: uint) -> uint {
+ if(n == 0 || n == 1) {
+ return 1;
+ }
+
+ let (port,chan) = oneshot::<uint>();
+ let chan = Cell::new(chan);
+ do spawntask_later {
+ chan.take().send(parfib(n-1));
+ };
+ let m2 = parfib(n-2);
+ return (port.recv() + m2);
+}
+
+fn main() {
+
+ let args = os::args();
+ let n = if args.len() == 2 {
+ uint::from_str(args[1]).unwrap()
+ } else {
+ 10
+ };
+
+ parfib(n);
+
+}
--- /dev/null
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+
+// Very simple spawn rate test. Spawn N tasks that do nothing and
+// return.
+
+fn main() {
+
+ let args = os::args();
+ let n = if args.len() == 2 {
+ uint::from_str(args[1]).unwrap()
+ } else {
+ 100000
+ };
+
+ do n.times {
+ do spawn || {};
+ }
+
+}