]> git.lizzy.rs Git - rust.git/blob - src/librustrt/task.rs
std: Add a new top-level thread_local module
[rust.git] / src / librustrt / task.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Language-level runtime services that should reasonably expected
12 //! to be available 'everywhere'. Unwinding, local storage, and logging.
13 //! Even a 'freestanding' Rust would likely want to implement this.
14
15 pub use self::BlockedTask::*;
16 use self::TaskState::*;
17
18 use alloc::arc::Arc;
19 use alloc::boxed::Box;
20 use core::any::Any;
21 use core::atomic::{AtomicUint, SeqCst};
22 use core::iter::Take;
23 use core::kinds::marker;
24 use core::mem;
25 use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some};
26 use core::prelude::{drop};
27
28 use bookkeeping;
29 use mutex::NativeMutex;
30 use local::Local;
31 use thread::{mod, Thread};
32 use stack;
33 use unwind;
34 use unwind::Unwinder;
35 use collections::str::SendStr;
36
37 /// State associated with Rust tasks.
38 ///
39 /// This structure is currently undergoing major changes, and is
40 /// likely to be move/be merged with a `Thread` structure.
41 pub struct Task {
42     pub unwinder: Unwinder,
43     pub death: Death,
44     pub name: Option<SendStr>,
45
46     state: TaskState,
47     lock: NativeMutex,       // native synchronization
48     awoken: bool,            // used to prevent spurious wakeups
49
50     // This field holds the known bounds of the stack in (lo, hi) form. Not all
51     // native tasks necessarily know their precise bounds, hence this is
52     // optional.
53     stack_bounds: (uint, uint),
54
55     stack_guard: uint
56 }
57
58 // Once a task has entered the `Armed` state it must be destroyed via `drop`,
59 // and no other method. This state is used to track this transition.
60 #[deriving(PartialEq)]
61 enum TaskState {
62     New,
63     Armed,
64     Destroyed,
65 }
66
67 pub struct TaskOpts {
68     /// Invoke this procedure with the result of the task when it finishes.
69     pub on_exit: Option<proc(Result): Send>,
70     /// A name for the task-to-be, for identification in panic messages
71     pub name: Option<SendStr>,
72     /// The size of the stack for the spawned task
73     pub stack_size: Option<uint>,
74 }
75
76 /// Indicates the manner in which a task exited.
77 ///
78 /// A task that completes without panicking is considered to exit successfully.
79 ///
80 /// If you wish for this result's delivery to block until all
81 /// children tasks complete, recommend using a result future.
82 pub type Result = ::core::result::Result<(), Box<Any + Send>>;
83
84 /// A handle to a blocked task. Usually this means having the Box<Task>
85 /// pointer by ownership, but if the task is killable, a killer can steal it
86 /// at any time.
87 pub enum BlockedTask {
88     Owned(Box<Task>),
89     Shared(Arc<AtomicUint>),
90 }
91
92 /// Per-task state related to task death, killing, panic, etc.
93 pub struct Death {
94     pub on_exit: Option<proc(Result):Send>,
95     marker: marker::NoCopy,
96 }
97
98 pub struct BlockedTasks {
99     inner: Arc<AtomicUint>,
100 }
101
102 impl Task {
103     /// Creates a new uninitialized task.
104     pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Task {
105         Task {
106             unwinder: Unwinder::new(),
107             death: Death::new(),
108             state: New,
109             name: None,
110             lock: unsafe { NativeMutex::new() },
111             awoken: false,
112             // these *should* get overwritten
113             stack_bounds: stack_bounds.unwrap_or((0, 0)),
114             stack_guard: stack_guard.unwrap_or(0)
115         }
116     }
117
118     pub fn spawn(opts: TaskOpts, f: proc():Send) {
119         let TaskOpts { name, stack_size, on_exit } = opts;
120
121         let mut task = box Task::new(None, None);
122         task.name = name;
123         task.death.on_exit = on_exit;
124
125         // FIXME: change this back after moving rustrt into std
126         // let stack = stack_size.unwrap_or(rt::min_stack());
127         let stack = stack_size.unwrap_or(2 * 1024 * 1024);
128
129         // Note that this increment must happen *before* the spawn in order to
130         // guarantee that if this task exits it will always end up waiting for
131         // the spawned task to exit.
132         let token = bookkeeping::increment();
133
134         // Spawning a new OS thread guarantees that __morestack will never get
135         // triggered, but we must manually set up the actual stack bounds once
136         // this function starts executing. This raises the lower limit by a bit
137         // because by the time that this function is executing we've already
138         // consumed at least a little bit of stack (we don't know the exact byte
139         // address at which our stack started).
140         Thread::spawn_stack(stack, proc() {
141             let something_around_the_top_of_the_stack = 1;
142             let addr = &something_around_the_top_of_the_stack as *const int;
143             let my_stack = addr as uint;
144             unsafe {
145                 stack::record_os_managed_stack_bounds(my_stack - stack + 1024,
146                                                       my_stack);
147             }
148             task.stack_guard = thread::current_guard_page();
149             task.stack_bounds = (my_stack - stack + 1024, my_stack);
150
151             let mut f = Some(f);
152             drop(task.run(|| { f.take().unwrap()() }).destroy());
153             drop(token);
154         })
155     }
156
157     /// Consumes ownership of a task, runs some code, and returns the task back.
158     ///
159     /// This function can be used as an emulated "try/catch" to interoperate
160     /// with the rust runtime at the outermost boundary. It is not possible to
161     /// use this function in a nested fashion (a try/catch inside of another
162     /// try/catch). Invoking this function is quite cheap.
163     ///
164     /// If the closure `f` succeeds, then the returned task can be used again
165     /// for another invocation of `run`. If the closure `f` panics then `self`
166     /// will be internally destroyed along with all of the other associated
167     /// resources of this task. The `on_exit` callback is invoked with the
168     /// cause of panic (not returned here). This can be discovered by querying
169     /// `is_destroyed()`.
170     ///
171     /// Note that it is possible to view partial execution of the closure `f`
172     /// because it is not guaranteed to run to completion, but this function is
173     /// guaranteed to return if it panicks. Care should be taken to ensure that
174     /// stack references made by `f` are handled appropriately.
175     ///
176     /// It is invalid to call this function with a task that has been previously
177     /// destroyed via a failed call to `run`.
178     pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
179         assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
180
181         // First, make sure that no one else is in TLS. This does not allow
182         // recursive invocations of run(). If there's no one else, then
183         // relinquish ownership of ourselves back into TLS.
184         if Local::exists(None::<Task>) {
185             panic!("cannot run a task recursively inside another");
186         }
187         self.state = Armed;
188         Local::put(self);
189
190         // There are two primary reasons that general try/catch is unsafe. The
191         // first is that we do not support nested try/catch. The above check for
192         // an existing task in TLS is sufficient for this invariant to be
193         // upheld. The second is that unwinding while unwinding is not defined.
194         // We take care of that by having an 'unwinding' flag in the task
195         // itself. For these reasons, this unsafety should be ok.
196         let result = unsafe { unwind::try(f) };
197
198         // After running the closure given return the task back out if it ran
199         // successfully, or clean up the task if it panicked.
200         let task: Box<Task> = Local::take();
201         match result {
202             Ok(()) => task,
203             Err(cause) => { task.cleanup(Err(cause)) }
204         }
205     }
206
207     /// Destroy all associated resources of this task.
208     ///
209     /// This function will perform any necessary clean up to prepare the task
210     /// for destruction. It is required that this is called before a `Task`
211     /// falls out of scope.
212     ///
213     /// The returned task cannot be used for running any more code, but it may
214     /// be used to extract the runtime as necessary.
215     pub fn destroy(self: Box<Task>) -> Box<Task> {
216         if self.is_destroyed() {
217             self
218         } else {
219             self.cleanup(Ok(()))
220         }
221     }
222
223     /// Cleans up a task, processing the result of the task as appropriate.
224     ///
225     /// This function consumes ownership of the task, deallocating it once it's
226     /// done being processed. It is assumed that TLD and the local heap have
227     /// already been destroyed and/or annihilated.
228     fn cleanup(mut self: Box<Task>, result: Result) -> Box<Task> {
229         // After taking care of the data above, we need to transmit the result
230         // of this task.
231         let what_to_do = self.death.on_exit.take();
232         Local::put(self);
233
234         // FIXME: this is running in a seriously constrained context. If this
235         //        allocates TLD then it will likely abort the runtime. Similarly,
236         //        if this panics, this will also likely abort the runtime.
237         //
238         //        This closure is currently limited to a channel send via the
239         //        standard library's task interface, but this needs
240         //        reconsideration to whether it's a reasonable thing to let a
241         //        task to do or not.
242         match what_to_do {
243             Some(f) => { f(result) }
244             None => { drop(result) }
245         }
246
247         // Now that we're done, we remove the task from TLS and flag it for
248         // destruction.
249         let mut task: Box<Task> = Local::take();
250         task.state = Destroyed;
251         return task;
252     }
253
254     /// Queries whether this can be destroyed or not.
255     pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
256
257     /// Deschedules the current task, invoking `f` `amt` times. It is not
258     /// recommended to use this function directly, but rather communication
259     /// primitives in `std::comm` should be used.
260     //
261     // This function gets a little interesting. There are a few safety and
262     // ownership violations going on here, but this is all done in the name of
263     // shared state. Additionally, all of the violations are protected with a
264     // mutex, so in theory there are no races.
265     //
266     // The first thing we need to do is to get a pointer to the task's internal
267     // mutex. This address will not be changing (because the task is allocated
268     // on the heap). We must have this handle separately because the task will
269     // have its ownership transferred to the given closure. We're guaranteed,
270     // however, that this memory will remain valid because *this* is the current
271     // task's execution thread.
272     //
273     // The next weird part is where ownership of the task actually goes. We
274     // relinquish it to the `f` blocking function, but upon returning this
275     // function needs to replace the task back in TLS. There is no communication
276     // from the wakeup thread back to this thread about the task pointer, and
277     // there's really no need to. In order to get around this, we cast the task
278     // to a `uint` which is then used at the end of this function to cast back
279     // to a `Box<Task>` object. Naturally, this looks like it violates
280     // ownership semantics in that there may be two `Box<Task>` objects.
281     //
282     // The fun part is that the wakeup half of this implementation knows to
283     // "forget" the task on the other end. This means that the awakening half of
284     // things silently relinquishes ownership back to this thread, but not in a
285     // way that the compiler can understand. The task's memory is always valid
286     // for both tasks because these operations are all done inside of a mutex.
287     //
288     // You'll also find that if blocking fails (the `f` function hands the
289     // BlockedTask back to us), we will `mem::forget` the handles. The
290     // reasoning for this is the same logic as above in that the task silently
291     // transfers ownership via the `uint`, not through normal compiler
292     // semantics.
293     //
294     // On a mildly unrelated note, it should also be pointed out that OS
295     // condition variables are susceptible to spurious wakeups, which we need to
296     // be ready for. In order to accommodate for this fact, we have an extra
297     // `awoken` field which indicates whether we were actually woken up via some
298     // invocation of `reawaken`. This flag is only ever accessed inside the
299     // lock, so there's no need to make it atomic.
300     pub fn deschedule(mut self: Box<Task>,
301                       times: uint,
302                       f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
303         unsafe {
304             let me = &mut *self as *mut Task;
305             let task = BlockedTask::block(self);
306
307             if times == 1 {
308                 let guard = (*me).lock.lock();
309                 (*me).awoken = false;
310                 match f(task) {
311                     Ok(()) => {
312                         while !(*me).awoken {
313                             guard.wait();
314                         }
315                     }
316                     Err(task) => { mem::forget(task.wake()); }
317                 }
318             } else {
319                 let iter = task.make_selectable(times);
320                 let guard = (*me).lock.lock();
321                 (*me).awoken = false;
322
323                 // Apply the given closure to all of the "selectable tasks",
324                 // bailing on the first one that produces an error. Note that
325                 // care must be taken such that when an error is occurred, we
326                 // may not own the task, so we may still have to wait for the
327                 // task to become available. In other words, if task.wake()
328                 // returns `None`, then someone else has ownership and we must
329                 // wait for their signal.
330                 match iter.map(f).filter_map(|a| a.err()).next() {
331                     None => {}
332                     Some(task) => {
333                         match task.wake() {
334                             Some(task) => {
335                                 mem::forget(task);
336                                 (*me).awoken = true;
337                             }
338                             None => {}
339                         }
340                     }
341                 }
342                 while !(*me).awoken {
343                     guard.wait();
344                 }
345             }
346             // put the task back in TLS, and everything is as it once was.
347             Local::put(mem::transmute(me));
348         }
349     }
350
351     /// Wakes up a previously blocked task. This function can only be
352     /// called on tasks that were previously blocked in `deschedule`.
353     //
354     // See the comments on `deschedule` for why the task is forgotten here, and
355     // why it's valid to do so.
356     pub fn reawaken(mut self: Box<Task>) {
357         unsafe {
358             let me = &mut *self as *mut Task;
359             mem::forget(self);
360             let guard = (*me).lock.lock();
361             (*me).awoken = true;
362             guard.signal();
363         }
364     }
365
366     /// Yields control of this task to another task. This function will
367     /// eventually return, but possibly not immediately. This is used as an
368     /// opportunity to allow other tasks a chance to run.
369     pub fn yield_now() {
370         Thread::yield_now();
371     }
372
373     /// Returns the stack bounds for this task in (lo, hi) format. The stack
374     /// bounds may not be known for all tasks, so the return value may be
375     /// `None`.
376     pub fn stack_bounds(&self) -> (uint, uint) {
377         self.stack_bounds
378     }
379
380     /// Returns the stack guard for this task, if known.
381     pub fn stack_guard(&self) -> Option<uint> {
382         if self.stack_guard != 0 {
383             Some(self.stack_guard)
384         } else {
385             None
386         }
387     }
388
389     /// Consume this task, flagging it as a candidate for destruction.
390     ///
391     /// This function is required to be invoked to destroy a task. A task
392     /// destroyed through a normal drop will abort.
393     pub fn drop(mut self) {
394         self.state = Destroyed;
395     }
396 }
397
398 impl Drop for Task {
399     fn drop(&mut self) {
400         rtdebug!("called drop for a task: {}", self as *mut Task as uint);
401         rtassert!(self.state != Armed);
402     }
403 }
404
405 impl TaskOpts {
406     pub fn new() -> TaskOpts {
407         TaskOpts { on_exit: None, name: None, stack_size: None }
408     }
409 }
410
411 impl Iterator<BlockedTask> for BlockedTasks {
412     fn next(&mut self) -> Option<BlockedTask> {
413         Some(Shared(self.inner.clone()))
414     }
415 }
416
417 impl BlockedTask {
418     /// Returns Some if the task was successfully woken; None if already killed.
419     pub fn wake(self) -> Option<Box<Task>> {
420         match self {
421             Owned(task) => Some(task),
422             Shared(arc) => {
423                 match arc.swap(0, SeqCst) {
424                     0 => None,
425                     n => Some(unsafe { mem::transmute(n) }),
426                 }
427             }
428         }
429     }
430
431     /// Reawakens this task if ownership is acquired. If finer-grained control
432     /// is desired, use `wake` instead.
433     pub fn reawaken(self) {
434         self.wake().map(|t| t.reawaken());
435     }
436
437     // This assertion has two flavours because the wake involves an atomic op.
438     // In the faster version, destructors will panic dramatically instead.
439     #[cfg(not(test))] pub fn trash(self) { }
440     #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
441
442     /// Create a blocked task, unless the task was already killed.
443     pub fn block(task: Box<Task>) -> BlockedTask {
444         Owned(task)
445     }
446
447     /// Converts one blocked task handle to a list of many handles to the same.
448     pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
449         let arc = match self {
450             Owned(task) => {
451                 let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
452                 Arc::new(flag)
453             }
454             Shared(arc) => arc.clone(),
455         };
456         BlockedTasks{ inner: arc }.take(num_handles)
457     }
458
459     /// Convert to an unsafe uint value. Useful for storing in a pipe's state
460     /// flag.
461     #[inline]
462     pub unsafe fn cast_to_uint(self) -> uint {
463         match self {
464             Owned(task) => {
465                 let blocked_task_ptr: uint = mem::transmute(task);
466                 rtassert!(blocked_task_ptr & 0x1 == 0);
467                 blocked_task_ptr
468             }
469             Shared(arc) => {
470                 let blocked_task_ptr: uint = mem::transmute(box arc);
471                 rtassert!(blocked_task_ptr & 0x1 == 0);
472                 blocked_task_ptr | 0x1
473             }
474         }
475     }
476
477     /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
478     /// flag.
479     #[inline]
480     pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
481         if blocked_task_ptr & 0x1 == 0 {
482             Owned(mem::transmute(blocked_task_ptr))
483         } else {
484             let ptr: Box<Arc<AtomicUint>> =
485                 mem::transmute(blocked_task_ptr & !1);
486             Shared(*ptr)
487         }
488     }
489 }
490
491 impl Death {
492     pub fn new() -> Death {
493         Death { on_exit: None, marker: marker::NoCopy }
494     }
495 }
496
497 #[cfg(test)]
498 mod test {
499     use super::*;
500     use std::prelude::*;
501     use std::task;
502     use unwind;
503
504     #[test]
505     fn unwind() {
506         let result = task::try(proc()());
507         rtdebug!("trying first assert");
508         assert!(result.is_ok());
509         let result = task::try::<()>(proc() panic!());
510         rtdebug!("trying second assert");
511         assert!(result.is_err());
512     }
513
514     #[test]
515     fn rng() {
516         use std::rand::{StdRng, Rng};
517         let mut r = StdRng::new().ok().unwrap();
518         let _ = r.next_u32();
519     }
520
521     #[test]
522     fn comm_stream() {
523         let (tx, rx) = channel();
524         tx.send(10i);
525         assert!(rx.recv() == 10);
526     }
527
528     #[test]
529     fn comm_shared_chan() {
530         let (tx, rx) = channel();
531         tx.send(10i);
532         assert!(rx.recv() == 10);
533     }
534
535     #[test]
536     #[should_fail]
537     fn test_begin_unwind() {
538         use unwind::begin_unwind;
539         begin_unwind("cause", &(file!(), line!()))
540     }
541
542     #[test]
543     fn drop_new_task_ok() {
544         drop(Task::new(None, None));
545     }
546
547     // Task blocking tests
548
549     #[test]
550     fn block_and_wake() {
551         let task = box Task::new(None, None);
552         let task = BlockedTask::block(task).wake().unwrap();
553         task.drop();
554     }
555 }