]> git.lizzy.rs Git - rust.git/blob - src/libgreen/task.rs
Ignore tests broken by failing on ICE
[rust.git] / src / libgreen / task.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The Green Task implementation
12 //!
13 //! This module contains the glue to the libstd runtime necessary to integrate
14 //! M:N scheduling. This GreenTask structure is hidden as a trait object in all
15 //! rust tasks and virtual calls are made in order to interface with it.
16 //!
17 //! Each green task contains a scheduler if it is currently running, and it also
18 //! contains the rust task itself in order to juggle around ownership of the
19 //! values.
20
21 use std::any::Any;
22 use std::cast;
23 use std::raw;
24 use std::rt::Runtime;
25 use std::rt::env;
26 use std::rt::local::Local;
27 use std::rt::rtio;
28 use std::rt::stack;
29 use std::rt::task::{Task, BlockedTask, SendMessage};
30 use std::task::TaskOpts;
31 use std::unstable::mutex::NativeMutex;
32
33 use context::Context;
34 use coroutine::Coroutine;
35 use sched::{Scheduler, SchedHandle, RunOnce};
36 use stack::StackPool;
37
38 /// The necessary fields needed to keep track of a green task (as opposed to a
39 /// 1:1 task).
40 pub struct GreenTask {
41     /// Coroutine that this task is running on, otherwise known as the register
42     /// context and the stack that this task owns. This field is optional to
43     /// relinquish ownership back to a scheduler to recycle stacks at a later
44     /// date.
45     pub coroutine: Option<Coroutine>,
46
47     /// Optional handle back into the home sched pool of this task. This field
48     /// is lazily initialized.
49     pub handle: Option<SchedHandle>,
50
51     /// Slot for maintaining ownership of a scheduler. If a task is running,
52     /// this value will be Some(sched) where the task is running on "sched".
53     pub sched: Option<~Scheduler>,
54
55     /// Temporary ownership slot of a std::rt::task::Task object. This is used
56     /// to squirrel that libstd task away while we're performing green task
57     /// operations.
58     pub task: Option<~Task>,
59
60     /// Dictates whether this is a sched task or a normal green task
61     pub task_type: TaskType,
62
63     /// Home pool that this task was spawned into. This field is lazily
64     /// initialized until when the task is initially scheduled, and is used to
65     /// make sure that tasks are always woken up in the correct pool of
66     /// schedulers.
67     pub pool_id: uint,
68
69     // See the comments in the scheduler about why this is necessary
70     pub nasty_deschedule_lock: NativeMutex,
71 }
72
73 pub enum TaskType {
74     TypeGreen(Option<Home>),
75     TypeSched,
76 }
77
78 pub enum Home {
79     AnySched,
80     HomeSched(SchedHandle),
81 }
82
83 /// Trampoline code for all new green tasks which are running around. This
84 /// function is passed through to Context::new as the initial rust landing pad
85 /// for all green tasks. This code is actually called after the initial context
86 /// switch onto a green thread.
87 ///
88 /// The first argument to this function is the `~GreenTask` pointer, and the
89 /// next two arguments are the user-provided procedure for running code.
90 ///
91 /// The goal for having this weird-looking function is to reduce the number of
92 /// allocations done on a green-task startup as much as possible.
93 extern fn bootstrap_green_task(task: uint, code: *(), env: *()) -> ! {
94     // Acquire ownership of the `proc()`
95     let start: proc() = unsafe {
96         cast::transmute(raw::Procedure { code: code, env: env })
97     };
98
99     // Acquire ownership of the `~GreenTask`
100     let mut task: ~GreenTask = unsafe { cast::transmute(task) };
101
102     // First code after swap to this new context. Run our cleanup job
103     task.pool_id = {
104         let sched = task.sched.get_mut_ref();
105         sched.run_cleanup_job();
106         sched.task_state.increment();
107         sched.pool_id
108     };
109
110     // Convert our green task to a libstd task and then execute the code
111     // requested. This is the "try/catch" block for this green task and
112     // is the wrapper for *all* code run in the task.
113     let mut start = Some(start);
114     let task = task.swap().run(|| start.take_unwrap()());
115
116     // Once the function has exited, it's time to run the termination
117     // routine. This means we need to context switch one more time but
118     // clean ourselves up on the other end. Since we have no way of
119     // preserving a handle to the GreenTask down to this point, this
120     // unfortunately must call `GreenTask::convert`. In order to avoid
121     // this we could add a `terminate` function to the `Runtime` trait
122     // in libstd, but that seems less appropriate since the coversion
123     // method exists.
124     GreenTask::convert(task).terminate()
125 }
126
127 impl GreenTask {
128     /// Creates a new green task which is not homed to any particular scheduler
129     /// and will not have any contained Task structure.
130     pub fn new(stack_pool: &mut StackPool,
131                stack_size: Option<uint>,
132                start: proc():Send) -> ~GreenTask {
133         GreenTask::new_homed(stack_pool, stack_size, AnySched, start)
134     }
135
136     /// Creates a new task (like `new`), but specifies the home for new task.
137     pub fn new_homed(stack_pool: &mut StackPool,
138                      stack_size: Option<uint>,
139                      home: Home,
140                      start: proc():Send) -> ~GreenTask {
141         // Allocate ourselves a GreenTask structure
142         let mut ops = GreenTask::new_typed(None, TypeGreen(Some(home)));
143
144         // Allocate a stack for us to run on
145         let stack_size = stack_size.unwrap_or_else(|| env::min_stack());
146         let mut stack = stack_pool.take_stack(stack_size);
147         let context = Context::new(bootstrap_green_task, ops.as_uint(), start,
148                                    &mut stack);
149
150         // Package everything up in a coroutine and return
151         ops.coroutine = Some(Coroutine {
152             current_stack_segment: stack,
153             saved_context: context,
154         });
155         return ops;
156     }
157
158     /// Creates a new green task with the specified coroutine and type, this is
159     /// useful when creating scheduler tasks.
160     pub fn new_typed(coroutine: Option<Coroutine>,
161                      task_type: TaskType) -> ~GreenTask {
162         box GreenTask {
163             pool_id: 0,
164             coroutine: coroutine,
165             task_type: task_type,
166             sched: None,
167             handle: None,
168             nasty_deschedule_lock: unsafe { NativeMutex::new() },
169             task: Some(box Task::new()),
170         }
171     }
172
173     /// Creates a new green task with the given configuration options for the
174     /// contained Task object. The given stack pool is also used to allocate a
175     /// new stack for this task.
176     pub fn configure(pool: &mut StackPool,
177                      opts: TaskOpts,
178                      f: proc():Send) -> ~GreenTask {
179         let TaskOpts {
180             notify_chan, name, stack_size,
181             stderr, stdout,
182         } = opts;
183
184         let mut green = GreenTask::new(pool, stack_size, f);
185         {
186             let task = green.task.get_mut_ref();
187             task.name = name;
188             task.stderr = stderr;
189             task.stdout = stdout;
190             match notify_chan {
191                 Some(chan) => {
192                     task.death.on_exit = Some(SendMessage(chan));
193                 }
194                 None => {}
195             }
196         }
197         return green;
198     }
199
200     /// Just like the `maybe_take_runtime` function, this function should *not*
201     /// exist. Usage of this function is _strongly_ discouraged. This is an
202     /// absolute last resort necessary for converting a libstd task to a green
203     /// task.
204     ///
205     /// This function will assert that the task is indeed a green task before
206     /// returning (and will kill the entire process if this is wrong).
207     pub fn convert(mut task: ~Task) -> ~GreenTask {
208         match task.maybe_take_runtime::<GreenTask>() {
209             Some(mut green) => {
210                 green.put_task(task);
211                 green
212             }
213             None => rtabort!("not a green task any more?"),
214         }
215     }
216
217     pub fn give_home(&mut self, new_home: Home) {
218         match self.task_type {
219             TypeGreen(ref mut home) => { *home = Some(new_home); }
220             TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
221         }
222     }
223
224     pub fn take_unwrap_home(&mut self) -> Home {
225         match self.task_type {
226             TypeGreen(ref mut home) => home.take_unwrap(),
227             TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
228         }
229     }
230
231     // New utility functions for homes.
232
233     pub fn is_home_no_tls(&self, sched: &Scheduler) -> bool {
234         match self.task_type {
235             TypeGreen(Some(AnySched)) => { false }
236             TypeGreen(Some(HomeSched(SchedHandle { sched_id: ref id, .. }))) => {
237                 *id == sched.sched_id()
238             }
239             TypeGreen(None) => { rtabort!("task without home"); }
240             TypeSched => {
241                 // Awe yea
242                 rtabort!("type error: expected: TypeGreen, found: TaskSched");
243             }
244         }
245     }
246
247     pub fn homed(&self) -> bool {
248         match self.task_type {
249             TypeGreen(Some(AnySched)) => { false }
250             TypeGreen(Some(HomeSched(SchedHandle { .. }))) => { true }
251             TypeGreen(None) => {
252                 rtabort!("task without home");
253             }
254             TypeSched => {
255                 rtabort!("type error: expected: TypeGreen, found: TaskSched");
256             }
257         }
258     }
259
260     pub fn is_sched(&self) -> bool {
261         match self.task_type {
262             TypeGreen(..) => false, TypeSched => true,
263         }
264     }
265
266     // Unsafe functions for transferring ownership of this GreenTask across
267     // context switches
268
269     pub fn as_uint(&self) -> uint {
270         self as *GreenTask as uint
271     }
272
273     pub unsafe fn from_uint(val: uint) -> ~GreenTask { cast::transmute(val) }
274
275     // Runtime glue functions and helpers
276
277     pub fn put_with_sched(mut ~self, sched: ~Scheduler) {
278         assert!(self.sched.is_none());
279         self.sched = Some(sched);
280         self.put();
281     }
282
283     pub fn put_task(&mut self, task: ~Task) {
284         assert!(self.task.is_none());
285         self.task = Some(task);
286     }
287
288     pub fn swap(mut ~self) -> ~Task {
289         let mut task = self.task.take_unwrap();
290         task.put_runtime(self);
291         return task;
292     }
293
294     pub fn put(~self) {
295         assert!(self.sched.is_some());
296         Local::put(self.swap());
297     }
298
299     fn terminate(mut ~self) -> ! {
300         let sched = self.sched.take_unwrap();
301         sched.terminate_current_task(self)
302     }
303
304     // This function is used to remotely wakeup this green task back on to its
305     // original pool of schedulers. In order to do so, each tasks arranges a
306     // SchedHandle upon descheduling to be available for sending itself back to
307     // the original pool.
308     //
309     // Note that there is an interesting transfer of ownership going on here. We
310     // must relinquish ownership of the green task, but then also send the task
311     // over the handle back to the original scheduler. In order to safely do
312     // this, we leverage the already-present "nasty descheduling lock". The
313     // reason for doing this is that each task will bounce on this lock after
314     // resuming after a context switch. By holding the lock over the enqueueing
315     // of the task, we're guaranteed that the SchedHandle's memory will be valid
316     // for this entire function.
317     //
318     // An alternative would include having incredibly cheaply cloneable handles,
319     // but right now a SchedHandle is something like 6 allocations, so it is
320     // *not* a cheap operation to clone a handle. Until the day comes that we
321     // need to optimize this, a lock should do just fine (it's completely
322     // uncontended except for when the task is rescheduled).
323     fn reawaken_remotely(mut ~self) {
324         unsafe {
325             let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
326             let handle = self.handle.get_mut_ref() as *mut SchedHandle;
327             let _guard = (*mtx).lock();
328             (*handle).send(RunOnce(self));
329         }
330     }
331 }
332
333 impl Runtime for GreenTask {
334     fn yield_now(mut ~self, cur_task: ~Task) {
335         self.put_task(cur_task);
336         let sched = self.sched.take_unwrap();
337         sched.yield_now(self);
338     }
339
340     fn maybe_yield(mut ~self, cur_task: ~Task) {
341         self.put_task(cur_task);
342         let sched = self.sched.take_unwrap();
343         sched.maybe_yield(self);
344     }
345
346     fn deschedule(mut ~self, times: uint, cur_task: ~Task,
347                   f: |BlockedTask| -> Result<(), BlockedTask>) {
348         self.put_task(cur_task);
349         let mut sched = self.sched.take_unwrap();
350
351         // In order for this task to be reawoken in all possible contexts, we
352         // may need a handle back in to the current scheduler. When we're woken
353         // up in anything other than the local scheduler pool, this handle is
354         // used to send this task back into the scheduler pool.
355         if self.handle.is_none() {
356             self.handle = Some(sched.make_handle());
357             self.pool_id = sched.pool_id;
358         }
359
360         // This code is pretty standard, except for the usage of
361         // `GreenTask::convert`. Right now if we use `reawaken` directly it will
362         // expect for there to be a task in local TLS, but that is not true for
363         // this deschedule block (because the scheduler must retain ownership of
364         // the task while the cleanup job is running). In order to get around
365         // this for now, we invoke the scheduler directly with the converted
366         // Task => GreenTask structure.
367         if times == 1 {
368             sched.deschedule_running_task_and_then(self, |sched, task| {
369                 match f(task) {
370                     Ok(()) => {}
371                     Err(t) => {
372                         t.wake().map(|t| {
373                             sched.enqueue_task(GreenTask::convert(t))
374                         });
375                     }
376                 }
377             });
378         } else {
379             sched.deschedule_running_task_and_then(self, |sched, task| {
380                 for task in task.make_selectable(times) {
381                     match f(task) {
382                         Ok(()) => {},
383                         Err(task) => {
384                             task.wake().map(|t| {
385                                 sched.enqueue_task(GreenTask::convert(t))
386                             });
387                             break
388                         }
389                     }
390                 }
391             });
392         }
393     }
394
395     fn reawaken(mut ~self, to_wake: ~Task) {
396         self.put_task(to_wake);
397         assert!(self.sched.is_none());
398
399         // Optimistically look for a local task, but if one's not available to
400         // inspect (in order to see if it's in the same sched pool as we are),
401         // then just use our remote wakeup routine and carry on!
402         let mut running_task: ~Task = match Local::try_take() {
403             Some(task) => task,
404             None => return self.reawaken_remotely()
405         };
406
407         // Waking up a green thread is a bit of a tricky situation. We have no
408         // guarantee about where the current task is running. The options we
409         // have for where this current task is running are:
410         //
411         //  1. Our original scheduler pool
412         //  2. Some other scheduler pool
413         //  3. Something that isn't a scheduler pool
414         //
415         // In order to figure out what case we're in, this is the reason that
416         // the `maybe_take_runtime` function exists. Using this function we can
417         // dynamically check to see which of these cases is the current
418         // situation and then dispatch accordingly.
419         //
420         // In case 1, we just use the local scheduler to resume ourselves
421         // immediately (if a rescheduling is possible).
422         //
423         // In case 2 and 3, we need to remotely reawaken ourself in order to be
424         // transplanted back to the correct scheduler pool.
425         match running_task.maybe_take_runtime::<GreenTask>() {
426             Some(mut running_green_task) => {
427                 running_green_task.put_task(running_task);
428                 let sched = running_green_task.sched.take_unwrap();
429
430                 if sched.pool_id == self.pool_id {
431                     sched.run_task(running_green_task, self);
432                 } else {
433                     self.reawaken_remotely();
434
435                     // put that thing back where it came from!
436                     running_green_task.put_with_sched(sched);
437                 }
438             }
439             None => {
440                 self.reawaken_remotely();
441                 Local::put(running_task);
442             }
443         }
444     }
445
446     fn spawn_sibling(mut ~self, cur_task: ~Task, opts: TaskOpts, f: proc():Send) {
447         self.put_task(cur_task);
448
449         // Spawns a task into the current scheduler. We allocate the new task's
450         // stack from the scheduler's stack pool, and then configure it
451         // accordingly to `opts`. Afterwards we bootstrap it immediately by
452         // switching to it.
453         //
454         // Upon returning, our task is back in TLS and we're good to return.
455         let mut sched = self.sched.take_unwrap();
456         let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
457         sched.run_task(self, sibling)
458     }
459
460     // Local I/O is provided by the scheduler's event loop
461     fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
462         match self.sched.get_mut_ref().event_loop.io() {
463             Some(io) => Some(rtio::LocalIo::new(io)),
464             None => None,
465         }
466     }
467
468     fn stack_bounds(&self) -> (uint, uint) {
469         let c = self.coroutine.as_ref()
470             .expect("GreenTask.stack_bounds called without a coroutine");
471
472         // Don't return the red zone as part of the usable stack of this task,
473         // it's essentially an implementation detail.
474         (c.current_stack_segment.start() as uint + stack::RED_ZONE,
475          c.current_stack_segment.end() as uint)
476     }
477
478     fn can_block(&self) -> bool { false }
479
480     fn wrap(~self) -> ~Any { self as ~Any }
481 }
482
483 #[cfg(test)]
484 mod tests {
485     use std::rt::local::Local;
486     use std::rt::task::Task;
487     use std::task;
488     use std::task::TaskOpts;
489
490     use super::super::{PoolConfig, SchedPool};
491     use super::GreenTask;
492
493     fn spawn_opts(opts: TaskOpts, f: proc():Send) {
494         let mut pool = SchedPool::new(PoolConfig {
495             threads: 1,
496             event_loop_factory: ::rustuv::event_loop,
497         });
498         pool.spawn(opts, f);
499         pool.shutdown();
500     }
501
502     #[test]
503     fn smoke() {
504         let (tx, rx) = channel();
505         spawn_opts(TaskOpts::new(), proc() {
506             tx.send(());
507         });
508         rx.recv();
509     }
510
511     #[test]
512     fn smoke_fail() {
513         let (tx, rx) = channel::<int>();
514         spawn_opts(TaskOpts::new(), proc() {
515             let _tx = tx;
516             fail!()
517         });
518         assert_eq!(rx.recv_opt(), Err(()));
519     }
520
521     #[test]
522     fn smoke_opts() {
523         let mut opts = TaskOpts::new();
524         opts.name = Some("test".into_maybe_owned());
525         opts.stack_size = Some(20 * 4096);
526         let (tx, rx) = channel();
527         opts.notify_chan = Some(tx);
528         spawn_opts(opts, proc() {});
529         assert!(rx.recv().is_ok());
530     }
531
532     #[test]
533     fn smoke_opts_fail() {
534         let mut opts = TaskOpts::new();
535         let (tx, rx) = channel();
536         opts.notify_chan = Some(tx);
537         spawn_opts(opts, proc() { fail!() });
538         assert!(rx.recv().is_err());
539     }
540
541     #[test]
542     fn yield_test() {
543         let (tx, rx) = channel();
544         spawn_opts(TaskOpts::new(), proc() {
545             for _ in range(0, 10) { task::deschedule(); }
546             tx.send(());
547         });
548         rx.recv();
549     }
550
551     #[test]
552     fn spawn_children() {
553         let (tx1, rx) = channel();
554         spawn_opts(TaskOpts::new(), proc() {
555             let (tx2, rx) = channel();
556             spawn(proc() {
557                 let (tx3, rx) = channel();
558                 spawn(proc() {
559                     tx3.send(());
560                 });
561                 rx.recv();
562                 tx2.send(());
563             });
564             rx.recv();
565             tx1.send(());
566         });
567         rx.recv();
568     }
569
570     #[test]
571     fn spawn_inherits() {
572         let (tx, rx) = channel();
573         spawn_opts(TaskOpts::new(), proc() {
574             spawn(proc() {
575                 let mut task: ~Task = Local::take();
576                 match task.maybe_take_runtime::<GreenTask>() {
577                     Some(ops) => {
578                         task.put_runtime(ops);
579                     }
580                     None => fail!(),
581                 }
582                 Local::put(task);
583                 tx.send(());
584             });
585         });
586         rx.recv();
587     }
588 }