]> git.lizzy.rs Git - rust.git/blob - src/libgreen/sched.rs
auto merge of #15999 : Kimundi/rust/fix_folder, r=nikomatsakis
[rust.git] / src / libgreen / sched.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::mem;
12 use std::rt::local::Local;
13 use std::rt::mutex::NativeMutex;
14 use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
15 use std::rt::task::BlockedTask;
16 use std::rt::task::Task;
17 use std::sync::deque;
18 use std::raw;
19
20 use std::rand::{XorShiftRng, Rng, Rand};
21
22 use TaskState;
23 use context::Context;
24 use coroutine::Coroutine;
25 use sleeper_list::SleeperList;
26 use stack::StackPool;
27 use task::{TypeSched, GreenTask, HomeSched, AnySched};
28 use msgq = message_queue;
29
30 /// A scheduler is responsible for coordinating the execution of Tasks
31 /// on a single thread. The scheduler runs inside a slightly modified
32 /// Rust Task. When not running this task is stored in the scheduler
33 /// struct. The scheduler struct acts like a baton, all scheduling
34 /// actions are transfers of the baton.
35 ///
36 /// FIXME: This creates too many callbacks to run_sched_once, resulting
37 /// in too much allocation and too many events.
38 pub struct Scheduler {
39     /// ID number of the pool that this scheduler is a member of. When
40     /// reawakening green tasks, this is used to ensure that tasks aren't
41     /// reawoken on the wrong pool of schedulers.
42     pub pool_id: uint,
43     /// The pool of stacks that this scheduler has cached
44     pub stack_pool: StackPool,
45     /// Bookkeeping for the number of tasks which are currently running around
46     /// inside this pool of schedulers
47     pub task_state: TaskState,
48     /// There are N work queues, one per scheduler.
49     work_queue: deque::Worker<Box<GreenTask>>,
50     /// Work queues for the other schedulers. These are created by
51     /// cloning the core work queues.
52     work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
53     /// The queue of incoming messages from other schedulers.
54     /// These are enqueued by SchedHandles after which a remote callback
55     /// is triggered to handle the message.
56     message_queue: msgq::Consumer<SchedMessage>,
57     /// Producer used to clone sched handles from
58     message_producer: msgq::Producer<SchedMessage>,
59     /// A shared list of sleeping schedulers. We'll use this to wake
60     /// up schedulers when pushing work onto the work queue.
61     sleeper_list: SleeperList,
62     /// Indicates that we have previously pushed a handle onto the
63     /// SleeperList but have not yet received the Wake message.
64     /// Being `true` does not necessarily mean that the scheduler is
65     /// not active since there are multiple event sources that may
66     /// wake the scheduler. It just prevents the scheduler from pushing
67     /// multiple handles onto the sleeper list.
68     sleepy: bool,
69     /// A flag to indicate we've received the shutdown message and should
70     /// no longer try to go to sleep, but exit instead.
71     no_sleep: bool,
72     /// The scheduler runs on a special task. When it is not running
73     /// it is stored here instead of the work queue.
74     sched_task: Option<Box<GreenTask>>,
75     /// An action performed after a context switch on behalf of the
76     /// code running before the context switch
77     cleanup_job: Option<CleanupJob>,
78     /// If the scheduler shouldn't run some tasks, a friend to send
79     /// them to.
80     friend_handle: Option<SchedHandle>,
81     /// Should this scheduler run any task, or only pinned tasks?
82     run_anything: bool,
83     /// A fast XorShift rng for scheduler use
84     rng: XorShiftRng,
85     /// A toggleable idle callback
86     idle_callback: Option<Box<PausableIdleCallback + Send>>,
87     /// A countdown that starts at a random value and is decremented
88     /// every time a yield check is performed. When it hits 0 a task
89     /// will yield.
90     yield_check_count: uint,
91     /// A flag to tell the scheduler loop it needs to do some stealing
92     /// in order to introduce randomness as part of a yield
93     steal_for_yield: bool,
94
95     // n.b. currently destructors of an object are run in top-to-bottom in order
96     //      of field declaration. Due to its nature, the pausable idle callback
97     //      must have some sort of handle to the event loop, so it needs to get
98     //      destroyed before the event loop itself. For this reason, we destroy
99     //      the event loop last to ensure that any unsafe references to it are
100     //      destroyed before it's actually destroyed.
101
102     /// The event loop used to drive the scheduler and perform I/O
103     pub event_loop: Box<EventLoop + Send>,
104 }
105
106 /// An indication of how hard to work on a given operation, the difference
107 /// mainly being whether memory is synchronized or not
108 #[deriving(PartialEq)]
109 enum EffortLevel {
110     DontTryTooHard,
111     GiveItYourBest
112 }
113
114 static MAX_YIELD_CHECKS: uint = 20000;
115
116 fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
117     let r: uint = Rand::rand(rng);
118     r % MAX_YIELD_CHECKS + 1
119 }
120
121 impl Scheduler {
122
123     // * Initialization Functions
124
125     pub fn new(pool_id: uint,
126                event_loop: Box<EventLoop + Send>,
127                work_queue: deque::Worker<Box<GreenTask>>,
128                work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
129                sleeper_list: SleeperList,
130                state: TaskState)
131         -> Scheduler {
132
133         Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
134                                sleeper_list, true, None, state)
135
136     }
137
138     pub fn new_special(pool_id: uint,
139                        event_loop: Box<EventLoop + Send>,
140                        work_queue: deque::Worker<Box<GreenTask>>,
141                        work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
142                        sleeper_list: SleeperList,
143                        run_anything: bool,
144                        friend: Option<SchedHandle>,
145                        state: TaskState)
146         -> Scheduler {
147
148         let (consumer, producer) = msgq::queue();
149         let mut sched = Scheduler {
150             pool_id: pool_id,
151             sleeper_list: sleeper_list,
152             message_queue: consumer,
153             message_producer: producer,
154             sleepy: false,
155             no_sleep: false,
156             event_loop: event_loop,
157             work_queue: work_queue,
158             work_queues: work_queues,
159             stack_pool: StackPool::new(),
160             sched_task: None,
161             cleanup_job: None,
162             run_anything: run_anything,
163             friend_handle: friend,
164             rng: new_sched_rng(),
165             idle_callback: None,
166             yield_check_count: 0,
167             steal_for_yield: false,
168             task_state: state,
169         };
170
171         sched.yield_check_count = reset_yield_check(&mut sched.rng);
172
173         return sched;
174     }
175
176     // FIXME: This may eventually need to be refactored so that
177     // the scheduler itself doesn't have to call event_loop.run.
178     // That will be important for embedding the runtime into external
179     // event loops.
180
181     // Take a main task to run, and a scheduler to run it in. Create a
182     // scheduler task and bootstrap into it.
183     pub fn bootstrap(mut self: Box<Scheduler>) {
184
185         // Build an Idle callback.
186         let cb = box SchedRunner as Box<Callback + Send>;
187         self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
188
189         // Create a task for the scheduler with an empty context.
190         let sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
191                                               TypeSched);
192
193         // Before starting our first task, make sure the idle callback
194         // is active. As we do not start in the sleep state this is
195         // important.
196         self.idle_callback.get_mut_ref().resume();
197
198         // Now, as far as all the scheduler state is concerned, we are inside
199         // the "scheduler" context. The scheduler immediately hands over control
200         // to the event loop, and this will only exit once the event loop no
201         // longer has any references (handles or I/O objects).
202         rtdebug!("starting scheduler {}", self.sched_id());
203         let mut sched_task = self.run(sched_task);
204
205         // Close the idle callback.
206         let mut sched = sched_task.sched.take_unwrap();
207         sched.idle_callback.take();
208         // Make one go through the loop to run the close callback.
209         let mut stask = sched.run(sched_task);
210
211         // Now that we are done with the scheduler, clean up the
212         // scheduler task. Do so by removing it from TLS and manually
213         // cleaning up the memory it uses. As we didn't actually call
214         // task.run() on the scheduler task we never get through all
215         // the cleanup code it runs.
216         rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id());
217
218         // Should not have any messages
219         let message = stask.sched.get_mut_ref().message_queue.pop();
220         rtassert!(match message { msgq::Empty => true, _ => false });
221
222         stask.task.take().unwrap().drop();
223     }
224
225     // This does not return a scheduler, as the scheduler is placed
226     // inside the task.
227     pub fn run(mut self: Box<Scheduler>, stask: Box<GreenTask>)
228                -> Box<GreenTask> {
229
230         // This is unsafe because we need to place the scheduler, with
231         // the event_loop inside, inside our task. But we still need a
232         // mutable reference to the event_loop to give it the "run"
233         // command.
234         unsafe {
235             let event_loop: *mut Box<EventLoop + Send> = &mut self.event_loop;
236             // Our scheduler must be in the task before the event loop
237             // is started.
238             stask.put_with_sched(self);
239             (*event_loop).run();
240         }
241
242         //  This is a serious code smell, but this function could be done away
243         //  with if necessary. The ownership of `stask` was transferred into
244         //  local storage just before the event loop ran, so it is possible to
245         //  transmute `stask` as a uint across the running of the event loop to
246         //  re-acquire ownership here.
247         //
248         // This would involve removing the Task from TLS, removing the runtime,
249         // forgetting the runtime, and then putting the task into `stask`. For
250         // now, because we have `GreenTask::convert`, I chose to take this
251         // method for cleanliness. This function is *not* a fundamental reason
252         // why this function should exist.
253         GreenTask::convert(Local::take())
254     }
255
256     // * Execution Functions - Core Loop Logic
257
258     // This function is run from the idle callback on the uv loop, indicating
259     // that there are no I/O events pending. When this function returns, we will
260     // fall back to epoll() in the uv event loop, waiting for more things to
261     // happen. We may come right back off epoll() if the idle callback is still
262     // active, in which case we're truly just polling to see if I/O events are
263     // complete.
264     //
265     // The model for this function is to execute as much work as possible while
266     // still fairly considering I/O tasks. Falling back to epoll() frequently is
267     // often quite expensive, so we attempt to avoid it as much as possible. If
268     // we have any active I/O on the event loop, then we're forced to fall back
269     // to epoll() in order to provide fairness, but as long as we're doing work
270     // and there's no active I/O, we can continue to do work.
271     //
272     // If we try really hard to do some work, but no work is available to be
273     // done, then we fall back to epoll() to block this thread waiting for more
274     // work (instead of busy waiting).
275     fn run_sched_once(mut self: Box<Scheduler>, stask: Box<GreenTask>) {
276         // Make sure that we're not lying in that the `stask` argument is indeed
277         // the scheduler task for this scheduler.
278         assert!(self.sched_task.is_none());
279
280         // Assume that we need to continue idling unless we reach the
281         // end of this function without performing an action.
282         self.idle_callback.get_mut_ref().resume();
283
284         // First we check for scheduler messages, these are higher
285         // priority than regular tasks.
286         let (mut sched, mut stask, mut did_work) =
287             self.interpret_message_queue(stask, DontTryTooHard);
288
289         // After processing a message, we consider doing some more work on the
290         // event loop. The "keep going" condition changes after the first
291         // iteration because we don't want to spin here infinitely.
292         //
293         // Once we start doing work we can keep doing work so long as the
294         // iteration does something. Note that we don't want to starve the
295         // message queue here, so each iteration when we're done working we
296         // check the message queue regardless of whether we did work or not.
297         let mut keep_going = !did_work || !sched.event_loop.has_active_io();
298         while keep_going {
299             let (a, b, c) = match sched.do_work(stask) {
300                 (sched, task, false) => {
301                     sched.interpret_message_queue(task, GiveItYourBest)
302                 }
303                 (sched, task, true) => {
304                     let (sched, task, _) =
305                         sched.interpret_message_queue(task, GiveItYourBest);
306                     (sched, task, true)
307                 }
308             };
309             sched = a;
310             stask = b;
311             did_work = c;
312
313             // We only keep going if we managed to do something productive and
314             // also don't have any active I/O. If we didn't do anything, we
315             // should consider going to sleep, and if we have active I/O we need
316             // to poll for completion.
317             keep_going = did_work && !sched.event_loop.has_active_io();
318         }
319
320         // If we ever did some work, then we shouldn't put our scheduler
321         // entirely to sleep just yet. Leave the idle callback active and fall
322         // back to epoll() to see what's going on.
323         if did_work {
324             return stask.put_with_sched(sched);
325         }
326
327         // If we got here then there was no work to do.
328         // Generate a SchedHandle and push it to the sleeper list so
329         // somebody can wake us up later.
330         if !sched.sleepy && !sched.no_sleep {
331             rtdebug!("scheduler has no work to do, going to sleep");
332             sched.sleepy = true;
333             let handle = sched.make_handle();
334             sched.sleeper_list.push(handle);
335             // Since we are sleeping, deactivate the idle callback.
336             sched.idle_callback.get_mut_ref().pause();
337         } else {
338             rtdebug!("not sleeping, already doing so or no_sleep set");
339             // We may not be sleeping, but we still need to deactivate
340             // the idle callback.
341             sched.idle_callback.get_mut_ref().pause();
342         }
343
344         // Finished a cycle without using the Scheduler. Place it back
345         // in TLS.
346         stask.put_with_sched(sched);
347     }
348
349     // This function returns None if the scheduler is "used", or it
350     // returns the still-available scheduler. At this point all
351     // message-handling will count as a turn of work, and as a result
352     // return None.
353     fn interpret_message_queue(mut self: Box<Scheduler>,
354                                stask: Box<GreenTask>,
355                                effort: EffortLevel)
356                                -> (Box<Scheduler>, Box<GreenTask>, bool) {
357         let msg = if effort == DontTryTooHard {
358             self.message_queue.casual_pop()
359         } else {
360             // When popping our message queue, we could see an "inconsistent"
361             // state which means that we *should* be able to pop data, but we
362             // are unable to at this time. Our options are:
363             //
364             //  1. Spin waiting for data
365             //  2. Ignore this and pretend we didn't find a message
366             //
367             // If we choose route 1, then if the pusher in question is currently
368             // pre-empted, we're going to take up our entire time slice just
369             // spinning on this queue. If we choose route 2, then the pusher in
370             // question is still guaranteed to make a send() on its async
371             // handle, so we will guaranteed wake up and see its message at some
372             // point.
373             //
374             // I have chosen to take route #2.
375             match self.message_queue.pop() {
376                 msgq::Data(t) => Some(t),
377                 msgq::Empty | msgq::Inconsistent => None
378             }
379         };
380
381         match msg {
382             Some(PinnedTask(task)) => {
383                 let mut task = task;
384                 task.give_home(HomeSched(self.make_handle()));
385                 let (sched, task) = self.resume_task_immediately(stask, task);
386                 (sched, task, true)
387             }
388             Some(TaskFromFriend(task)) => {
389                 rtdebug!("got a task from a friend. lovely!");
390                 let (sched, task) =
391                     self.process_task(stask, task,
392                                       Scheduler::resume_task_immediately_cl);
393                 (sched, task, true)
394             }
395             Some(RunOnce(task)) => {
396                 // bypass the process_task logic to force running this task once
397                 // on this home scheduler. This is often used for I/O (homing).
398                 let (sched, task) = self.resume_task_immediately(stask, task);
399                 (sched, task, true)
400             }
401             Some(Wake) => {
402                 self.sleepy = false;
403                 (self, stask, true)
404             }
405             Some(Shutdown) => {
406                 rtdebug!("shutting down");
407                 if self.sleepy {
408                     // There may be an outstanding handle on the
409                     // sleeper list.  Pop them all to make sure that's
410                     // not the case.
411                     loop {
412                         match self.sleeper_list.pop() {
413                             Some(handle) => {
414                                 let mut handle = handle;
415                                 handle.send(Wake);
416                             }
417                             None => break
418                         }
419                     }
420                 }
421                 // No more sleeping. After there are no outstanding
422                 // event loop references we will shut down.
423                 self.no_sleep = true;
424                 self.sleepy = false;
425                 (self, stask, true)
426             }
427             Some(NewNeighbor(neighbor)) => {
428                 self.work_queues.push(neighbor);
429                 (self, stask, false)
430             }
431             None => (self, stask, false)
432         }
433     }
434
435     fn do_work(mut self: Box<Scheduler>, stask: Box<GreenTask>)
436                -> (Box<Scheduler>, Box<GreenTask>, bool) {
437         rtdebug!("scheduler calling do work");
438         match self.find_work() {
439             Some(task) => {
440                 rtdebug!("found some work! running the task");
441                 let (sched, task) =
442                     self.process_task(stask, task,
443                                       Scheduler::resume_task_immediately_cl);
444                 (sched, task, true)
445             }
446             None => {
447                 rtdebug!("no work was found, returning the scheduler struct");
448                 (self, stask, false)
449             }
450         }
451     }
452
453     // Workstealing: In this iteration of the runtime each scheduler
454     // thread has a distinct work queue. When no work is available
455     // locally, make a few attempts to steal work from the queues of
456     // other scheduler threads. If a few steals fail we end up in the
457     // old "no work" path which is fine.
458
459     // First step in the process is to find a task. This function does
460     // that by first checking the local queue, and if there is no work
461     // there, trying to steal from the remote work queues.
462     fn find_work(&mut self) -> Option<Box<GreenTask>> {
463         rtdebug!("scheduler looking for work");
464         if !self.steal_for_yield {
465             match self.work_queue.pop() {
466                 Some(task) => {
467                     rtdebug!("found a task locally");
468                     return Some(task)
469                 }
470                 None => {
471                     rtdebug!("scheduler trying to steal");
472                     return self.try_steals();
473                 }
474             }
475         } else {
476             // During execution of the last task, it performed a 'yield',
477             // so we're doing some work stealing in order to introduce some
478             // scheduling randomness. Otherwise we would just end up popping
479             // that same task again. This is pretty lame and is to work around
480             // the problem that work stealing is not designed for 'non-strict'
481             // (non-fork-join) task parallelism.
482             self.steal_for_yield = false;
483             match self.try_steals() {
484                 Some(task) => {
485                     rtdebug!("stole a task after yielding");
486                     return Some(task);
487                 }
488                 None => {
489                     rtdebug!("did not steal a task after yielding");
490                     // Back to business
491                     return self.find_work();
492                 }
493             }
494         }
495     }
496
497     // Try stealing from all queues the scheduler knows about. This
498     // naive implementation can steal from our own queue or from other
499     // special schedulers.
500     fn try_steals(&mut self) -> Option<Box<GreenTask>> {
501         let work_queues = &mut self.work_queues;
502         let len = work_queues.len();
503         let start_index = self.rng.gen_range(0, len);
504         for index in range(0, len).map(|i| (i + start_index) % len) {
505             match work_queues.get_mut(index).steal() {
506                 deque::Data(task) => {
507                     rtdebug!("found task by stealing");
508                     return Some(task)
509                 }
510                 _ => ()
511             }
512         };
513         rtdebug!("giving up on stealing");
514         return None;
515     }
516
517     // * Task Routing Functions - Make sure tasks send up in the right
518     // place.
519
520     fn process_task(mut self: Box<Scheduler>,
521                     cur: Box<GreenTask>,
522                     mut next: Box<GreenTask>,
523                     schedule_fn: SchedulingFn)
524                     -> (Box<Scheduler>, Box<GreenTask>) {
525         rtdebug!("processing a task");
526
527         match next.take_unwrap_home() {
528             HomeSched(home_handle) => {
529                 if home_handle.sched_id != self.sched_id() {
530                     rtdebug!("sending task home");
531                     next.give_home(HomeSched(home_handle));
532                     Scheduler::send_task_home(next);
533                     (self, cur)
534                 } else {
535                     rtdebug!("running task here");
536                     next.give_home(HomeSched(home_handle));
537                     schedule_fn(self, cur, next)
538                 }
539             }
540             AnySched if self.run_anything => {
541                 rtdebug!("running anysched task here");
542                 next.give_home(AnySched);
543                 schedule_fn(self, cur, next)
544             }
545             AnySched => {
546                 rtdebug!("sending task to friend");
547                 next.give_home(AnySched);
548                 self.send_to_friend(next);
549                 (self, cur)
550             }
551         }
552     }
553
554     fn send_task_home(task: Box<GreenTask>) {
555         let mut task = task;
556         match task.take_unwrap_home() {
557             HomeSched(mut home_handle) => home_handle.send(PinnedTask(task)),
558             AnySched => rtabort!("error: cannot send anysched task home"),
559         }
560     }
561
562     /// Take a non-homed task we aren't allowed to run here and send
563     /// it to the designated friend scheduler to execute.
564     fn send_to_friend(&mut self, task: Box<GreenTask>) {
565         rtdebug!("sending a task to friend");
566         match self.friend_handle {
567             Some(ref mut handle) => {
568                 handle.send(TaskFromFriend(task));
569             }
570             None => {
571                 rtabort!("tried to send task to a friend but scheduler has no friends");
572             }
573         }
574     }
575
576     /// Schedule a task to be executed later.
577     ///
578     /// Pushes the task onto the work stealing queue and tells the
579     /// event loop to run it later. Always use this instead of pushing
580     /// to the work queue directly.
581     pub fn enqueue_task(&mut self, task: Box<GreenTask>) {
582
583         // We push the task onto our local queue clone.
584         assert!(!task.is_sched());
585         self.work_queue.push(task);
586         match self.idle_callback {
587             Some(ref mut idle) => idle.resume(),
588             None => {} // allow enqueuing before the scheduler starts
589         }
590
591         // We've made work available. Notify a
592         // sleeping scheduler.
593
594         match self.sleeper_list.casual_pop() {
595             Some(handle) => {
596                 let mut handle = handle;
597                 handle.send(Wake)
598             }
599             None => { (/* pass */) }
600         };
601     }
602
603     // * Core Context Switching Functions
604
605     // The primary function for changing contexts. In the current
606     // design the scheduler is just a slightly modified GreenTask, so
607     // all context swaps are from GreenTask to GreenTask. The only difference
608     // between the various cases is where the inputs come from, and
609     // what is done with the resulting task. That is specified by the
610     // cleanup function f, which takes the scheduler and the
611     // old task as inputs.
612
613     pub fn change_task_context(mut self: Box<Scheduler>,
614                                mut current_task: Box<GreenTask>,
615                                mut next_task: Box<GreenTask>,
616                                f: |&mut Scheduler, Box<GreenTask>|)
617                                -> Box<GreenTask> {
618         let f_opaque = ClosureConverter::from_fn(f);
619
620         let current_task_dupe = &mut *current_task as *mut GreenTask;
621
622         // The current task is placed inside an enum with the cleanup
623         // function. This enum is then placed inside the scheduler.
624         self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
625
626         // The scheduler is then placed inside the next task.
627         next_task.sched = Some(self);
628
629         // However we still need an internal mutable pointer to the
630         // original task. The strategy here was "arrange memory, then
631         // get pointers", so we crawl back up the chain using
632         // transmute to eliminate borrowck errors.
633         unsafe {
634
635             let sched: &mut Scheduler =
636                 mem::transmute(&**next_task.sched.get_mut_ref());
637
638             let current_task: &mut GreenTask = match sched.cleanup_job {
639                 Some(CleanupJob { task: ref mut task, .. }) => &mut **task,
640                 None => rtabort!("no cleanup job")
641             };
642
643             let (current_task_context, next_task_context) =
644                 Scheduler::get_contexts(current_task, &mut *next_task);
645
646             // Done with everything - put the next task in TLS. This
647             // works because due to transmute the borrow checker
648             // believes that we have no internal pointers to
649             // next_task.
650             mem::forget(next_task);
651
652             // The raw context swap operation. The next action taken
653             // will be running the cleanup job from the context of the
654             // next task.
655             Context::swap(current_task_context, next_task_context);
656         }
657
658         // When the context swaps back to this task we immediately
659         // run the cleanup job, as expected by the previously called
660         // swap_contexts function.
661         let mut current_task: Box<GreenTask> = unsafe {
662             mem::transmute(current_task_dupe)
663         };
664         current_task.sched.get_mut_ref().run_cleanup_job();
665
666         // See the comments in switch_running_tasks_and_then for why a lock
667         // is acquired here. This is the resumption points and the "bounce"
668         // that it is referring to.
669         unsafe {
670             let _guard = current_task.nasty_deschedule_lock.lock();
671         }
672         return current_task;
673     }
674
675     // Returns a mutable reference to both contexts involved in this
676     // swap. This is unsafe - we are getting mutable internal
677     // references to keep even when we don't own the tasks. It looks
678     // kinda safe because we are doing transmutes before passing in
679     // the arguments.
680     pub fn get_contexts<'a>(current_task: &mut GreenTask,
681                             next_task: &mut GreenTask)
682         -> (&'a mut Context, &'a mut Context)
683     {
684         let current_task_context =
685             &mut current_task.coroutine.get_mut_ref().saved_context;
686         let next_task_context =
687                 &mut next_task.coroutine.get_mut_ref().saved_context;
688         unsafe {
689             (mem::transmute(current_task_context),
690              mem::transmute(next_task_context))
691         }
692     }
693
694     // * Context Swapping Helpers - Here be ugliness!
695
696     pub fn resume_task_immediately(self: Box<Scheduler>,
697                                    cur: Box<GreenTask>,
698                                    next: Box<GreenTask>)
699                                    -> (Box<Scheduler>, Box<GreenTask>) {
700         assert!(cur.is_sched());
701         let mut cur = self.change_task_context(cur, next, |sched, stask| {
702             assert!(sched.sched_task.is_none());
703             sched.sched_task = Some(stask);
704         });
705         (cur.sched.take_unwrap(), cur)
706     }
707
708     fn resume_task_immediately_cl(sched: Box<Scheduler>,
709                                   cur: Box<GreenTask>,
710                                   next: Box<GreenTask>)
711                                   -> (Box<Scheduler>, Box<GreenTask>) {
712         sched.resume_task_immediately(cur, next)
713     }
714
715     /// Block a running task, context switch to the scheduler, then pass the
716     /// blocked task to a closure.
717     ///
718     /// # Safety note
719     ///
720     /// The closure here is a *stack* closure that lives in the
721     /// running task.  It gets transmuted to the scheduler's lifetime
722     /// and called while the task is blocked.
723     ///
724     /// This passes a Scheduler pointer to the fn after the context switch
725     /// in order to prevent that fn from performing further scheduling operations.
726     /// Doing further scheduling could easily result in infinite recursion.
727     ///
728     /// Note that if the closure provided relinquishes ownership of the
729     /// BlockedTask, then it is possible for the task to resume execution before
730     /// the closure has finished executing. This would naturally introduce a
731     /// race if the closure and task shared portions of the environment.
732     ///
733     /// This situation is currently prevented, or in other words it is
734     /// guaranteed that this function will not return before the given closure
735     /// has returned.
736     pub fn deschedule_running_task_and_then(mut self: Box<Scheduler>,
737                                             cur: Box<GreenTask>,
738                                             f: |&mut Scheduler, BlockedTask|) {
739         // Trickier - we need to get the scheduler task out of self
740         // and use it as the destination.
741         let stask = self.sched_task.take_unwrap();
742         // Otherwise this is the same as below.
743         self.switch_running_tasks_and_then(cur, stask, f)
744     }
745
746     pub fn switch_running_tasks_and_then(self: Box<Scheduler>,
747                                          cur: Box<GreenTask>,
748                                          next: Box<GreenTask>,
749                                          f: |&mut Scheduler, BlockedTask|) {
750         // And here comes one of the sad moments in which a lock is used in a
751         // core portion of the rust runtime. As always, this is highly
752         // undesirable, so there's a good reason behind it.
753         //
754         // There is an excellent outline of the problem in issue #8132, and it's
755         // summarized in that `f` is executed on a sched task, but its
756         // environment is on the previous task. If `f` relinquishes ownership of
757         // the BlockedTask, then it may introduce a race where `f` is using the
758         // environment as well as the code after the 'deschedule' block.
759         //
760         // The solution we have chosen to adopt for now is to acquire a
761         // task-local lock around this block. The resumption of the task in
762         // context switching will bounce on the lock, thereby waiting for this
763         // block to finish, eliminating the race mentioned above.
764         // fail!("should never return!");
765         //
766         // To actually maintain a handle to the lock, we use an unsafe pointer
767         // to it, but we're guaranteed that the task won't exit until we've
768         // unlocked the lock so there's no worry of this memory going away.
769         let cur = self.change_task_context(cur, next, |sched, mut task| {
770             let lock: *mut NativeMutex = &mut task.nasty_deschedule_lock;
771             unsafe {
772                 let _guard = (*lock).lock();
773                 f(sched, BlockedTask::block(task.swap()));
774             }
775         });
776         cur.put();
777     }
778
779     fn switch_task(sched: Box<Scheduler>,
780                    cur: Box<GreenTask>,
781                    next: Box<GreenTask>)
782                    -> (Box<Scheduler>, Box<GreenTask>) {
783         let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
784             if last_task.is_sched() {
785                 assert!(sched.sched_task.is_none());
786                 sched.sched_task = Some(last_task);
787             } else {
788                 sched.enqueue_task(last_task);
789             }
790         });
791         (cur.sched.take_unwrap(), cur)
792     }
793
794     // * Task Context Helpers
795
796     /// Called by a running task to end execution, after which it will
797     /// be recycled by the scheduler for reuse in a new task.
798     pub fn terminate_current_task(mut self: Box<Scheduler>,
799                                   cur: Box<GreenTask>)
800                                   -> ! {
801         // Similar to deschedule running task and then, but cannot go through
802         // the task-blocking path. The task is already dying.
803         let stask = self.sched_task.take_unwrap();
804         let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
805             let coroutine = dead_task.coroutine.take_unwrap();
806             coroutine.recycle(&mut sched.stack_pool);
807             sched.task_state.decrement();
808         });
809         fail!("should never return!");
810     }
811
812     pub fn run_task(self: Box<Scheduler>,
813                     cur: Box<GreenTask>,
814                     next: Box<GreenTask>) {
815         let (sched, task) =
816             self.process_task(cur, next, Scheduler::switch_task);
817         task.put_with_sched(sched);
818     }
819
820     pub fn run_task_later(mut cur: Box<GreenTask>, next: Box<GreenTask>) {
821         let mut sched = cur.sched.take_unwrap();
822         sched.enqueue_task(next);
823         cur.put_with_sched(sched);
824     }
825
826     /// Yield control to the scheduler, executing another task. This is guaranteed
827     /// to introduce some amount of randomness to the scheduler. Currently the
828     /// randomness is a result of performing a round of work stealing (which
829     /// may end up stealing from the current scheduler).
830     pub fn yield_now(mut self: Box<Scheduler>, cur: Box<GreenTask>) {
831         // Async handles trigger the scheduler by calling yield_now on the local
832         // task, which eventually gets us to here. See comments in SchedRunner
833         // for more info on this.
834         if cur.is_sched() {
835             assert!(self.sched_task.is_none());
836             self.run_sched_once(cur);
837         } else {
838             self.yield_check_count = reset_yield_check(&mut self.rng);
839             // Tell the scheduler to start stealing on the next iteration
840             self.steal_for_yield = true;
841             let stask = self.sched_task.take_unwrap();
842             let cur = self.change_task_context(cur, stask, |sched, task| {
843                 sched.enqueue_task(task);
844             });
845             cur.put()
846         }
847     }
848
849     pub fn maybe_yield(mut self: Box<Scheduler>, cur: Box<GreenTask>) {
850         // It's possible for sched tasks to possibly call this function, and it
851         // just means that they're likely sending on channels (which
852         // occasionally call this function). Sched tasks follow different paths
853         // when executing yield_now(), which may possibly trip the assertion
854         // below. For this reason, we just have sched tasks bail out soon.
855         //
856         // Sched tasks have no need to yield anyway because as soon as they
857         // return they'll yield to other threads by falling back to the event
858         // loop. Additionally, we completely control sched tasks, so we can make
859         // sure that they never execute more than enough code.
860         if cur.is_sched() {
861             return cur.put_with_sched(self)
862         }
863
864         // The number of times to do the yield check before yielding, chosen
865         // arbitrarily.
866         rtassert!(self.yield_check_count > 0);
867         self.yield_check_count -= 1;
868         if self.yield_check_count == 0 {
869             self.yield_now(cur);
870         } else {
871             cur.put_with_sched(self);
872         }
873     }
874
875
876     // * Utility Functions
877
878     pub fn sched_id(&self) -> uint { self as *const Scheduler as uint }
879
880     pub fn run_cleanup_job(&mut self) {
881         let cleanup_job = self.cleanup_job.take_unwrap();
882         cleanup_job.run(self)
883     }
884
885     pub fn make_handle(&mut self) -> SchedHandle {
886         let remote = self.event_loop.remote_callback(box SchedRunner);
887
888         return SchedHandle {
889             remote: remote,
890             queue: self.message_producer.clone(),
891             sched_id: self.sched_id()
892         }
893     }
894 }
895
896 // Supporting types
897
898 type SchedulingFn = fn(Box<Scheduler>, Box<GreenTask>, Box<GreenTask>)
899                        -> (Box<Scheduler>, Box<GreenTask>);
900
901 pub enum SchedMessage {
902     Wake,
903     Shutdown,
904     NewNeighbor(deque::Stealer<Box<GreenTask>>),
905     PinnedTask(Box<GreenTask>),
906     TaskFromFriend(Box<GreenTask>),
907     RunOnce(Box<GreenTask>),
908 }
909
910 pub struct SchedHandle {
911     remote: Box<RemoteCallback + Send>,
912     queue: msgq::Producer<SchedMessage>,
913     pub sched_id: uint
914 }
915
916 impl SchedHandle {
917     pub fn send(&mut self, msg: SchedMessage) {
918         self.queue.push(msg);
919         self.remote.fire();
920     }
921 }
922
923 struct SchedRunner;
924
925 impl Callback for SchedRunner {
926     fn call(&mut self) {
927         // In theory, this function needs to invoke the `run_sched_once`
928         // function on the scheduler. Sadly, we have no context here, except for
929         // knowledge of the local `Task`. In order to avoid a call to
930         // `GreenTask::convert`, we just call `yield_now` and the scheduler will
931         // detect when a sched task performs a yield vs a green task performing
932         // a yield (and act accordingly).
933         //
934         // This function could be converted to `GreenTask::convert` if
935         // absolutely necessary, but for cleanliness it is much better to not
936         // use the conversion function.
937         let task: Box<Task> = Local::take();
938         task.yield_now();
939     }
940 }
941
942 struct CleanupJob {
943     task: Box<GreenTask>,
944     f: UnsafeTaskReceiver
945 }
946
947 impl CleanupJob {
948     pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
949         CleanupJob {
950             task: task,
951             f: f
952         }
953     }
954
955     pub fn run(self, sched: &mut Scheduler) {
956         let CleanupJob { task: task, f: f } = self;
957         f.to_fn()(sched, task)
958     }
959 }
960
961 // FIXME: Some hacks to put a || closure in Scheduler without borrowck
962 // complaining
963 type UnsafeTaskReceiver = raw::Closure;
964 trait ClosureConverter {
965     fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
966     fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|:'static ;
967 }
968 impl ClosureConverter for UnsafeTaskReceiver {
969     fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
970         unsafe { mem::transmute(f) }
971     }
972     fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|:'static {
973         unsafe { mem::transmute(self) }
974     }
975 }
976
977 // On unix, we read randomness straight from /dev/urandom, but the
978 // default constructor of an XorShiftRng does this via io::fs, which
979 // relies on the scheduler existing, so we have to manually load
980 // randomness. Windows has its own C API for this, so we don't need to
981 // worry there.
982 #[cfg(windows)]
983 fn new_sched_rng() -> XorShiftRng {
984     use std::rand::OsRng;
985     match OsRng::new() {
986         Ok(mut r) => r.gen(),
987         Err(e) => {
988             rtabort!("sched: failed to create seeded RNG: {}", e)
989         }
990     }
991 }
992 #[cfg(unix)]
993 fn new_sched_rng() -> XorShiftRng {
994     use libc;
995     use std::mem;
996     use std::rand::SeedableRng;
997
998     let fd = "/dev/urandom".with_c_str(|name| {
999         unsafe { libc::open(name, libc::O_RDONLY, 0) }
1000     });
1001     if fd == -1 {
1002         rtabort!("could not open /dev/urandom for reading.")
1003     }
1004
1005     let mut seeds = [0u32, .. 4];
1006     let size = mem::size_of_val(&seeds);
1007     loop {
1008         let nbytes = unsafe {
1009             libc::read(fd,
1010                        seeds.as_mut_ptr() as *mut libc::c_void,
1011                        size as libc::size_t)
1012         };
1013         rtassert!(nbytes as uint == size);
1014
1015         if !seeds.iter().all(|x| *x == 0) {
1016             break;
1017         }
1018     }
1019
1020     unsafe {libc::close(fd);}
1021
1022     SeedableRng::from_seed(seeds)
1023 }
1024
1025 #[cfg(test)]
1026 mod test {
1027     use rustuv;
1028
1029     use std::rt::task::TaskOpts;
1030     use std::rt::task::Task;
1031     use std::rt::local::Local;
1032
1033     use {TaskState, PoolConfig, SchedPool};
1034     use basic;
1035     use sched::{TaskFromFriend, PinnedTask};
1036     use task::{GreenTask, HomeSched, AnySched};
1037
1038     fn pool() -> SchedPool {
1039         SchedPool::new(PoolConfig {
1040             threads: 1,
1041             event_loop_factory: basic::event_loop,
1042         })
1043     }
1044
1045     fn run(f: proc():Send) {
1046         let mut pool = pool();
1047         pool.spawn(TaskOpts::new(), f);
1048         pool.shutdown();
1049     }
1050
1051     fn sched_id() -> uint {
1052         let mut task = Local::borrow(None::<Task>);
1053         match task.maybe_take_runtime::<GreenTask>() {
1054             Some(green) => {
1055                 let ret = green.sched.get_ref().sched_id();
1056                 task.put_runtime(green);
1057                 return ret;
1058             }
1059             None => fail!()
1060         }
1061     }
1062
1063     #[test]
1064     fn trivial_run_in_newsched_task_test() {
1065         let mut task_ran = false;
1066         let task_ran_ptr: *mut bool = &mut task_ran;
1067         run(proc() {
1068             unsafe { *task_ran_ptr = true };
1069             rtdebug!("executed from the new scheduler")
1070         });
1071         assert!(task_ran);
1072     }
1073
1074     #[test]
1075     fn multiple_task_test() {
1076         let total = 10;
1077         let mut task_run_count = 0;
1078         let task_run_count_ptr: *mut uint = &mut task_run_count;
1079         // with only one thread this is safe to run in without worries of
1080         // contention.
1081         run(proc() {
1082             for _ in range(0u, total) {
1083                 spawn(proc() {
1084                     unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
1085                 });
1086             }
1087         });
1088         assert!(task_run_count == total);
1089     }
1090
1091     #[test]
1092     fn multiple_task_nested_test() {
1093         let mut task_run_count = 0;
1094         let task_run_count_ptr: *mut uint = &mut task_run_count;
1095         run(proc() {
1096             spawn(proc() {
1097                 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1098                 spawn(proc() {
1099                     unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1100                     spawn(proc() {
1101                         unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1102                     })
1103                 })
1104             })
1105         });
1106         assert!(task_run_count == 3);
1107     }
1108
1109     // A very simple test that confirms that a task executing on the
1110     // home scheduler notices that it is home.
1111     #[test]
1112     fn test_home_sched() {
1113         let mut pool = pool();
1114
1115         let (dtx, drx) = channel();
1116         {
1117             let (tx, rx) = channel();
1118             let mut handle1 = pool.spawn_sched();
1119             let mut handle2 = pool.spawn_sched();
1120
1121             handle1.send(TaskFromFriend(pool.task(TaskOpts::new(), proc() {
1122                 tx.send(sched_id());
1123             })));
1124             let sched1_id = rx.recv();
1125
1126             let mut task = pool.task(TaskOpts::new(), proc() {
1127                 assert_eq!(sched_id(), sched1_id);
1128                 dtx.send(());
1129             });
1130             task.give_home(HomeSched(handle1));
1131             handle2.send(TaskFromFriend(task));
1132         }
1133         drx.recv();
1134
1135         pool.shutdown();
1136     }
1137
1138     // An advanced test that checks all four possible states that a
1139     // (task,sched) can be in regarding homes.
1140
1141     #[test]
1142     fn test_schedule_home_states() {
1143         use sleeper_list::SleeperList;
1144         use super::{Shutdown, Scheduler, SchedHandle};
1145         use std::rt::thread::Thread;
1146         use std::sync::deque::BufferPool;
1147
1148         Thread::start(proc() {
1149             let sleepers = SleeperList::new();
1150             let pool = BufferPool::new();
1151             let (normal_worker, normal_stealer) = pool.deque();
1152             let (special_worker, special_stealer) = pool.deque();
1153             let queues = vec![normal_stealer, special_stealer];
1154             let (_p, state) = TaskState::new();
1155
1156             // Our normal scheduler
1157             let mut normal_sched = box Scheduler::new(
1158                 1,
1159                 basic::event_loop(),
1160                 normal_worker,
1161                 queues.clone(),
1162                 sleepers.clone(),
1163                 state.clone());
1164
1165             let normal_handle = normal_sched.make_handle();
1166             let friend_handle = normal_sched.make_handle();
1167
1168             // Our special scheduler
1169             let mut special_sched = box Scheduler::new_special(
1170                 1,
1171                 basic::event_loop(),
1172                 special_worker,
1173                 queues.clone(),
1174                 sleepers.clone(),
1175                 false,
1176                 Some(friend_handle),
1177                 state);
1178
1179             let special_handle = special_sched.make_handle();
1180
1181             let t1_handle = special_sched.make_handle();
1182             let t4_handle = special_sched.make_handle();
1183
1184             // Four test tasks:
1185             //   1) task is home on special
1186             //   2) task not homed, sched doesn't care
1187             //   3) task not homed, sched requeues
1188             //   4) task not home, send home
1189
1190             // Grab both the scheduler and the task from TLS and check if the
1191             // task is executing on an appropriate scheduler.
1192             fn on_appropriate_sched() -> bool {
1193                 use task::{TypeGreen, TypeSched, HomeSched};
1194                 let task = GreenTask::convert(Local::take());
1195                 let sched_id = task.sched.get_ref().sched_id();
1196                 let run_any = task.sched.get_ref().run_anything;
1197                 let ret = match task.task_type {
1198                     TypeGreen(Some(AnySched)) => {
1199                         run_any
1200                     }
1201                     TypeGreen(Some(HomeSched(SchedHandle {
1202                         sched_id: ref id,
1203                         ..
1204                     }))) => {
1205                         *id == sched_id
1206                     }
1207                     TypeGreen(None) => { fail!("task without home"); }
1208                     TypeSched => { fail!("expected green task"); }
1209                 };
1210                 task.put();
1211                 ret
1212             }
1213
1214             let task1 = GreenTask::new_homed(&mut special_sched.stack_pool,
1215                                              None, HomeSched(t1_handle), proc() {
1216                 rtassert!(on_appropriate_sched());
1217             });
1218
1219             let task2 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1220                 rtassert!(on_appropriate_sched());
1221             });
1222
1223             let task3 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1224                 rtassert!(on_appropriate_sched());
1225             });
1226
1227             let task4 = GreenTask::new_homed(&mut special_sched.stack_pool,
1228                                              None, HomeSched(t4_handle), proc() {
1229                 rtassert!(on_appropriate_sched());
1230             });
1231
1232             // Signal from the special task that we are done.
1233             let (tx, rx) = channel::<()>();
1234
1235             fn run(next: Box<GreenTask>) {
1236                 let mut task = GreenTask::convert(Local::take());
1237                 let sched = task.sched.take_unwrap();
1238                 sched.run_task(task, next)
1239             }
1240
1241             let normal_task = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1242                 run(task2);
1243                 run(task4);
1244                 rx.recv();
1245                 let mut nh = normal_handle;
1246                 nh.send(Shutdown);
1247                 let mut sh = special_handle;
1248                 sh.send(Shutdown);
1249             });
1250             normal_sched.enqueue_task(normal_task);
1251
1252             let special_task = GreenTask::new(&mut special_sched.stack_pool, None, proc() {
1253                 run(task1);
1254                 run(task3);
1255                 tx.send(());
1256             });
1257             special_sched.enqueue_task(special_task);
1258
1259             let normal_sched = normal_sched;
1260             let normal_thread = Thread::start(proc() { normal_sched.bootstrap() });
1261
1262             let special_sched = special_sched;
1263             let special_thread = Thread::start(proc() { special_sched.bootstrap() });
1264
1265             normal_thread.join();
1266             special_thread.join();
1267         }).join();
1268     }
1269
1270     //#[test]
1271     //fn test_stress_schedule_task_states() {
1272     //    if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1273     //    let n = stress_factor() * 120;
1274     //    for _ in range(0, n as int) {
1275     //        test_schedule_home_states();
1276     //    }
1277     //}
1278
1279     #[test]
1280     fn test_io_callback() {
1281         use std::io::timer;
1282
1283         let mut pool = SchedPool::new(PoolConfig {
1284             threads: 2,
1285             event_loop_factory: rustuv::event_loop,
1286         });
1287
1288         // This is a regression test that when there are no schedulable tasks in
1289         // the work queue, but we are performing I/O, that once we do put
1290         // something in the work queue again the scheduler picks it up and
1291         // doesn't exit before emptying the work queue
1292         pool.spawn(TaskOpts::new(), proc() {
1293             spawn(proc() {
1294                 timer::sleep(10);
1295             });
1296         });
1297
1298         pool.shutdown();
1299     }
1300
1301     #[test]
1302     fn wakeup_across_scheds() {
1303         let (tx1, rx1) = channel();
1304         let (tx2, rx2) = channel();
1305
1306         let mut pool1 = pool();
1307         let mut pool2 = pool();
1308
1309         pool1.spawn(TaskOpts::new(), proc() {
1310             let id = sched_id();
1311             tx1.send(());
1312             rx2.recv();
1313             assert_eq!(id, sched_id());
1314         });
1315
1316         pool2.spawn(TaskOpts::new(), proc() {
1317             let id = sched_id();
1318             rx1.recv();
1319             assert_eq!(id, sched_id());
1320             tx2.send(());
1321         });
1322
1323         pool1.shutdown();
1324         pool2.shutdown();
1325     }
1326
1327     // A regression test that the final message is always handled.
1328     // Used to deadlock because Shutdown was never recvd.
1329     #[test]
1330     fn no_missed_messages() {
1331         let mut pool = pool();
1332
1333         let task = pool.task(TaskOpts::new(), proc()());
1334         pool.spawn_sched().send(TaskFromFriend(task));
1335
1336         pool.shutdown();
1337     }
1338
1339     #[test]
1340     fn multithreading() {
1341         run(proc() {
1342             let mut rxs = vec![];
1343             for _ in range(0u, 10) {
1344                 let (tx, rx) = channel();
1345                 spawn(proc() {
1346                     tx.send(());
1347                 });
1348                 rxs.push(rx);
1349             }
1350
1351             loop {
1352                 match rxs.pop() {
1353                     Some(rx) => rx.recv(),
1354                     None => break,
1355                 }
1356             }
1357         });
1358     }
1359
1360      #[test]
1361     fn thread_ring() {
1362         run(proc() {
1363             let (end_tx, end_rx) = channel();
1364
1365             let n_tasks = 10;
1366             let token = 2000;
1367
1368             let (tx1, mut rx) = channel();
1369             tx1.send((token, end_tx));
1370             let mut i = 2;
1371             while i <= n_tasks {
1372                 let (tx, next_rx) = channel();
1373                 let imm_i = i;
1374                 let imm_rx = rx;
1375                 spawn(proc() {
1376                     roundtrip(imm_i, n_tasks, &imm_rx, &tx);
1377                 });
1378                 rx = next_rx;
1379                 i += 1;
1380             }
1381             let rx = rx;
1382             spawn(proc() {
1383                 roundtrip(1, n_tasks, &rx, &tx1);
1384             });
1385
1386             end_rx.recv();
1387         });
1388
1389         fn roundtrip(id: int, n_tasks: int,
1390                      rx: &Receiver<(int, Sender<()>)>,
1391                      tx: &Sender<(int, Sender<()>)>) {
1392             loop {
1393                 match rx.recv() {
1394                     (1, end_tx) => {
1395                         debug!("{}\n", id);
1396                         end_tx.send(());
1397                         return;
1398                     }
1399                     (token, end_tx) => {
1400                         debug!("thread: {}   got token: {}", id, token);
1401                         tx.send((token - 1, end_tx));
1402                         if token <= n_tasks {
1403                             return;
1404                         }
1405                     }
1406                 }
1407             }
1408         }
1409     }
1410
1411     #[test]
1412     fn start_closure_dtor() {
1413         // Regression test that the `start` task entrypoint can
1414         // contain dtors that use task resources
1415         run(proc() {
1416             struct S { field: () }
1417
1418             impl Drop for S {
1419                 fn drop(&mut self) {
1420                     let _foo = box 0i;
1421                 }
1422             }
1423
1424             let s = S { field: () };
1425
1426             spawn(proc() {
1427                 let _ss = &s;
1428             });
1429         });
1430     }
1431
1432     #[test]
1433     fn dont_starve_1() {
1434         let mut pool = SchedPool::new(PoolConfig {
1435             threads: 2, // this must be > 1
1436             event_loop_factory: basic::event_loop,
1437         });
1438         pool.spawn(TaskOpts::new(), proc() {
1439             let (tx, rx) = channel();
1440
1441             // This task should not be able to starve the sender;
1442             // The sender should get stolen to another thread.
1443             spawn(proc() {
1444                 while rx.try_recv().is_err() { }
1445             });
1446
1447             tx.send(());
1448         });
1449         pool.shutdown();
1450     }
1451
1452     #[test]
1453     fn dont_starve_2() {
1454         run(proc() {
1455             let (tx1, rx1) = channel();
1456             let (tx2, _rx2) = channel();
1457
1458             // This task should not be able to starve the other task.
1459             // The sends should eventually yield.
1460             spawn(proc() {
1461                 while rx1.try_recv().is_err() {
1462                     tx2.send(());
1463                 }
1464             });
1465
1466             tx1.send(());
1467         });
1468     }
1469
1470     // Regression test for a logic bug that would cause single-threaded
1471     // schedulers to sleep forever after yielding and stealing another task.
1472     #[test]
1473     fn single_threaded_yield() {
1474         use std::task::deschedule;
1475         run(proc() {
1476             for _ in range(0u, 5) { deschedule(); }
1477         });
1478     }
1479
1480     #[test]
1481     fn test_spawn_sched_blocking() {
1482         use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1483         static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1484
1485         // Testing that a task in one scheduler can block in foreign code
1486         // without affecting other schedulers
1487         for _ in range(0u, 20) {
1488             let mut pool = pool();
1489             let (start_tx, start_rx) = channel();
1490             let (fin_tx, fin_rx) = channel();
1491
1492             let mut handle = pool.spawn_sched();
1493             handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
1494                 unsafe {
1495                     let guard = LOCK.lock();
1496
1497                     start_tx.send(());
1498                     guard.wait();   // block the scheduler thread
1499                     guard.signal(); // let them know we have the lock
1500                 }
1501
1502                 fin_tx.send(());
1503             })));
1504             drop(handle);
1505
1506             let mut handle = pool.spawn_sched();
1507             handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
1508                 // Wait until the other task has its lock
1509                 start_rx.recv();
1510
1511                 fn pingpong(po: &Receiver<int>, ch: &Sender<int>) {
1512                     let mut val = 20;
1513                     while val > 0 {
1514                         val = po.recv();
1515                         let _ = ch.send_opt(val - 1);
1516                     }
1517                 }
1518
1519                 let (setup_tx, setup_rx) = channel();
1520                 let (parent_tx, parent_rx) = channel();
1521                 spawn(proc() {
1522                     let (child_tx, child_rx) = channel();
1523                     setup_tx.send(child_tx);
1524                     pingpong(&child_rx, &parent_tx);
1525                 });
1526
1527                 let child_tx = setup_rx.recv();
1528                 child_tx.send(20);
1529                 pingpong(&parent_rx, &child_tx);
1530                 unsafe {
1531                     let guard = LOCK.lock();
1532                     guard.signal();   // wakeup waiting scheduler
1533                     guard.wait();     // wait for them to grab the lock
1534                 }
1535             })));
1536             drop(handle);
1537
1538             fin_rx.recv();
1539             pool.shutdown();
1540         }
1541         unsafe { LOCK.destroy(); }
1542     }
1543 }