]> git.lizzy.rs Git - rust.git/commitdiff
Enabled workstealing in the scheduler. Previously we had one global work queue shared...
authortoddaaro <github@opprobrio.us>
Mon, 5 Aug 2013 20:06:24 +0000 (13:06 -0700)
committertoddaaro <github@opprobrio.us>
Thu, 8 Aug 2013 21:13:41 +0000 (14:13 -0700)
src/libstd/rt/comm.rs
src/libstd/rt/mod.rs
src/libstd/rt/sched.rs
src/libstd/rt/select.rs
src/libstd/rt/test.rs
src/libstd/task/spawn.rs
src/test/bench/rt-messaging-ping-pong.rs [new file with mode: 0644]
src/test/bench/rt-parfib.rs [new file with mode: 0644]
src/test/bench/rt-spawn-rate.rs [new file with mode: 0644]

index 0cf223f3029134cee4d535498d78352cb7edbe1e..33b4b307af846b3c393105f78ea80c5d4ace1016 100644 (file)
@@ -225,9 +225,10 @@ fn optimistic_check(&mut self) -> bool {
     fn optimistic_check(&mut self) -> bool {
         // The optimistic check is never necessary for correctness. For testing
         // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand, rng};
-        let mut rng = rng();
-        let actually_check = Rand::rand(&mut rng);
+        use rand::{Rand};
+        let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
+            Rand::rand(&mut sched.rng)
+        };
         if actually_check {
             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
         } else {
index 147c75e5c41ef1bc47fdc7a358799c9588acb3bf..01a52892f633b95eb71484d218fbf18f43dab314 100644 (file)
@@ -63,8 +63,7 @@
 use cell::Cell;
 use clone::Clone;
 use container::Container;
-use iter::Times;
-use iterator::{Iterator, IteratorUtil};
+use iterator::{Iterator, IteratorUtil, range};
 use option::{Some, None};
 use ptr::RawPtr;
 use rt::local::Local;
@@ -247,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
 
     let main = Cell::new(main);
 
-    // The shared list of sleeping schedulers. Schedulers wake each other
-    // occassionally to do new work.
+    // The shared list of sleeping schedulers.
     let sleepers = SleeperList::new();
-    // The shared work queue. Temporary until work stealing is implemented.
-    let work_queue = WorkQueue::new();
+
+    // Create a work queue for each scheduler, ntimes. Create an extra
+    // for the main thread if that flag is set. We won't steal from it.
+    let mut work_queues = ~[];
+    for _ in range(0u, nscheds) {
+        let work_queue: WorkQueue<~Task> = WorkQueue::new();
+        work_queues.push(work_queue);
+    }
 
     // The schedulers.
     let mut scheds = ~[];
@@ -259,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    do nscheds.times {
+    for i in range(0u, nscheds) {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
         let loop_ = ~UvEventLoop::new();
-        let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+        let mut sched = ~Scheduler::new(loop_,
+                                        work_queues[i].clone(),
+                                        work_queues.clone(),
+                                        sleepers.clone());
         let handle = sched.make_handle();
 
         scheds.push(sched);
@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let friend_handle = friend_sched.make_handle();
         scheds.push(friend_sched);
 
+        // This scheduler needs a queue that isn't part of the stealee
+        // set.
+        let work_queue = WorkQueue::new();
+
         let main_loop = ~UvEventLoop::new();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue.clone(),
+                                                     work_queue,
+                                                     work_queues.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
                                                   home, main.take());
         main_task.death.on_exit = Some(on_exit.take());
-        rtdebug!("boostrapping main_task");
+        rtdebug!("bootstrapping main_task");
 
         main_sched.bootstrap(main_task);
     }
index 990e1a4a3de9915cb65355119d555b7a9e55fe3f..ce4e64c47d2ef5cb3faefd7cd5ead4bf9400d9e3 100644 (file)
@@ -13,7 +13,6 @@
 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
-
 use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
@@ -28,6 +27,9 @@
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
+use rand::{XorShiftRng, RngUtil};
+use iterator::{range};
+use vec::{OwnedVector};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
 pub struct Scheduler {
-    /// A queue of available work. Under a work-stealing policy there
-    /// is one per Scheduler.
-    work_queue: WorkQueue<~Task>,
+    /// There are N work queues, one per scheduler.
+    priv work_queue: WorkQueue<~Task>,
+    /// Work queues for the other schedulers. These are created by
+    /// cloning the core work queues.
+    work_queues: ~[WorkQueue<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
     /// them to.
-    friend_handle: Option<SchedHandle>
+    friend_handle: Option<SchedHandle>,
+    /// A fast XorShift rng for scheduler use
+    rng: XorShiftRng
+
 }
 
 pub struct SchedHandle {
@@ -97,10 +104,13 @@ pub fn sched_id(&self) -> uint { to_uint(self) }
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
+               work_queues: ~[WorkQueue<~Task>],
                sleeper_list: SleeperList)
         -> Scheduler {
 
-        Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
+        Scheduler::new_special(event_loop, work_queue,
+                               work_queues,
+                               sleeper_list, true, None)
 
     }
 
@@ -108,6 +118,7 @@ pub fn new(event_loop: ~EventLoopObject,
     // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
+                       work_queues: ~[WorkQueue<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ pub fn new_special(event_loop: ~EventLoopObject,
             no_sleep: false,
             event_loop: event_loop,
             work_queue: work_queue,
+            work_queues: work_queues,
             stack_pool: StackPool::new(),
             sched_task: None,
             cleanup_job: None,
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
-            friend_handle: friend
+            friend_handle: friend,
+            rng: XorShiftRng::new()
         }
     }
 
@@ -248,7 +261,7 @@ fn run_sched_once() {
 
         // Second activity is to try resuming a task from the queue.
 
-        let result = sched.resume_task_from_queue();
+        let result = sched.do_work();
         let mut sched = match result {
             Some(sched) => {
                 // Failed to dequeue a task, so we return.
@@ -415,47 +428,98 @@ fn send_to_friend(&mut self, task: ~Task) {
         }
     }
 
-    // Resume a task from the queue - but also take into account that
-    // it might not belong here.
+    // Workstealing: In this iteration of the runtime each scheduler
+    // thread has a distinct work queue. When no work is available
+    // locally, make a few attempts to steal work from the queues of
+    // other scheduler threads. If a few steals fail we end up in the
+    // old "no work" path which is fine.
+
+    // First step in the process is to find a task. This function does
+    // that by first checking the local queue, and if there is no work
+    // there, trying to steal from the remote work queues.
+    fn find_work(&mut self) -> Option<~Task> {
+        rtdebug!("scheduler looking for work");
+        match self.work_queue.pop() {
+            Some(task) => {
+                rtdebug!("found a task locally");
+                return Some(task)
+            }
+            None => {
+                // Our naive stealing, try kinda hard.
+                rtdebug!("scheduler trying to steal");
+                let _len = self.work_queues.len();
+                return self.try_steals(2);
+            }
+        }
+    }
+
+    // With no backoff try stealing n times from the queues the
+    // scheduler knows about. This naive implementation can steal from
+    // our own queue or from other special schedulers.
+    fn try_steals(&mut self, n: uint) -> Option<~Task> {
+        for _ in range(0, n) {
+            let index = self.rng.gen_uint_range(0, self.work_queues.len());
+            let work_queues = &mut self.work_queues;
+            match work_queues[index].steal() {
+                Some(task) => {
+                    rtdebug!("found task by stealing"); return Some(task)
+                }
+                None => ()
+            }
+        };
+        rtdebug!("giving up on stealing");
+        return None;
+    }
 
-    // If we perform a scheduler action we give away the scheduler ~
-    // pointer, if it is still available we return it.
+    // Given a task, execute it correctly.
+    fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+        let mut this = self;
+        let mut task = task;
 
-    fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+        rtdebug!("processing a task");
 
+        let home = task.take_unwrap_home();
+        match home {
+            Sched(home_handle) => {
+                if home_handle.sched_id != this.sched_id() {
+                    rtdebug!("sending task home");
+                    task.give_home(Sched(home_handle));
+                    Scheduler::send_task_home(task);
+                    return Some(this);
+                } else {
+                    rtdebug!("running task here");
+                    task.give_home(Sched(home_handle));
+                    this.resume_task_immediately(task);
+                    return None;
+                }
+            }
+            AnySched if this.run_anything => {
+                rtdebug!("running anysched task here");
+                task.give_home(AnySched);
+                this.resume_task_immediately(task);
+                return None;
+            }
+            AnySched => {
+                rtdebug!("sending task to friend");
+                task.give_home(AnySched);
+                this.send_to_friend(task);
+                return Some(this);
+            }
+        }
+    }
+
+    // Bundle the helpers together.
+    fn do_work(~self) -> Option<~Scheduler> {
         let mut this = self;
 
-        match this.work_queue.pop() {
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
             Some(task) => {
-                let mut task = task;
-                let home = task.take_unwrap_home();
-                match home {
-                    Sched(home_handle) => {
-                        if home_handle.sched_id != this.sched_id() {
-                            task.give_home(Sched(home_handle));
-                            Scheduler::send_task_home(task);
-                            return Some(this);
-                        } else {
-                            this.event_loop.callback(Scheduler::run_sched_once);
-                            task.give_home(Sched(home_handle));
-                            this.resume_task_immediately(task);
-                            return None;
-                        }
-                    }
-                    AnySched if this.run_anything => {
-                        this.event_loop.callback(Scheduler::run_sched_once);
-                        task.give_home(AnySched);
-                        this.resume_task_immediately(task);
-                        return None;
-                    }
-                    AnySched => {
-                        task.give_home(AnySched);
-                        this.send_to_friend(task);
-                        return Some(this);
-                    }
-                }
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task);
             }
             None => {
+                rtdebug!("no work was found, returning the scheduler struct");
                 return Some(this);
             }
         }
@@ -711,7 +775,6 @@ pub fn run_cleanup_job(&mut self) {
             GiveTask(task, f) => f.to_fn()(self, task)
         }
     }
-
 }
 
 // The cases for the below function.
@@ -745,6 +808,8 @@ fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
 
 #[cfg(test)]
 mod test {
+    extern mod extra;
+
     use prelude::*;
     use rt::test::*;
     use unstable::run_in_bare_thread;
@@ -862,12 +927,15 @@ fn test_schedule_home_states() {
         do run_in_bare_thread {
 
             let sleepers = SleeperList::new();
-            let work_queue = WorkQueue::new();
+            let normal_queue = WorkQueue::new();
+            let special_queue = WorkQueue::new();
+            let queues = ~[normal_queue.clone(), special_queue.clone()];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                normal_queue,
+                queues.clone(),
                 sleepers.clone());
 
             let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ fn test_schedule_home_states() {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                special_queue.clone(),
+                queues.clone(),
                 sleepers.clone(),
                 false,
                 Some(friend_handle));
index 006b777b71b1486168ae54c36a002adbefa4c61f..07f8ca77d9db4664115baea2cc6f137aad914747 100644 (file)
@@ -182,6 +182,7 @@ fn select_a_lot() {
     fn select_stream() {
         use util;
         use comm::GenericChan;
+        use iter::Times;
 
         // Sends 10 buffered packets, and uses select to retrieve them all.
         // Puts the port in a different spot in the vector each time.
@@ -265,6 +266,7 @@ fn select_racing_senders() {
 
         fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
             use rt::test::spawntask_random;
+            use iter::Times;
 
             do run_in_newsched_task {
                 // A bit of stress, since ordinarily this is just smoke and mirrors.
index 792ea5eb33f5acfa138be4692e53222858c34c2a..92366d5187fe23cae6bc7aa3126d2bda464af8e5 100644 (file)
@@ -15,8 +15,8 @@
 use clone::Clone;
 use container::Container;
 use iterator::{Iterator, range};
-use vec::{OwnedVector, MutableVector};
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use vec::{OwnedVector, MutableVector, ImmutableVector};
 use rt::sched::Scheduler;
 use unstable::run_in_bare_thread;
 use rt::thread::Thread;
 
 pub fn new_test_uv_sched() -> Scheduler {
 
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
     let mut sched = Scheduler::new(~UvEventLoop::new(),
-                                   WorkQueue::new(),
+                                   queue,
+                                   queues,
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
         };
 
         let sleepers = SleeperList::new();
-        let work_queue = WorkQueue::new();
 
         let mut handles = ~[];
         let mut scheds = ~[];
+        let mut work_queues = ~[];
 
         for _ in range(0u, nthreads) {
+            let work_queue = WorkQueue::new();
+            work_queues.push(work_queue);
+        }
+
+        for i in range(0u, nthreads) {
             let loop_ = ~UvEventLoop::new();
             let mut sched = ~Scheduler::new(loop_,
-                                            work_queue.clone(),
+                                            work_queues[i].clone(),
+                                            work_queues.clone(),
                                             sleepers.clone());
             let handle = sched.make_handle();
 
index 2d0a2d98e9fc0780f0c090289b6c47512e4539fd..05a17f8539c216b93dce541651ceffa47b62cbb8 100644 (file)
@@ -98,6 +98,7 @@
 use rt::sched::Scheduler;
 use rt::uv::uvio::UvEventLoop;
 use rt::thread::Thread;
+use rt::work_queue::WorkQueue;
 
 #[cfg(test)] use task::default_task_opts;
 #[cfg(test)] use comm;
@@ -722,10 +723,16 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
             let sched = Local::unsafe_borrow::<Scheduler>();
             let sched_handle = (*sched).make_handle();
 
+            // Since this is a 1:1 scheduler we create a queue not in
+            // the stealee set. The run_anything flag is set false
+            // which will disable stealing.
+            let work_queue = WorkQueue::new();
+
             // Create a new scheduler to hold the new task
             let new_loop = ~UvEventLoop::new();
             let mut new_sched = ~Scheduler::new_special(new_loop,
-                                                        (*sched).work_queue.clone(),
+                                                        work_queue,
+                                                        (*sched).work_queues.clone(),
                                                         (*sched).sleeper_list.clone(),
                                                         false,
                                                         Some(sched_handle));
diff --git a/src/test/bench/rt-messaging-ping-pong.rs b/src/test/bench/rt-messaging-ping-pong.rs
new file mode 100644 (file)
index 0000000..3d38d61
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+use std::rt::test::spawntask_later;
+use std::cell::Cell;
+
+// This is a simple bench that creates M pairs of of tasks. These
+// tasks ping-pong back and forth over a pair of streams. This is a
+// cannonical message-passing benchmark as it heavily strains message
+// passing and almost nothing else.
+
+fn ping_pong_bench(n: uint, m: uint) {
+
+    // Create pairs of tasks that pingpong back and forth.
+    fn run_pair(n: uint) {
+            // Create a stream A->B
+            let (pa,ca) = stream::<()>();
+            // Create a stream B->A
+            let (pb,cb) = stream::<()>();
+
+            let pa = Cell::new(pa);
+            let ca = Cell::new(ca);
+            let pb = Cell::new(pb);
+            let cb = Cell::new(cb);
+
+        do spawntask_later() || {
+            let chan = ca.take();
+            let port = pb.take();
+            do n.times {
+                chan.send(());
+                port.recv();
+            }
+        }
+
+        do spawntask_later() || {
+            let chan = cb.take();
+            let port = pa.take();
+            do n.times {
+                port.recv();
+                chan.send(());
+            }
+        }
+    }
+
+    do m.times {
+        run_pair(n)
+    }
+
+}
+
+
+
+fn main() {
+
+    let args = os::args();
+    let n = if args.len() == 3 {
+        uint::from_str(args[1]).unwrap()
+    } else {
+        10000
+    };
+
+    let m = if args.len() == 3 {
+        uint::from_str(args[2]).unwrap()
+    } else {
+        4
+    };
+
+    ping_pong_bench(n, m);
+
+}
diff --git a/src/test/bench/rt-parfib.rs b/src/test/bench/rt-parfib.rs
new file mode 100644 (file)
index 0000000..6669342
--- /dev/null
@@ -0,0 +1,49 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+use std::rt::test::spawntask_later;
+use std::cell::Cell;
+use std::comm::*;
+
+// A simple implementation of parfib. One subtree is found in a new
+// task and communicated over a oneshot pipe, the other is found
+// locally. There is no sequential-mode threshold.
+
+fn parfib(n: uint) -> uint {
+    if(n == 0 || n == 1) {
+        return 1;
+    }
+
+    let (port,chan) = oneshot::<uint>();
+    let chan = Cell::new(chan);
+    do spawntask_later {
+        chan.take().send(parfib(n-1));
+    };
+    let m2 = parfib(n-2);
+    return (port.recv() + m2);
+}
+
+fn main() {
+
+    let args = os::args();
+    let n = if args.len() == 2 {
+        uint::from_str(args[1]).unwrap()
+    } else {
+        10
+    };
+
+    parfib(n);
+
+}
diff --git a/src/test/bench/rt-spawn-rate.rs b/src/test/bench/rt-spawn-rate.rs
new file mode 100644 (file)
index 0000000..ff578ed
--- /dev/null
@@ -0,0 +1,33 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+extern mod extra;
+
+use std::task::spawn;
+use std::os;
+use std::uint;
+
+// Very simple spawn rate test. Spawn N tasks that do nothing and
+// return.
+
+fn main() {
+
+    let args = os::args();
+    let n = if args.len() == 2 {
+        uint::from_str(args[1]).unwrap()
+    } else {
+        100000
+    };
+
+    do n.times {
+        do spawn || {};
+    }
+
+}