]> git.lizzy.rs Git - rust.git/blob - src/libstd/rt/sched.rs
bcf9ae4a2a8410037d2f0e80ec48e8e4b62c3dba
[rust.git] / src / libstd / rt / sched.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use either::{Left, Right};
12 use option::{Option, Some, None};
13 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
14 use clone::Clone;
15 use unstable::raw;
16 use super::sleeper_list::SleeperList;
17 use super::work_queue::WorkQueue;
18 use super::stack::{StackPool};
19 use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
20 use super::context::Context;
21 use super::task::{Task, AnySched, Sched};
22 use super::message_queue::MessageQueue;
23 use rt::kill::BlockedTask;
24 use rt::local_ptr;
25 use rt::local::Local;
26 use rt::rtio::{RemoteCallback, PausibleIdleCallback};
27 use borrow::{to_uint};
28 use cell::Cell;
29 use rand::{XorShiftRng, RngUtil};
30 use iter::range;
31 use vec::{OwnedVector};
32
33 /// A scheduler is responsible for coordinating the execution of Tasks
34 /// on a single thread. The scheduler runs inside a slightly modified
35 /// Rust Task. When not running this task is stored in the scheduler
36 /// struct. The scheduler struct acts like a baton, all scheduling
37 /// actions are transfers of the baton.
38 ///
39 /// XXX: This creates too many callbacks to run_sched_once, resulting
40 /// in too much allocation and too many events.
41 pub struct Scheduler {
42     /// There are N work queues, one per scheduler.
43     priv work_queue: WorkQueue<~Task>,
44     /// Work queues for the other schedulers. These are created by
45     /// cloning the core work queues.
46     work_queues: ~[WorkQueue<~Task>],
47     /// The queue of incoming messages from other schedulers.
48     /// These are enqueued by SchedHandles after which a remote callback
49     /// is triggered to handle the message.
50     priv message_queue: MessageQueue<SchedMessage>,
51     /// A shared list of sleeping schedulers. We'll use this to wake
52     /// up schedulers when pushing work onto the work queue.
53     sleeper_list: SleeperList,
54     /// Indicates that we have previously pushed a handle onto the
55     /// SleeperList but have not yet received the Wake message.
56     /// Being `true` does not necessarily mean that the scheduler is
57     /// not active since there are multiple event sources that may
58     /// wake the scheduler. It just prevents the scheduler from pushing
59     /// multiple handles onto the sleeper list.
60     priv sleepy: bool,
61     /// A flag to indicate we've received the shutdown message and should
62     /// no longer try to go to sleep, but exit instead.
63     no_sleep: bool,
64     stack_pool: StackPool,
65     /// The event loop used to drive the scheduler and perform I/O
66     event_loop: ~EventLoopObject,
67     /// The scheduler runs on a special task. When it is not running
68     /// it is stored here instead of the work queue.
69     sched_task: Option<~Task>,
70     /// An action performed after a context switch on behalf of the
71     /// code running before the context switch
72     cleanup_job: Option<CleanupJob>,
73     /// Should this scheduler run any task, or only pinned tasks?
74     run_anything: bool,
75     /// If the scheduler shouldn't run some tasks, a friend to send
76     /// them to.
77     friend_handle: Option<SchedHandle>,
78     /// A fast XorShift rng for scheduler use
79     rng: XorShiftRng,
80     /// A toggleable idle callback
81     idle_callback: Option<~PausibleIdleCallback>
82 }
83
84 /// An indication of how hard to work on a given operation, the difference
85 /// mainly being whether memory is synchronized or not
86 #[deriving(Eq)]
87 enum EffortLevel {
88     DontTryTooHard,
89     GiveItYourBest
90 }
91
92 impl Scheduler {
93
94     // * Initialization Functions
95
96     pub fn new(event_loop: ~EventLoopObject,
97                work_queue: WorkQueue<~Task>,
98                work_queues: ~[WorkQueue<~Task>],
99                sleeper_list: SleeperList)
100         -> Scheduler {
101
102         Scheduler::new_special(event_loop, work_queue,
103                                work_queues,
104                                sleeper_list, true, None)
105
106     }
107
108     pub fn new_special(event_loop: ~EventLoopObject,
109                        work_queue: WorkQueue<~Task>,
110                        work_queues: ~[WorkQueue<~Task>],
111                        sleeper_list: SleeperList,
112                        run_anything: bool,
113                        friend: Option<SchedHandle>)
114         -> Scheduler {
115
116         Scheduler {
117             sleeper_list: sleeper_list,
118             message_queue: MessageQueue::new(),
119             sleepy: false,
120             no_sleep: false,
121             event_loop: event_loop,
122             work_queue: work_queue,
123             work_queues: work_queues,
124             stack_pool: StackPool::new(),
125             sched_task: None,
126             cleanup_job: None,
127             run_anything: run_anything,
128             friend_handle: friend,
129             rng: XorShiftRng::new(),
130             idle_callback: None
131         }
132     }
133
134     // XXX: This may eventually need to be refactored so that
135     // the scheduler itself doesn't have to call event_loop.run.
136     // That will be important for embedding the runtime into external
137     // event loops.
138
139     // Take a main task to run, and a scheduler to run it in. Create a
140     // scheduler task and bootstrap into it.
141     pub fn bootstrap(~self, task: ~Task) {
142
143         let mut this = self;
144
145         // Build an Idle callback.
146         this.idle_callback = Some(this.event_loop.pausible_idle_callback());
147
148         // Initialize the TLS key.
149         local_ptr::init_tls_key();
150
151         // Create a task for the scheduler with an empty context.
152         let sched_task = ~Task::new_sched_task();
153
154         // Now that we have an empty task struct for the scheduler
155         // task, put it in TLS.
156         Local::put::(sched_task);
157
158         // Before starting our first task, make sure the idle callback
159         // is active. As we do not start in the sleep state this is
160         // important.
161         this.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
162
163         // Now, as far as all the scheduler state is concerned, we are
164         // inside the "scheduler" context. So we can act like the
165         // scheduler and resume the provided task.
166         this.resume_task_immediately(task);
167
168         // Now we are back in the scheduler context, having
169         // successfully run the input task. Start by running the
170         // scheduler. Grab it out of TLS - performing the scheduler
171         // action will have given it away.
172         let sched: ~Scheduler = Local::take();
173
174         rtdebug!("starting scheduler %u", sched.sched_id());
175         sched.run();
176
177         // Close the idle callback.
178         let mut sched: ~Scheduler = Local::take();
179         sched.idle_callback.get_mut_ref().close();
180         // Make one go through the loop to run the close callback.
181         sched.run();
182
183         // Now that we are done with the scheduler, clean up the
184         // scheduler task. Do so by removing it from TLS and manually
185         // cleaning up the memory it uses. As we didn't actually call
186         // task.run() on the scheduler task we never get through all
187         // the cleanup code it runs.
188         let mut stask: ~Task = Local::take();
189
190         rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
191
192         // Should not have any messages
193         let message = stask.sched.get_mut_ref().message_queue.pop();
194         rtassert!(message.is_none());
195
196         stask.destroyed = true;
197     }
198
199     // This does not return a scheduler, as the scheduler is placed
200     // inside the task.
201     pub fn run(~self) {
202
203         let mut self_sched = self;
204
205         // This is unsafe because we need to place the scheduler, with
206         // the event_loop inside, inside our task. But we still need a
207         // mutable reference to the event_loop to give it the "run"
208         // command.
209         unsafe {
210             let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
211
212             // Our scheduler must be in the task before the event loop
213             // is started.
214             let self_sched = Cell::new(self_sched);
215             do Local::borrow |stask: &mut Task| {
216                 stask.sched = Some(self_sched.take());
217             };
218
219             (*event_loop).run();
220         }
221     }
222
223     // * Execution Functions - Core Loop Logic
224
225     // The model for this function is that you continue through it
226     // until you either use the scheduler while performing a schedule
227     // action, in which case you give it away and return early, or
228     // you reach the end and sleep. In the case that a scheduler
229     // action is performed the loop is evented such that this function
230     // is called again.
231     fn run_sched_once() {
232
233         // When we reach the scheduler context via the event loop we
234         // already have a scheduler stored in our local task, so we
235         // start off by taking it. This is the only path through the
236         // scheduler where we get the scheduler this way.
237         let mut sched: ~Scheduler = Local::take();
238
239         // Assume that we need to continue idling unless we reach the
240         // end of this function without performing an action.
241         sched.idle_callback.get_mut_ref().resume();
242
243         // First we check for scheduler messages, these are higher
244         // priority than regular tasks.
245         let sched = match sched.interpret_message_queue(DontTryTooHard) {
246             Some(sched) => sched,
247             None => return
248         };
249
250         // This helper will use a randomized work-stealing algorithm
251         // to find work.
252         let sched = match sched.do_work() {
253             Some(sched) => sched,
254             None => return
255         };
256
257         // Now, before sleeping we need to find out if there really
258         // were any messages. Give it your best!
259         let mut sched = match sched.interpret_message_queue(GiveItYourBest) {
260             Some(sched) => sched,
261             None => return
262         };
263
264         // If we got here then there was no work to do.
265         // Generate a SchedHandle and push it to the sleeper list so
266         // somebody can wake us up later.
267         if !sched.sleepy && !sched.no_sleep {
268             rtdebug!("scheduler has no work to do, going to sleep");
269             sched.sleepy = true;
270             let handle = sched.make_handle();
271             sched.sleeper_list.push(handle);
272             // Since we are sleeping, deactivate the idle callback.
273             sched.idle_callback.get_mut_ref().pause();
274         } else {
275             rtdebug!("not sleeping, already doing so or no_sleep set");
276             // We may not be sleeping, but we still need to deactivate
277             // the idle callback.
278             sched.idle_callback.get_mut_ref().pause();
279         }
280
281         // Finished a cycle without using the Scheduler. Place it back
282         // in TLS.
283         Local::put(sched);
284     }
285
286     // This function returns None if the scheduler is "used", or it
287     // returns the still-available scheduler. At this point all
288     // message-handling will count as a turn of work, and as a result
289     // return None.
290     fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
291
292         let mut this = self;
293
294         let msg = if effort == DontTryTooHard {
295             // Do a cheap check that may miss messages
296             this.message_queue.casual_pop()
297         } else {
298             this.message_queue.pop()
299         };
300
301         match msg {
302             Some(PinnedTask(task)) => {
303                 let mut task = task;
304                 task.give_home(Sched(this.make_handle()));
305                 this.resume_task_immediately(task);
306                 return None;
307             }
308             Some(TaskFromFriend(task)) => {
309                 rtdebug!("got a task from a friend. lovely!");
310                 this.process_task(task,
311                                   Scheduler::resume_task_immediately_cl).map_move(Local::put);
312                 return None;
313             }
314             Some(Wake) => {
315                 this.sleepy = false;
316                 Local::put(this);
317                 return None;
318             }
319             Some(Shutdown) => {
320                 rtdebug!("shutting down");
321                 if this.sleepy {
322                     // There may be an outstanding handle on the
323                     // sleeper list.  Pop them all to make sure that's
324                     // not the case.
325                     loop {
326                         match this.sleeper_list.pop() {
327                             Some(handle) => {
328                                 let mut handle = handle;
329                                 handle.send(Wake);
330                             }
331                             None => break
332                         }
333                     }
334                 }
335                 // No more sleeping. After there are no outstanding
336                 // event loop references we will shut down.
337                 this.no_sleep = true;
338                 this.sleepy = false;
339                 Local::put(this);
340                 return None;
341             }
342             None => {
343                 return Some(this);
344             }
345         }
346     }
347
348     fn do_work(~self) -> Option<~Scheduler> {
349         let mut this = self;
350
351         rtdebug!("scheduler calling do work");
352         match this.find_work() {
353             Some(task) => {
354                 rtdebug!("found some work! processing the task");
355                 return this.process_task(task,
356                                          Scheduler::resume_task_immediately_cl);
357             }
358             None => {
359                 rtdebug!("no work was found, returning the scheduler struct");
360                 return Some(this);
361             }
362         }
363     }
364
365     // Workstealing: In this iteration of the runtime each scheduler
366     // thread has a distinct work queue. When no work is available
367     // locally, make a few attempts to steal work from the queues of
368     // other scheduler threads. If a few steals fail we end up in the
369     // old "no work" path which is fine.
370
371     // First step in the process is to find a task. This function does
372     // that by first checking the local queue, and if there is no work
373     // there, trying to steal from the remote work queues.
374     fn find_work(&mut self) -> Option<~Task> {
375         rtdebug!("scheduler looking for work");
376         match self.work_queue.pop() {
377             Some(task) => {
378                 rtdebug!("found a task locally");
379                 return Some(task)
380             }
381             None => {
382                 // Our naive stealing, try kinda hard.
383                 rtdebug!("scheduler trying to steal");
384                 let len = self.work_queues.len();
385                 return self.try_steals(len/2);
386             }
387         }
388     }
389
390     // With no backoff try stealing n times from the queues the
391     // scheduler knows about. This naive implementation can steal from
392     // our own queue or from other special schedulers.
393     fn try_steals(&mut self, n: uint) -> Option<~Task> {
394         for _ in range(0, n) {
395             let index = self.rng.gen_uint_range(0, self.work_queues.len());
396             let work_queues = &mut self.work_queues;
397             match work_queues[index].steal() {
398                 Some(task) => {
399                     rtdebug!("found task by stealing");
400                     return Some(task)
401                 }
402                 None => ()
403             }
404         };
405         rtdebug!("giving up on stealing");
406         return None;
407     }
408
409     // * Task Routing Functions - Make sure tasks send up in the right
410     // place.
411
412     fn process_task(~self, task: ~Task,
413                     schedule_fn: SchedulingFn) -> Option<~Scheduler> {
414         let mut this = self;
415         let mut task = task;
416
417         rtdebug!("processing a task");
418
419         let home = task.take_unwrap_home();
420         match home {
421             Sched(home_handle) => {
422                 if home_handle.sched_id != this.sched_id() {
423                     rtdebug!("sending task home");
424                     task.give_home(Sched(home_handle));
425                     Scheduler::send_task_home(task);
426                     return Some(this);
427                 } else {
428                     rtdebug!("running task here");
429                     task.give_home(Sched(home_handle));
430                     return schedule_fn(this, task);
431                 }
432             }
433             AnySched if this.run_anything => {
434                 rtdebug!("running anysched task here");
435                 task.give_home(AnySched);
436                 return schedule_fn(this, task);
437             }
438             AnySched => {
439                 rtdebug!("sending task to friend");
440                 task.give_home(AnySched);
441                 this.send_to_friend(task);
442                 return Some(this);
443             }
444         }
445     }
446
447     fn send_task_home(task: ~Task) {
448         let mut task = task;
449         let mut home = task.take_unwrap_home();
450         match home {
451             Sched(ref mut home_handle) => {
452                 home_handle.send(PinnedTask(task));
453             }
454             AnySched => {
455                         rtabort!("error: cannot send anysched task home");
456             }
457         }
458     }
459
460     /// Take a non-homed task we aren't allowed to run here and send
461     /// it to the designated friend scheduler to execute.
462     fn send_to_friend(&mut self, task: ~Task) {
463         rtdebug!("sending a task to friend");
464         match self.friend_handle {
465             Some(ref mut handle) => {
466                 handle.send(TaskFromFriend(task));
467             }
468             None => {
469                 rtabort!("tried to send task to a friend but scheduler has no friends");
470             }
471         }
472     }
473
474     /// Schedule a task to be executed later.
475     ///
476     /// Pushes the task onto the work stealing queue and tells the
477     /// event loop to run it later. Always use this instead of pushing
478     /// to the work queue directly.
479     pub fn enqueue_task(&mut self, task: ~Task) {
480
481         let this = self;
482
483         // We push the task onto our local queue clone.
484         this.work_queue.push(task);
485         this.idle_callback.get_mut_ref().resume();
486
487         // We've made work available. Notify a
488         // sleeping scheduler.
489
490         match this.sleeper_list.casual_pop() {
491             Some(handle) => {
492                         let mut handle = handle;
493                 handle.send(Wake)
494             }
495             None => { (/* pass */) }
496         };
497     }
498
499     /// As enqueue_task, but with the possibility for the blocked task to
500     /// already have been killed.
501     pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
502         do blocked_task.wake().map_move |task| {
503             self.enqueue_task(task);
504         };
505     }
506
507     // * Core Context Switching Functions
508
509     // The primary function for changing contexts. In the current
510     // design the scheduler is just a slightly modified GreenTask, so
511     // all context swaps are from Task to Task. The only difference
512     // between the various cases is where the inputs come from, and
513     // what is done with the resulting task. That is specified by the
514     // cleanup function f, which takes the scheduler and the
515     // old task as inputs.
516
517     pub fn change_task_context(~self,
518                                next_task: ~Task,
519                                f: &fn(&mut Scheduler, ~Task)) {
520         let mut this = self;
521
522         // The current task is grabbed from TLS, not taken as an input.
523         // Doing an unsafe_take to avoid writing back a null pointer -
524         // We're going to call `put` later to do that.
525         let current_task: ~Task = unsafe { Local::unsafe_take() };
526
527         // Check that the task is not in an atomically() section (e.g.,
528         // holding a pthread mutex, which could deadlock the scheduler).
529         current_task.death.assert_may_sleep();
530
531         // These transmutes do something fishy with a closure.
532         let f_fake_region = unsafe {
533             transmute::<&fn(&mut Scheduler, ~Task),
534                         &fn(&mut Scheduler, ~Task)>(f)
535         };
536         let f_opaque = ClosureConverter::from_fn(f_fake_region);
537
538         // The current task is placed inside an enum with the cleanup
539         // function. This enum is then placed inside the scheduler.
540         this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
541
542         // The scheduler is then placed inside the next task.
543         let mut next_task = next_task;
544         next_task.sched = Some(this);
545
546         // However we still need an internal mutable pointer to the
547         // original task. The strategy here was "arrange memory, then
548         // get pointers", so we crawl back up the chain using
549         // transmute to eliminate borrowck errors.
550         unsafe {
551
552             let sched: &mut Scheduler =
553                 transmute_mut_region(*next_task.sched.get_mut_ref());
554
555             let current_task: &mut Task = match sched.cleanup_job {
556                 Some(CleanupJob { task: ref task, _ }) => {
557                     let task_ptr: *~Task = task;
558                     transmute_mut_region(*transmute_mut_unsafe(task_ptr))
559                 }
560                 None => {
561                     rtabort!("no cleanup job");
562                 }
563             };
564
565             let (current_task_context, next_task_context) =
566                 Scheduler::get_contexts(current_task, next_task);
567
568             // Done with everything - put the next task in TLS. This
569             // works because due to transmute the borrow checker
570             // believes that we have no internal pointers to
571             // next_task.
572             Local::put(next_task);
573
574             // The raw context swap operation. The next action taken
575             // will be running the cleanup job from the context of the
576             // next task.
577             Context::swap(current_task_context, next_task_context);
578         }
579
580         // When the context swaps back to this task we immediately
581         // run the cleanup job, as expected by the previously called
582         // swap_contexts function.
583         unsafe {
584             let task: *mut Task = Local::unsafe_borrow();
585             (*task).sched.get_mut_ref().run_cleanup_job();
586
587             // Must happen after running the cleanup job (of course).
588             (*task).death.check_killed((*task).unwinder.unwinding);
589         }
590     }
591
592     // Returns a mutable reference to both contexts involved in this
593     // swap. This is unsafe - we are getting mutable internal
594     // references to keep even when we don't own the tasks. It looks
595     // kinda safe because we are doing transmutes before passing in
596     // the arguments.
597     pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) ->
598         (&'a mut Context, &'a mut Context) {
599         let current_task_context =
600             &mut current_task.coroutine.get_mut_ref().saved_context;
601         let next_task_context =
602                 &mut next_task.coroutine.get_mut_ref().saved_context;
603         unsafe {
604             (transmute_mut_region(current_task_context),
605              transmute_mut_region(next_task_context))
606         }
607     }
608
609     // * Context Swapping Helpers - Here be ugliness!
610
611     pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
612         do self.change_task_context(task) |sched, stask| {
613             sched.sched_task = Some(stask);
614         }
615         return None;
616     }
617
618     fn resume_task_immediately_cl(sched: ~Scheduler,
619                                   task: ~Task) -> Option<~Scheduler> {
620         sched.resume_task_immediately(task)
621     }
622
623
624     pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
625         match blocked_task.wake() {
626             Some(task) => { self.resume_task_immediately(task); }
627             None => Local::put(self)
628         };
629     }
630
631     /// Block a running task, context switch to the scheduler, then pass the
632     /// blocked task to a closure.
633     ///
634     /// # Safety note
635     ///
636     /// The closure here is a *stack* closure that lives in the
637     /// running task.  It gets transmuted to the scheduler's lifetime
638     /// and called while the task is blocked.
639     ///
640     /// This passes a Scheduler pointer to the fn after the context switch
641     /// in order to prevent that fn from performing further scheduling operations.
642     /// Doing further scheduling could easily result in infinite recursion.
643     pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
644         // Trickier - we need to get the scheduler task out of self
645         // and use it as the destination.
646         let mut this = self;
647         let stask = this.sched_task.take_unwrap();
648         // Otherwise this is the same as below.
649         this.switch_running_tasks_and_then(stask, f);
650     }
651
652     pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
653                                          f: &fn(&mut Scheduler, BlockedTask)) {
654         // This is where we convert the BlockedTask-taking closure into one
655         // that takes just a Task, and is aware of the block-or-killed protocol.
656         do self.change_task_context(next_task) |sched, task| {
657             // Task might need to receive a kill signal instead of blocking.
658             // We can call the "and_then" only if it blocks successfully.
659             match BlockedTask::try_block(task) {
660                 Left(killed_task) => sched.enqueue_task(killed_task),
661                 Right(blocked_task) => f(sched, blocked_task),
662             }
663         }
664     }
665
666     fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
667         do sched.switch_running_tasks_and_then(task) |sched, last_task| {
668             sched.enqueue_blocked_task(last_task);
669         };
670         return None;
671     }
672
673     // * Task Context Helpers
674
675     /// Called by a running task to end execution, after which it will
676     /// be recycled by the scheduler for reuse in a new task.
677     pub fn terminate_current_task(~self) {
678         // Similar to deschedule running task and then, but cannot go through
679         // the task-blocking path. The task is already dying.
680         let mut this = self;
681         let stask = this.sched_task.take_unwrap();
682         do this.change_task_context(stask) |sched, mut dead_task| {
683             let coroutine = dead_task.coroutine.take_unwrap();
684             coroutine.recycle(&mut sched.stack_pool);
685         }
686     }
687
688     pub fn run_task(task: ~Task) {
689         let sched: ~Scheduler = Local::take();
690         sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
691     }
692
693     pub fn run_task_later(next_task: ~Task) {
694         let next_task = Cell::new(next_task);
695         do Local::borrow |sched: &mut Scheduler| {
696             sched.enqueue_task(next_task.take());
697         };
698     }
699
700     // * Utility Functions
701
702     pub fn sched_id(&self) -> uint { to_uint(self) }
703
704     pub fn run_cleanup_job(&mut self) {
705         let cleanup_job = self.cleanup_job.take_unwrap();
706         cleanup_job.run(self);
707     }
708
709     pub fn make_handle(&mut self) -> SchedHandle {
710         let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
711
712         return SchedHandle {
713             remote: remote,
714             queue: self.message_queue.clone(),
715             sched_id: self.sched_id()
716         };
717     }
718 }
719
720 // Supporting types
721
722 type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
723
724 pub enum SchedMessage {
725     Wake,
726     Shutdown,
727     PinnedTask(~Task),
728     TaskFromFriend(~Task)
729 }
730
731 pub struct SchedHandle {
732     priv remote: ~RemoteCallbackObject,
733     priv queue: MessageQueue<SchedMessage>,
734     sched_id: uint
735 }
736
737 impl SchedHandle {
738     pub fn send(&mut self, msg: SchedMessage) {
739         self.queue.push(msg);
740         self.remote.fire();
741     }
742 }
743
744 struct CleanupJob {
745     task: ~Task,
746     f: UnsafeTaskReceiver
747 }
748
749 impl CleanupJob {
750     pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
751         CleanupJob {
752             task: task,
753             f: f
754         }
755     }
756
757     pub fn run(self, sched: &mut Scheduler) {
758         let CleanupJob { task: task, f: f } = self;
759         f.to_fn()(sched, task)
760     }
761 }
762
763 // XXX: Some hacks to put a &fn in Scheduler without borrowck
764 // complaining
765 type UnsafeTaskReceiver = raw::Closure;
766 trait ClosureConverter {
767     fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
768     fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
769 }
770 impl ClosureConverter for UnsafeTaskReceiver {
771     fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
772         unsafe { transmute(f) }
773     }
774     fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
775 }
776
777 #[cfg(test)]
778 mod test {
779     extern mod extra;
780
781     use prelude::*;
782     use rt::test::*;
783     use unstable::run_in_bare_thread;
784     use borrow::to_uint;
785     use rt::local::*;
786     use rt::sched::{Scheduler};
787     use cell::Cell;
788     use rt::thread::Thread;
789     use rt::task::{Task, Sched};
790     use rt::util;
791     use option::{Some};
792
793     #[test]
794     fn trivial_run_in_newsched_task_test() {
795         let mut task_ran = false;
796         let task_ran_ptr: *mut bool = &mut task_ran;
797         do run_in_newsched_task || {
798             unsafe { *task_ran_ptr = true };
799             rtdebug!("executed from the new scheduler")
800         }
801         assert!(task_ran);
802     }
803
804     #[test]
805     fn multiple_task_test() {
806         let total = 10;
807         let mut task_run_count = 0;
808         let task_run_count_ptr: *mut uint = &mut task_run_count;
809         do run_in_newsched_task || {
810             for _ in range(0u, total) {
811                 do spawntask || {
812                     unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
813                 }
814             }
815         }
816         assert!(task_run_count == total);
817     }
818
819     #[test]
820     fn multiple_task_nested_test() {
821         let mut task_run_count = 0;
822         let task_run_count_ptr: *mut uint = &mut task_run_count;
823         do run_in_newsched_task || {
824             do spawntask || {
825                 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
826                 do spawntask || {
827                     unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
828                     do spawntask || {
829                         unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
830                     }
831                 }
832             }
833         }
834         assert!(task_run_count == 3);
835     }
836
837     // Confirm that a sched_id actually is the uint form of the
838     // pointer to the scheduler struct.
839     #[test]
840     fn simple_sched_id_test() {
841         do run_in_bare_thread {
842             let sched = ~new_test_uv_sched();
843             assert!(to_uint(sched) == sched.sched_id());
844         }
845     }
846
847     // Compare two scheduler ids that are different, this should never
848     // fail but may catch a mistake someday.
849     #[test]
850     fn compare_sched_id_test() {
851         do run_in_bare_thread {
852             let sched_one = ~new_test_uv_sched();
853             let sched_two = ~new_test_uv_sched();
854             assert!(sched_one.sched_id() != sched_two.sched_id());
855         }
856     }
857
858
859     // A very simple test that confirms that a task executing on the
860     // home scheduler notices that it is home.
861     #[test]
862     fn test_home_sched() {
863         do run_in_bare_thread {
864             let mut task_ran = false;
865             let task_ran_ptr: *mut bool = &mut task_ran;
866
867             let mut sched = ~new_test_uv_sched();
868             let sched_handle = sched.make_handle();
869
870             let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None,
871                                                 Sched(sched_handle)) {
872                 unsafe { *task_ran_ptr = true };
873                 assert!(Task::on_appropriate_sched());
874             };
875
876             let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
877             task.death.on_exit = Some(on_exit);
878
879             sched.bootstrap(task);
880         }
881     }
882
883     // An advanced test that checks all four possible states that a
884     // (task,sched) can be in regarding homes.
885
886     #[test]
887     fn test_schedule_home_states() {
888
889         use rt::uv::uvio::UvEventLoop;
890         use rt::sleeper_list::SleeperList;
891         use rt::work_queue::WorkQueue;
892         use rt::sched::Shutdown;
893         use borrow;
894         use rt::comm::*;
895
896         do run_in_bare_thread {
897
898             let sleepers = SleeperList::new();
899             let normal_queue = WorkQueue::new();
900             let special_queue = WorkQueue::new();
901             let queues = ~[normal_queue.clone(), special_queue.clone()];
902
903             // Our normal scheduler
904             let mut normal_sched = ~Scheduler::new(
905                 ~UvEventLoop::new(),
906                 normal_queue,
907                 queues.clone(),
908                 sleepers.clone());
909
910             let normal_handle = Cell::new(normal_sched.make_handle());
911
912             let friend_handle = normal_sched.make_handle();
913
914             // Our special scheduler
915             let mut special_sched = ~Scheduler::new_special(
916                 ~UvEventLoop::new(),
917                 special_queue.clone(),
918                 queues.clone(),
919                 sleepers.clone(),
920                 false,
921                 Some(friend_handle));
922
923             let special_handle = Cell::new(special_sched.make_handle());
924
925             let t1_handle = special_sched.make_handle();
926             let t4_handle = special_sched.make_handle();
927
928             // Four test tasks:
929             //   1) task is home on special
930             //   2) task not homed, sched doesn't care
931             //   3) task not homed, sched requeues
932             //   4) task not home, send home
933
934             let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
935                                                  Sched(t1_handle)) || {
936                 rtassert!(Task::on_appropriate_sched());
937             };
938             rtdebug!("task1 id: **%u**", borrow::to_uint(task1));
939
940             let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
941                 rtassert!(Task::on_appropriate_sched());
942             };
943
944             let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
945                 rtassert!(Task::on_appropriate_sched());
946             };
947
948             let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
949                                                  Sched(t4_handle)) {
950                 rtassert!(Task::on_appropriate_sched());
951             };
952             rtdebug!("task4 id: **%u**", borrow::to_uint(task4));
953
954             let task1 = Cell::new(task1);
955             let task2 = Cell::new(task2);
956             let task3 = Cell::new(task3);
957             let task4 = Cell::new(task4);
958
959             // Signal from the special task that we are done.
960             let (port, chan) = oneshot::<()>();
961             let port = Cell::new(port);
962             let chan = Cell::new(chan);
963
964             let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
965                 rtdebug!("*about to submit task2*");
966                 Scheduler::run_task(task2.take());
967                 rtdebug!("*about to submit task4*");
968                 Scheduler::run_task(task4.take());
969                 rtdebug!("*normal_task done*");
970                 port.take().recv();
971                 let mut nh = normal_handle.take();
972                 nh.send(Shutdown);
973                 let mut sh = special_handle.take();
974                 sh.send(Shutdown);
975             };
976
977             rtdebug!("normal task: %u", borrow::to_uint(normal_task));
978
979             let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) {
980                 rtdebug!("*about to submit task1*");
981                 Scheduler::run_task(task1.take());
982                 rtdebug!("*about to submit task3*");
983                 Scheduler::run_task(task3.take());
984                 rtdebug!("*done with special_task*");
985                 chan.take().send(());
986             };
987
988             rtdebug!("special task: %u", borrow::to_uint(special_task));
989
990             let special_sched = Cell::new(special_sched);
991             let normal_sched = Cell::new(normal_sched);
992             let special_task = Cell::new(special_task);
993             let normal_task = Cell::new(normal_task);
994
995             let normal_thread = do Thread::start {
996                 normal_sched.take().bootstrap(normal_task.take());
997                 rtdebug!("finished with normal_thread");
998             };
999
1000             let special_thread = do Thread::start {
1001                 special_sched.take().bootstrap(special_task.take());
1002                 rtdebug!("finished with special_sched");
1003             };
1004
1005             normal_thread.join();
1006             special_thread.join();
1007         }
1008     }
1009
1010     #[test]
1011     fn test_stress_schedule_task_states() {
1012         if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1013         let n = stress_factor() * 120;
1014         for _ in range(0, n as int) {
1015             test_schedule_home_states();
1016         }
1017     }
1018
1019     #[test]
1020     fn test_io_callback() {
1021         // This is a regression test that when there are no schedulable tasks
1022         // in the work queue, but we are performing I/O, that once we do put
1023         // something in the work queue again the scheduler picks it up and doesn't
1024         // exit before emptying the work queue
1025         do run_in_newsched_task {
1026             do spawntask {
1027                 let sched: ~Scheduler = Local::take();
1028                 do sched.deschedule_running_task_and_then |sched, task| {
1029                     let task = Cell::new(task);
1030                     do sched.event_loop.callback_ms(10) {
1031                         rtdebug!("in callback");
1032                         let mut sched: ~Scheduler = Local::take();
1033                         sched.enqueue_blocked_task(task.take());
1034                         Local::put(sched);
1035                     }
1036                 }
1037             }
1038         }
1039     }
1040
1041     #[test]
1042     fn handle() {
1043         use rt::comm::*;
1044
1045         do run_in_bare_thread {
1046             let (port, chan) = oneshot::<()>();
1047             let port = Cell::new(port);
1048             let chan = Cell::new(chan);
1049
1050             let thread_one = do Thread::start {
1051                 let chan = Cell::new(chan.take());
1052                 do run_in_newsched_task_core {
1053                     chan.take().send(());
1054                 }
1055             };
1056
1057             let thread_two = do Thread::start {
1058                 let port = Cell::new(port.take());
1059                 do run_in_newsched_task_core {
1060                     port.take().recv();
1061                 }
1062             };
1063
1064             thread_two.join();
1065             thread_one.join();
1066         }
1067     }
1068
1069     // A regression test that the final message is always handled.
1070     // Used to deadlock because Shutdown was never recvd.
1071     #[test]
1072     fn no_missed_messages() {
1073         use rt::work_queue::WorkQueue;
1074         use rt::sleeper_list::SleeperList;
1075         use rt::stack::StackPool;
1076         use rt::uv::uvio::UvEventLoop;
1077         use rt::sched::{Shutdown, TaskFromFriend};
1078         use util;
1079
1080         do run_in_bare_thread {
1081             do stress_factor().times {
1082                 let sleepers = SleeperList::new();
1083                 let queue = WorkQueue::new();
1084                 let queues = ~[queue.clone()];
1085
1086                 let mut sched = ~Scheduler::new(
1087                     ~UvEventLoop::new(),
1088                     queue,
1089                     queues.clone(),
1090                     sleepers.clone());
1091
1092                 let mut handle = sched.make_handle();
1093
1094                 let sched = Cell::new(sched);
1095
1096                 let thread = do Thread::start {
1097                     let mut sched = sched.take();
1098                     let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
1099                     sched.bootstrap(bootstrap_task);
1100                 };
1101
1102                 let mut stack_pool = StackPool::new();
1103                 let task = ~Task::new_root(&mut stack_pool, None, ||());
1104                 handle.send(TaskFromFriend(task));
1105
1106                 handle.send(Shutdown);
1107                 util::ignore(handle);
1108
1109                 thread.join();
1110             }
1111         }
1112     }
1113
1114     #[test]
1115     fn multithreading() {
1116         use rt::comm::*;
1117         use num::Times;
1118         use vec::OwnedVector;
1119         use container::Container;
1120
1121         do run_in_mt_newsched_task {
1122             let mut ports = ~[];
1123             do 10.times {
1124                 let (port, chan) = oneshot();
1125                 let chan_cell = Cell::new(chan);
1126                 do spawntask_later {
1127                     chan_cell.take().send(());
1128                 }
1129                 ports.push(port);
1130             }
1131
1132             while !ports.is_empty() {
1133                 ports.pop().recv();
1134             }
1135         }
1136     }
1137
1138      #[test]
1139     fn thread_ring() {
1140         use rt::comm::*;
1141         use comm::{GenericPort, GenericChan};
1142
1143         do run_in_mt_newsched_task {
1144                 let (end_port, end_chan) = oneshot();
1145
1146             let n_tasks = 10;
1147             let token = 2000;
1148
1149                 let (p, ch1) = stream();
1150             let mut p = p;
1151                 ch1.send((token, end_chan));
1152                 let mut i = 2;
1153             while i <= n_tasks {
1154                 let (next_p, ch) = stream();
1155                 let imm_i = i;
1156                 let imm_p = p;
1157                 do spawntask_random {
1158                     roundtrip(imm_i, n_tasks, &imm_p, &ch);
1159                 };
1160                 p = next_p;
1161                 i += 1;
1162             }
1163             let imm_p = p;
1164             let imm_ch = ch1;
1165             do spawntask_random {
1166                 roundtrip(1, n_tasks, &imm_p, &imm_ch);
1167             }
1168
1169             end_port.recv();
1170         }
1171
1172         fn roundtrip(id: int, n_tasks: int,
1173                      p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
1174             while (true) {
1175                 match p.recv() {
1176                     (1, end_chan) => {
1177                                         debug!("%d\n", id);
1178                                 end_chan.send(());
1179                                 return;
1180                     }
1181                     (token, end_chan) => {
1182                         debug!("thread: %d   got token: %d", id, token);
1183                         ch.send((token - 1, end_chan));
1184                         if token <= n_tasks {
1185                             return;
1186                         }
1187                     }
1188                 }
1189             }
1190         }
1191     }
1192
1193     #[test]
1194     fn start_closure_dtor() {
1195         use ops::Drop;
1196
1197         // Regression test that the `start` task entrypoint can
1198         // contain dtors that use task resources
1199         do run_in_newsched_task {
1200             struct S { field: () }
1201
1202             impl Drop for S {
1203                 fn drop(&self) {
1204                         let _foo = @0;
1205                 }
1206             }
1207
1208             let s = S { field: () };
1209
1210             do spawntask {
1211                         let _ss = &s;
1212             }
1213         }
1214     }
1215
1216 }