1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 use either::{Left, Right};
12 use option::{Option, Some, None};
13 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
16 use super::sleeper_list::SleeperList;
17 use super::work_queue::WorkQueue;
18 use super::stack::{StackPool};
19 use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
20 use super::context::Context;
21 use super::task::{Task, AnySched, Sched};
22 use super::message_queue::MessageQueue;
23 use rt::kill::BlockedTask;
26 use rt::rtio::{RemoteCallback, PausibleIdleCallback};
27 use borrow::{to_uint};
29 use rand::{XorShiftRng, RngUtil};
31 use vec::{OwnedVector};
33 /// A scheduler is responsible for coordinating the execution of Tasks
34 /// on a single thread. The scheduler runs inside a slightly modified
35 /// Rust Task. When not running this task is stored in the scheduler
36 /// struct. The scheduler struct acts like a baton, all scheduling
37 /// actions are transfers of the baton.
39 /// XXX: This creates too many callbacks to run_sched_once, resulting
40 /// in too much allocation and too many events.
41 pub struct Scheduler {
42 /// There are N work queues, one per scheduler.
43 priv work_queue: WorkQueue<~Task>,
44 /// Work queues for the other schedulers. These are created by
45 /// cloning the core work queues.
46 work_queues: ~[WorkQueue<~Task>],
47 /// The queue of incoming messages from other schedulers.
48 /// These are enqueued by SchedHandles after which a remote callback
49 /// is triggered to handle the message.
50 priv message_queue: MessageQueue<SchedMessage>,
51 /// A shared list of sleeping schedulers. We'll use this to wake
52 /// up schedulers when pushing work onto the work queue.
53 sleeper_list: SleeperList,
54 /// Indicates that we have previously pushed a handle onto the
55 /// SleeperList but have not yet received the Wake message.
56 /// Being `true` does not necessarily mean that the scheduler is
57 /// not active since there are multiple event sources that may
58 /// wake the scheduler. It just prevents the scheduler from pushing
59 /// multiple handles onto the sleeper list.
61 /// A flag to indicate we've received the shutdown message and should
62 /// no longer try to go to sleep, but exit instead.
64 stack_pool: StackPool,
65 /// The event loop used to drive the scheduler and perform I/O
66 event_loop: ~EventLoopObject,
67 /// The scheduler runs on a special task. When it is not running
68 /// it is stored here instead of the work queue.
69 sched_task: Option<~Task>,
70 /// An action performed after a context switch on behalf of the
71 /// code running before the context switch
72 cleanup_job: Option<CleanupJob>,
73 /// Should this scheduler run any task, or only pinned tasks?
75 /// If the scheduler shouldn't run some tasks, a friend to send
77 friend_handle: Option<SchedHandle>,
78 /// A fast XorShift rng for scheduler use
80 /// A toggleable idle callback
81 idle_callback: Option<~PausibleIdleCallback>
84 /// An indication of how hard to work on a given operation, the difference
85 /// mainly being whether memory is synchronized or not
94 // * Initialization Functions
96 pub fn new(event_loop: ~EventLoopObject,
97 work_queue: WorkQueue<~Task>,
98 work_queues: ~[WorkQueue<~Task>],
99 sleeper_list: SleeperList)
102 Scheduler::new_special(event_loop, work_queue,
104 sleeper_list, true, None)
108 pub fn new_special(event_loop: ~EventLoopObject,
109 work_queue: WorkQueue<~Task>,
110 work_queues: ~[WorkQueue<~Task>],
111 sleeper_list: SleeperList,
113 friend: Option<SchedHandle>)
117 sleeper_list: sleeper_list,
118 message_queue: MessageQueue::new(),
121 event_loop: event_loop,
122 work_queue: work_queue,
123 work_queues: work_queues,
124 stack_pool: StackPool::new(),
127 run_anything: run_anything,
128 friend_handle: friend,
129 rng: XorShiftRng::new(),
134 // XXX: This may eventually need to be refactored so that
135 // the scheduler itself doesn't have to call event_loop.run.
136 // That will be important for embedding the runtime into external
139 // Take a main task to run, and a scheduler to run it in. Create a
140 // scheduler task and bootstrap into it.
141 pub fn bootstrap(~self, task: ~Task) {
145 // Build an Idle callback.
146 this.idle_callback = Some(this.event_loop.pausible_idle_callback());
148 // Initialize the TLS key.
149 local_ptr::init_tls_key();
151 // Create a task for the scheduler with an empty context.
152 let sched_task = ~Task::new_sched_task();
154 // Now that we have an empty task struct for the scheduler
155 // task, put it in TLS.
156 Local::put::(sched_task);
158 // Before starting our first task, make sure the idle callback
159 // is active. As we do not start in the sleep state this is
161 this.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
163 // Now, as far as all the scheduler state is concerned, we are
164 // inside the "scheduler" context. So we can act like the
165 // scheduler and resume the provided task.
166 this.resume_task_immediately(task);
168 // Now we are back in the scheduler context, having
169 // successfully run the input task. Start by running the
170 // scheduler. Grab it out of TLS - performing the scheduler
171 // action will have given it away.
172 let sched: ~Scheduler = Local::take();
174 rtdebug!("starting scheduler %u", sched.sched_id());
177 // Close the idle callback.
178 let mut sched: ~Scheduler = Local::take();
179 sched.idle_callback.get_mut_ref().close();
180 // Make one go through the loop to run the close callback.
183 // Now that we are done with the scheduler, clean up the
184 // scheduler task. Do so by removing it from TLS and manually
185 // cleaning up the memory it uses. As we didn't actually call
186 // task.run() on the scheduler task we never get through all
187 // the cleanup code it runs.
188 let mut stask: ~Task = Local::take();
190 rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
192 // Should not have any messages
193 let message = stask.sched.get_mut_ref().message_queue.pop();
194 rtassert!(message.is_none());
196 stask.destroyed = true;
199 // This does not return a scheduler, as the scheduler is placed
203 let mut self_sched = self;
205 // This is unsafe because we need to place the scheduler, with
206 // the event_loop inside, inside our task. But we still need a
207 // mutable reference to the event_loop to give it the "run"
210 let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
212 // Our scheduler must be in the task before the event loop
214 let self_sched = Cell::new(self_sched);
215 do Local::borrow |stask: &mut Task| {
216 stask.sched = Some(self_sched.take());
223 // * Execution Functions - Core Loop Logic
225 // The model for this function is that you continue through it
226 // until you either use the scheduler while performing a schedule
227 // action, in which case you give it away and return early, or
228 // you reach the end and sleep. In the case that a scheduler
229 // action is performed the loop is evented such that this function
231 fn run_sched_once() {
233 // When we reach the scheduler context via the event loop we
234 // already have a scheduler stored in our local task, so we
235 // start off by taking it. This is the only path through the
236 // scheduler where we get the scheduler this way.
237 let mut sched: ~Scheduler = Local::take();
239 // Assume that we need to continue idling unless we reach the
240 // end of this function without performing an action.
241 sched.idle_callback.get_mut_ref().resume();
243 // First we check for scheduler messages, these are higher
244 // priority than regular tasks.
245 let sched = match sched.interpret_message_queue(DontTryTooHard) {
246 Some(sched) => sched,
250 // This helper will use a randomized work-stealing algorithm
252 let sched = match sched.do_work() {
253 Some(sched) => sched,
257 // Now, before sleeping we need to find out if there really
258 // were any messages. Give it your best!
259 let mut sched = match sched.interpret_message_queue(GiveItYourBest) {
260 Some(sched) => sched,
264 // If we got here then there was no work to do.
265 // Generate a SchedHandle and push it to the sleeper list so
266 // somebody can wake us up later.
267 if !sched.sleepy && !sched.no_sleep {
268 rtdebug!("scheduler has no work to do, going to sleep");
270 let handle = sched.make_handle();
271 sched.sleeper_list.push(handle);
272 // Since we are sleeping, deactivate the idle callback.
273 sched.idle_callback.get_mut_ref().pause();
275 rtdebug!("not sleeping, already doing so or no_sleep set");
276 // We may not be sleeping, but we still need to deactivate
277 // the idle callback.
278 sched.idle_callback.get_mut_ref().pause();
281 // Finished a cycle without using the Scheduler. Place it back
286 // This function returns None if the scheduler is "used", or it
287 // returns the still-available scheduler. At this point all
288 // message-handling will count as a turn of work, and as a result
290 fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
294 let msg = if effort == DontTryTooHard {
295 // Do a cheap check that may miss messages
296 this.message_queue.casual_pop()
298 this.message_queue.pop()
302 Some(PinnedTask(task)) => {
304 task.give_home(Sched(this.make_handle()));
305 this.resume_task_immediately(task);
308 Some(TaskFromFriend(task)) => {
309 rtdebug!("got a task from a friend. lovely!");
310 this.process_task(task,
311 Scheduler::resume_task_immediately_cl).map_move(Local::put);
320 rtdebug!("shutting down");
322 // There may be an outstanding handle on the
323 // sleeper list. Pop them all to make sure that's
326 match this.sleeper_list.pop() {
328 let mut handle = handle;
335 // No more sleeping. After there are no outstanding
336 // event loop references we will shut down.
337 this.no_sleep = true;
348 fn do_work(~self) -> Option<~Scheduler> {
351 rtdebug!("scheduler calling do work");
352 match this.find_work() {
354 rtdebug!("found some work! processing the task");
355 return this.process_task(task,
356 Scheduler::resume_task_immediately_cl);
359 rtdebug!("no work was found, returning the scheduler struct");
365 // Workstealing: In this iteration of the runtime each scheduler
366 // thread has a distinct work queue. When no work is available
367 // locally, make a few attempts to steal work from the queues of
368 // other scheduler threads. If a few steals fail we end up in the
369 // old "no work" path which is fine.
371 // First step in the process is to find a task. This function does
372 // that by first checking the local queue, and if there is no work
373 // there, trying to steal from the remote work queues.
374 fn find_work(&mut self) -> Option<~Task> {
375 rtdebug!("scheduler looking for work");
376 match self.work_queue.pop() {
378 rtdebug!("found a task locally");
382 // Our naive stealing, try kinda hard.
383 rtdebug!("scheduler trying to steal");
384 let len = self.work_queues.len();
385 return self.try_steals(len/2);
390 // With no backoff try stealing n times from the queues the
391 // scheduler knows about. This naive implementation can steal from
392 // our own queue or from other special schedulers.
393 fn try_steals(&mut self, n: uint) -> Option<~Task> {
394 for _ in range(0, n) {
395 let index = self.rng.gen_uint_range(0, self.work_queues.len());
396 let work_queues = &mut self.work_queues;
397 match work_queues[index].steal() {
399 rtdebug!("found task by stealing");
405 rtdebug!("giving up on stealing");
409 // * Task Routing Functions - Make sure tasks end up in the right
412 fn process_task(~self, task: ~Task,
413 schedule_fn: SchedulingFn) -> Option<~Scheduler> {
417 rtdebug!("processing a task");
419 let home = task.take_unwrap_home();
421 Sched(home_handle) => {
422 if home_handle.sched_id != this.sched_id() {
423 rtdebug!("sending task home");
424 task.give_home(Sched(home_handle));
425 Scheduler::send_task_home(task);
428 rtdebug!("running task here");
429 task.give_home(Sched(home_handle));
430 return schedule_fn(this, task);
433 AnySched if this.run_anything => {
434 rtdebug!("running anysched task here");
435 task.give_home(AnySched);
436 return schedule_fn(this, task);
439 rtdebug!("sending task to friend");
440 task.give_home(AnySched);
441 this.send_to_friend(task);
447 fn send_task_home(task: ~Task) {
449 let mut home = task.take_unwrap_home();
451 Sched(ref mut home_handle) => {
452 home_handle.send(PinnedTask(task));
455 rtabort!("error: cannot send anysched task home");
460 /// Take a non-homed task we aren't allowed to run here and send
461 /// it to the designated friend scheduler to execute.
462 fn send_to_friend(&mut self, task: ~Task) {
463 rtdebug!("sending a task to friend");
464 match self.friend_handle {
465 Some(ref mut handle) => {
466 handle.send(TaskFromFriend(task));
469 rtabort!("tried to send task to a friend but scheduler has no friends");
474 /// Schedule a task to be executed later.
476 /// Pushes the task onto the work stealing queue and tells the
477 /// event loop to run it later. Always use this instead of pushing
478 /// to the work queue directly.
479 pub fn enqueue_task(&mut self, task: ~Task) {
483 // We push the task onto our local queue clone.
484 this.work_queue.push(task);
485 this.idle_callback.get_mut_ref().resume();
487 // We've made work available. Notify a
488 // sleeping scheduler.
490 match this.sleeper_list.casual_pop() {
492 let mut handle = handle;
495 None => { (/* pass */) }
499 /// As enqueue_task, but with the possibility for the blocked task to
500 /// already have been killed.
501 pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
502 do blocked_task.wake().map_move |task| {
503 self.enqueue_task(task);
507 // * Core Context Switching Functions
509 // The primary function for changing contexts. In the current
510 // design the scheduler is just a slightly modified GreenTask, so
511 // all context swaps are from Task to Task. The only difference
512 // between the various cases is where the inputs come from, and
513 // what is done with the resulting task. That is specified by the
514 // cleanup function f, which takes the scheduler and the
515 // old task as inputs.
517 pub fn change_task_context(~self,
519 f: &fn(&mut Scheduler, ~Task)) {
522 // The current task is grabbed from TLS, not taken as an input.
523 // Doing an unsafe_take to avoid writing back a null pointer -
524 // We're going to call `put` later to do that.
525 let current_task: ~Task = unsafe { Local::unsafe_take() };
527 // Check that the task is not in an atomically() section (e.g.,
528 // holding a pthread mutex, which could deadlock the scheduler).
529 current_task.death.assert_may_sleep();
531 // These transmutes do something fishy with a closure.
532 let f_fake_region = unsafe {
533 transmute::<&fn(&mut Scheduler, ~Task),
534 &fn(&mut Scheduler, ~Task)>(f)
536 let f_opaque = ClosureConverter::from_fn(f_fake_region);
538 // The current task is placed inside an enum with the cleanup
539 // function. This enum is then placed inside the scheduler.
540 this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
542 // The scheduler is then placed inside the next task.
543 let mut next_task = next_task;
544 next_task.sched = Some(this);
546 // However we still need an internal mutable pointer to the
547 // original task. The strategy here was "arrange memory, then
548 // get pointers", so we crawl back up the chain using
549 // transmute to eliminate borrowck errors.
552 let sched: &mut Scheduler =
553 transmute_mut_region(*next_task.sched.get_mut_ref());
555 let current_task: &mut Task = match sched.cleanup_job {
556 Some(CleanupJob { task: ref task, _ }) => {
557 let task_ptr: *~Task = task;
558 transmute_mut_region(*transmute_mut_unsafe(task_ptr))
561 rtabort!("no cleanup job");
565 let (current_task_context, next_task_context) =
566 Scheduler::get_contexts(current_task, next_task);
568 // Done with everything - put the next task in TLS. This
569 // works because due to transmute the borrow checker
570 // believes that we have no internal pointers to
572 Local::put(next_task);
574 // The raw context swap operation. The next action taken
575 // will be running the cleanup job from the context of the
577 Context::swap(current_task_context, next_task_context);
580 // When the context swaps back to this task we immediately
581 // run the cleanup job, as expected by the previously called
582 // `Context::swap` call.
584 let task: *mut Task = Local::unsafe_borrow();
585 (*task).sched.get_mut_ref().run_cleanup_job();
587 // Must happen after running the cleanup job (of course).
588 (*task).death.check_killed((*task).unwinder.unwinding);
592 // Returns a mutable reference to both contexts involved in this
593 // swap. This is unsafe - we are getting mutable internal
594 // references to keep even when we don't own the tasks. It looks
595 // kinda safe because we are doing transmutes before passing in
597 pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
598 (&'a mut Context, &'a mut Context) {
599 let current_task_context =
600 &mut current_task.coroutine.get_mut_ref().saved_context;
601 let next_task_context =
602 &mut next_task.coroutine.get_mut_ref().saved_context;
604 (transmute_mut_region(current_task_context),
605 transmute_mut_region(next_task_context))
609 // * Context Swapping Helpers - Here be ugliness!
/// Context-switches to `task` right away via `change_task_context`.
/// The cleanup closure receives the task that was running before the
/// switch (`stask`) and stores it in `sched.sched_task`.
611 pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
612 do self.change_task_context(task) |sched, stask| {
613 sched.sched_task = Some(stask);
// Free-function form of the `resume_task_immediately` method: it only
// forwards `task` to `sched.resume_task_immediately`. Callers in this
// file pass it to `process_task` as the `schedule_fn` argument.
618 fn resume_task_immediately_cl(sched: ~Scheduler,
619 task: ~Task) -> Option<~Scheduler> {
620 sched.resume_task_immediately(task)
/// Wakes `blocked_task` and, if waking yields a task, resumes that task
/// immediately. If `wake()` yields `None`, nothing is resumed and the
/// scheduler is put back with `Local::put`.
/// NOTE(review): `None` presumably means the task was already killed
/// (see `enqueue_blocked_task`) - confirm.
624 pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
625 match blocked_task.wake() {
626 Some(task) => { self.resume_task_immediately(task); }
627 None => Local::put(self)
631 /// Block a running task, context switch to the scheduler, then pass the
632 /// blocked task to a closure.
636 /// The closure here is a *stack* closure that lives in the
637 /// running task. It gets transmuted to the scheduler's lifetime
638 /// and called while the task is blocked.
640 /// This passes a Scheduler pointer to the fn after the context switch
641 /// in order to prevent that fn from performing further scheduling operations.
642 /// Doing further scheduling could easily result in infinite recursion.
643 pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
644 // Trickier - we need to get the scheduler task out of self
645 // and use it as the destination.
647 let stask = this.sched_task.take_unwrap();
648 // Otherwise this is the same as below.
649 this.switch_running_tasks_and_then(stask, f);
/// Context-switches to `next_task` via `change_task_context`, then
/// decides what happens to the task that was running before the switch:
/// if `BlockedTask::try_block` yields `Left(killed_task)` the task is
/// re-enqueued on the scheduler, and if it yields `Right(blocked_task)`
/// the blocked task is handed to `f` together with the scheduler.
652 pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
653 f: &fn(&mut Scheduler, BlockedTask)) {
654 // This is where we convert the BlockedTask-taking closure into one
655 // that takes just a Task, and is aware of the block-or-killed protocol.
656 do self.change_task_context(next_task) |sched, task| {
657 // Task might need to receive a kill signal instead of blocking.
658 // We can call the "and_then" only if it blocks successfully.
659 match BlockedTask::try_block(task) {
660 Left(killed_task) => sched.enqueue_task(killed_task),
661 Right(blocked_task) => f(sched, blocked_task),
// Switches from the running task to `task` using
// `switch_running_tasks_and_then`; the closure enqueues the task that
// was running (`last_task`) with `enqueue_blocked_task`. `run_task`
// passes this function to `process_task` as the scheduling function.
666 fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
667 do sched.switch_running_tasks_and_then(task) |sched, last_task| {
668 sched.enqueue_blocked_task(last_task);
673 // * Task Context Helpers
675 /// Called by a running task to end execution, after which it will
676 /// be recycled by the scheduler for reuse in a new task.
677 pub fn terminate_current_task(~self) {
678 // Similar to deschedule running task and then, but cannot go through
679 // the task-blocking path. The task is already dying.
681 let stask = this.sched_task.take_unwrap();
682 do this.change_task_context(stask) |sched, mut dead_task| {
683 let coroutine = dead_task.coroutine.take_unwrap();
684 coroutine.recycle(&mut sched.stack_pool);
/// Takes the scheduler out of `Local` and routes `task` through
/// `process_task`, using `switch_task` as the scheduling function.
/// Any scheduler that `process_task` hands back is passed to `Local::put`.
688 pub fn run_task(task: ~Task) {
689 let sched: ~Scheduler = Local::take();
690 sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
/// Queues `next_task` on the local scheduler instead of switching to it:
/// borrows the scheduler through `Local::borrow` and calls `enqueue_task`.
/// The task is wrapped in a `Cell` so the closure can move it out with
/// `take()`.
693 pub fn run_task_later(next_task: ~Task) {
694 let next_task = Cell::new(next_task);
695 do Local::borrow |sched: &mut Scheduler| {
696 sched.enqueue_task(next_task.take());
700 // * Utility Functions
/// Returns this scheduler's id, computed as `to_uint(self)`: the `uint`
/// form of the pointer to the scheduler struct.
702 pub fn sched_id(&self) -> uint { to_uint(self) }
/// Takes the pending `cleanup_job` out of the scheduler and runs it,
/// passing this scheduler to the job.
/// NOTE(review): `take_unwrap` presumably fails when no job is stored,
/// so a job must have been set before this is called - confirm.
704 pub fn run_cleanup_job(&mut self) {
705 let cleanup_job = self.cleanup_job.take_unwrap();
706 cleanup_job.run(self);
709 pub fn make_handle(&mut self) -> SchedHandle {
710 let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
714 queue: self.message_queue.clone(),
715 sched_id: self.sched_id()
// Type of the scheduling functions handed to `process_task` as
// `schedule_fn`. In this file `resume_task_immediately_cl` and
// `switch_task` are the functions passed in that position.
722 type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
724 pub enum SchedMessage {
728 TaskFromFriend(~Task)
731 pub struct SchedHandle {
732 priv remote: ~RemoteCallbackObject,
733 priv queue: MessageQueue<SchedMessage>,
738 pub fn send(&mut self, msg: SchedMessage) {
739 self.queue.push(msg);
746 f: UnsafeTaskReceiver
750 pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
/// Consumes the job: destructures it into its task and its stored
/// closure, converts the closure back with `to_fn`, and calls it with
/// `sched` and the task.
757 pub fn run(self, sched: &mut Scheduler) {
758 let CleanupJob { task: task, f: f } = self;
759 f.to_fn()(sched, task)
763 // XXX: Some hacks to put a &fn in Scheduler without borrowck
// Type under which a cleanup closure is stored (the `f` field of
// `CleanupJob`): the raw closure representation, `raw::Closure`.
765 type UnsafeTaskReceiver = raw::Closure;
// Conversions between a `&fn(&mut Scheduler, ~Task)` closure and the
// type used to store it.
766 trait ClosureConverter {
767 fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
768 fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
770 impl ClosureConverter for UnsafeTaskReceiver {
// Reinterprets the closure as an `UnsafeTaskReceiver` with `transmute`.
// NOTE(review): the stored value no longer carries the closure's
// lifetime, so presumably the caller must keep the closure alive until
// `to_fn` has been used - confirm.
771 fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
772 unsafe { transmute(f) }
// Reverse of `from_fn`: transmutes the stored value back into a closure.
774 fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
783 use unstable::run_in_bare_thread;
786 use rt::sched::{Scheduler};
788 use rt::thread::Thread;
789 use rt::task::{Task, Sched};
794 fn trivial_run_in_newsched_task_test() {
795 let mut task_ran = false;
796 let task_ran_ptr: *mut bool = &mut task_ran;
797 do run_in_newsched_task || {
798 unsafe { *task_ran_ptr = true };
799 rtdebug!("executed from the new scheduler")
805 fn multiple_task_test() {
807 let mut task_run_count = 0;
808 let task_run_count_ptr: *mut uint = &mut task_run_count;
809 do run_in_newsched_task || {
810 for _ in range(0u, total) {
812 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
816 assert!(task_run_count == total);
820 fn multiple_task_nested_test() {
821 let mut task_run_count = 0;
822 let task_run_count_ptr: *mut uint = &mut task_run_count;
823 do run_in_newsched_task || {
825 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
827 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
829 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
834 assert!(task_run_count == 3);
837 // Confirm that a sched_id actually is the uint form of the
838 // pointer to the scheduler struct.
840 fn simple_sched_id_test() {
841 do run_in_bare_thread {
842 let sched = ~new_test_uv_sched();
843 assert!(to_uint(sched) == sched.sched_id());
847 // Compare two scheduler ids that are different, this should never
848 // fail but may catch a mistake someday.
850 fn compare_sched_id_test() {
851 do run_in_bare_thread {
852 let sched_one = ~new_test_uv_sched();
853 let sched_two = ~new_test_uv_sched();
854 assert!(sched_one.sched_id() != sched_two.sched_id());
859 // A very simple test that confirms that a task executing on the
860 // home scheduler notices that it is home.
862 fn test_home_sched() {
863 do run_in_bare_thread {
864 let mut task_ran = false;
865 let task_ran_ptr: *mut bool = &mut task_ran;
867 let mut sched = ~new_test_uv_sched();
868 let sched_handle = sched.make_handle();
870 let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None,
871 Sched(sched_handle)) {
872 unsafe { *task_ran_ptr = true };
873 assert!(Task::on_appropriate_sched());
876 let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
877 task.death.on_exit = Some(on_exit);
879 sched.bootstrap(task);
883 // An advanced test that checks all four possible states that a
884 // (task,sched) can be in regarding homes.
887 fn test_schedule_home_states() {
889 use rt::uv::uvio::UvEventLoop;
890 use rt::sleeper_list::SleeperList;
891 use rt::work_queue::WorkQueue;
892 use rt::sched::Shutdown;
896 do run_in_bare_thread {
898 let sleepers = SleeperList::new();
899 let normal_queue = WorkQueue::new();
900 let special_queue = WorkQueue::new();
901 let queues = ~[normal_queue.clone(), special_queue.clone()];
903 // Our normal scheduler
904 let mut normal_sched = ~Scheduler::new(
910 let normal_handle = Cell::new(normal_sched.make_handle());
912 let friend_handle = normal_sched.make_handle();
914 // Our special scheduler
915 let mut special_sched = ~Scheduler::new_special(
917 special_queue.clone(),
921 Some(friend_handle));
923 let special_handle = Cell::new(special_sched.make_handle());
925 let t1_handle = special_sched.make_handle();
926 let t4_handle = special_sched.make_handle();
929 // 1) task is home on special
930 // 2) task not homed, sched doesn't care
931 // 3) task not homed, sched requeues
932 // 4) task not home, send home
934 let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
935 Sched(t1_handle)) || {
936 rtassert!(Task::on_appropriate_sched());
938 rtdebug!("task1 id: **%u**", borrow::to_uint(task1));
940 let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
941 rtassert!(Task::on_appropriate_sched());
944 let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
945 rtassert!(Task::on_appropriate_sched());
948 let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
950 rtassert!(Task::on_appropriate_sched());
952 rtdebug!("task4 id: **%u**", borrow::to_uint(task4));
954 let task1 = Cell::new(task1);
955 let task2 = Cell::new(task2);
956 let task3 = Cell::new(task3);
957 let task4 = Cell::new(task4);
959 // Signal from the special task that we are done.
960 let (port, chan) = oneshot::<()>();
961 let port = Cell::new(port);
962 let chan = Cell::new(chan);
964 let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
965 rtdebug!("*about to submit task2*");
966 Scheduler::run_task(task2.take());
967 rtdebug!("*about to submit task4*");
968 Scheduler::run_task(task4.take());
969 rtdebug!("*normal_task done*");
971 let mut nh = normal_handle.take();
973 let mut sh = special_handle.take();
977 rtdebug!("normal task: %u", borrow::to_uint(normal_task));
979 let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) {
980 rtdebug!("*about to submit task1*");
981 Scheduler::run_task(task1.take());
982 rtdebug!("*about to submit task3*");
983 Scheduler::run_task(task3.take());
984 rtdebug!("*done with special_task*");
985 chan.take().send(());
988 rtdebug!("special task: %u", borrow::to_uint(special_task));
990 let special_sched = Cell::new(special_sched);
991 let normal_sched = Cell::new(normal_sched);
992 let special_task = Cell::new(special_task);
993 let normal_task = Cell::new(normal_task);
995 let normal_thread = do Thread::start {
996 normal_sched.take().bootstrap(normal_task.take());
997 rtdebug!("finished with normal_thread");
1000 let special_thread = do Thread::start {
1001 special_sched.take().bootstrap(special_task.take());
1002 rtdebug!("finished with special_sched");
1005 normal_thread.join();
1006 special_thread.join();
1011 fn test_stress_schedule_task_states() {
1012 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1013 let n = stress_factor() * 120;
1014 for _ in range(0, n as int) {
1015 test_schedule_home_states();
1020 fn test_io_callback() {
1021 // This is a regression test: when there are no schedulable tasks
1022 // in the work queue but we are performing I/O, then once we do put
1023 // something in the work queue again the scheduler must pick it up and
1024 // not exit before emptying the work queue.
1025 do run_in_newsched_task {
1027 let sched: ~Scheduler = Local::take();
1028 do sched.deschedule_running_task_and_then |sched, task| {
1029 let task = Cell::new(task);
1030 do sched.event_loop.callback_ms(10) {
1031 rtdebug!("in callback");
1032 let mut sched: ~Scheduler = Local::take();
1033 sched.enqueue_blocked_task(task.take());
1045 do run_in_bare_thread {
1046 let (port, chan) = oneshot::<()>();
1047 let port = Cell::new(port);
1048 let chan = Cell::new(chan);
1050 let thread_one = do Thread::start {
1051 let chan = Cell::new(chan.take());
1052 do run_in_newsched_task_core {
1053 chan.take().send(());
1057 let thread_two = do Thread::start {
1058 let port = Cell::new(port.take());
1059 do run_in_newsched_task_core {
1069 // A regression test that the final message is always handled.
1070 // Used to deadlock because Shutdown was never received.
1072 fn no_missed_messages() {
1073 use rt::work_queue::WorkQueue;
1074 use rt::sleeper_list::SleeperList;
1075 use rt::stack::StackPool;
1076 use rt::uv::uvio::UvEventLoop;
1077 use rt::sched::{Shutdown, TaskFromFriend};
1080 do run_in_bare_thread {
1081 do stress_factor().times {
1082 let sleepers = SleeperList::new();
1083 let queue = WorkQueue::new();
1084 let queues = ~[queue.clone()];
1086 let mut sched = ~Scheduler::new(
1087 ~UvEventLoop::new(),
1092 let mut handle = sched.make_handle();
1094 let sched = Cell::new(sched);
1096 let thread = do Thread::start {
1097 let mut sched = sched.take();
1098 let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
1099 sched.bootstrap(bootstrap_task);
1102 let mut stack_pool = StackPool::new();
1103 let task = ~Task::new_root(&mut stack_pool, None, ||());
1104 handle.send(TaskFromFriend(task));
1106 handle.send(Shutdown);
1107 util::ignore(handle);
1115 fn multithreading() {
1118 use vec::OwnedVector;
1119 use container::Container;
1121 do run_in_mt_newsched_task {
1122 let mut ports = ~[];
1124 let (port, chan) = oneshot();
1125 let chan_cell = Cell::new(chan);
1126 do spawntask_later {
1127 chan_cell.take().send(());
1132 while !ports.is_empty() {
1141 use comm::{GenericPort, GenericChan};
1143 do run_in_mt_newsched_task {
1144 let (end_port, end_chan) = oneshot();
1149 let (p, ch1) = stream();
1151 ch1.send((token, end_chan));
1153 while i <= n_tasks {
1154 let (next_p, ch) = stream();
1157 do spawntask_random {
1158 roundtrip(imm_i, n_tasks, &imm_p, &ch);
1165 do spawntask_random {
1166 roundtrip(1, n_tasks, &imm_p, &imm_ch);
1172 fn roundtrip(id: int, n_tasks: int,
1173 p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
1181 (token, end_chan) => {
1182 debug!("thread: %d got token: %d", id, token);
1183 ch.send((token - 1, end_chan));
1184 if token <= n_tasks {
1194 fn start_closure_dtor() {
1197 // Regression test that the `start` task entrypoint can
1198 // contain dtors that use task resources
1199 do run_in_newsched_task {
1200 struct S { field: () }
1208 let s = S { field: () };