1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Language-level runtime services that should reasonably expected
12 //! to be available 'everywhere'. Unwinding, local storage, and logging.
13 //! Even a 'freestanding' Rust would likely want to implement this.
15 pub use self::BlockedTask::*;
16 use self::TaskState::*;
19 use alloc::boxed::Box;
21 use core::atomic::{AtomicUint, SeqCst};
23 use core::kinds::marker;
25 use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some};
26 use core::prelude::{drop};
29 use mutex::NativeMutex;
31 use thread::{mod, Thread};
35 use collections::str::SendStr;
37 /// State associated with Rust tasks.
39 /// This structure is currently undergoing major changes, and is
40 /// likely to be move/be merged with a `Thread` structure.
42 pub unwinder: Unwinder,
44 pub name: Option<SendStr>,
47 lock: NativeMutex, // native synchronization
48 awoken: bool, // used to prevent spurious wakeups
50 // This field holds the known bounds of the stack in (lo, hi) form. Not all
51 // native tasks necessarily know their precise bounds, hence this is
53 stack_bounds: (uint, uint),
58 // Once a task has entered the `Armed` state it must be destroyed via `drop`,
59 // and no other method. This state is used to track this transition.
60 #[deriving(PartialEq)]
68 /// Invoke this procedure with the result of the task when it finishes.
69 pub on_exit: Option<proc(Result): Send>,
70 /// A name for the task-to-be, for identification in panic messages
71 pub name: Option<SendStr>,
72 /// The size of the stack for the spawned task
73 pub stack_size: Option<uint>,
76 /// Indicates the manner in which a task exited.
78 /// A task that completes without panicking is considered to exit successfully.
80 /// If you wish for this result's delivery to block until all
81 /// children tasks complete, recommend using a result future.
82 pub type Result = ::core::result::Result<(), Box<Any + Send>>;
84 /// A handle to a blocked task. Usually this means having the Box<Task>
85 /// pointer by ownership, but if the task is killable, a killer can steal it
87 pub enum BlockedTask {
89 Shared(Arc<AtomicUint>),
92 /// Per-task state related to task death, killing, panic, etc.
94 pub on_exit: Option<proc(Result):Send>,
95 marker: marker::NoCopy,
98 pub struct BlockedTasks {
99 inner: Arc<AtomicUint>,
103 /// Creates a new uninitialized task.
104 pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Task {
106 unwinder: Unwinder::new(),
110 lock: unsafe { NativeMutex::new() },
112 // these *should* get overwritten
113 stack_bounds: stack_bounds.unwrap_or((0, 0)),
114 stack_guard: stack_guard.unwrap_or(0)
118 pub fn spawn(opts: TaskOpts, f: proc():Send) {
119 let TaskOpts { name, stack_size, on_exit } = opts;
121 let mut task = box Task::new(None, None);
123 task.death.on_exit = on_exit;
125 // FIXME: change this back after moving rustrt into std
126 // let stack = stack_size.unwrap_or(rt::min_stack());
127 let stack = stack_size.unwrap_or(2 * 1024 * 1024);
129 // Note that this increment must happen *before* the spawn in order to
130 // guarantee that if this task exits it will always end up waiting for
131 // the spawned task to exit.
132 let token = bookkeeping::increment();
134 // Spawning a new OS thread guarantees that __morestack will never get
135 // triggered, but we must manually set up the actual stack bounds once
136 // this function starts executing. This raises the lower limit by a bit
137 // because by the time that this function is executing we've already
138 // consumed at least a little bit of stack (we don't know the exact byte
139 // address at which our stack started).
140 Thread::spawn_stack(stack, proc() {
141 let something_around_the_top_of_the_stack = 1;
142 let addr = &something_around_the_top_of_the_stack as *const int;
143 let my_stack = addr as uint;
145 stack::record_os_managed_stack_bounds(my_stack - stack + 1024,
148 task.stack_guard = thread::current_guard_page();
149 task.stack_bounds = (my_stack - stack + 1024, my_stack);
152 drop(task.run(|| { f.take().unwrap()() }).destroy());
157 /// Consumes ownership of a task, runs some code, and returns the task back.
159 /// This function can be used as an emulated "try/catch" to interoperate
160 /// with the rust runtime at the outermost boundary. It is not possible to
161 /// use this function in a nested fashion (a try/catch inside of another
162 /// try/catch). Invoking this function is quite cheap.
164 /// If the closure `f` succeeds, then the returned task can be used again
165 /// for another invocation of `run`. If the closure `f` panics then `self`
166 /// will be internally destroyed along with all of the other associated
167 /// resources of this task. The `on_exit` callback is invoked with the
168 /// cause of panic (not returned here). This can be discovered by querying
169 /// `is_destroyed()`.
171 /// Note that it is possible to view partial execution of the closure `f`
172 /// because it is not guaranteed to run to completion, but this function is
173 /// guaranteed to return if it panicks. Care should be taken to ensure that
174 /// stack references made by `f` are handled appropriately.
176 /// It is invalid to call this function with a task that has been previously
177 /// destroyed via a failed call to `run`.
178 pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
179 assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
181 // First, make sure that no one else is in TLS. This does not allow
182 // recursive invocations of run(). If there's no one else, then
183 // relinquish ownership of ourselves back into TLS.
184 if Local::exists(None::<Task>) {
185 panic!("cannot run a task recursively inside another");
190 // There are two primary reasons that general try/catch is unsafe. The
191 // first is that we do not support nested try/catch. The above check for
192 // an existing task in TLS is sufficient for this invariant to be
193 // upheld. The second is that unwinding while unwinding is not defined.
194 // We take care of that by having an 'unwinding' flag in the task
195 // itself. For these reasons, this unsafety should be ok.
196 let result = unsafe { unwind::try(f) };
198 // After running the closure given return the task back out if it ran
199 // successfully, or clean up the task if it panicked.
200 let task: Box<Task> = Local::take();
203 Err(cause) => { task.cleanup(Err(cause)) }
207 /// Destroy all associated resources of this task.
209 /// This function will perform any necessary clean up to prepare the task
210 /// for destruction. It is required that this is called before a `Task`
211 /// falls out of scope.
213 /// The returned task cannot be used for running any more code, but it may
214 /// be used to extract the runtime as necessary.
215 pub fn destroy(self: Box<Task>) -> Box<Task> {
216 if self.is_destroyed() {
223 /// Cleans up a task, processing the result of the task as appropriate.
225 /// This function consumes ownership of the task, deallocating it once it's
226 /// done being processed. It is assumed that TLD and the local heap have
227 /// already been destroyed and/or annihilated.
228 fn cleanup(mut self: Box<Task>, result: Result) -> Box<Task> {
229 // After taking care of the data above, we need to transmit the result
231 let what_to_do = self.death.on_exit.take();
234 // FIXME: this is running in a seriously constrained context. If this
235 // allocates TLD then it will likely abort the runtime. Similarly,
236 // if this panics, this will also likely abort the runtime.
238 // This closure is currently limited to a channel send via the
239 // standard library's task interface, but this needs
240 // reconsideration to whether it's a reasonable thing to let a
241 // task to do or not.
243 Some(f) => { f(result) }
244 None => { drop(result) }
247 // Now that we're done, we remove the task from TLS and flag it for
249 let mut task: Box<Task> = Local::take();
250 task.state = Destroyed;
254 /// Queries whether this can be destroyed or not.
255 pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
257 /// Deschedules the current task, invoking `f` `amt` times. It is not
258 /// recommended to use this function directly, but rather communication
259 /// primitives in `std::comm` should be used.
261 // This function gets a little interesting. There are a few safety and
262 // ownership violations going on here, but this is all done in the name of
263 // shared state. Additionally, all of the violations are protected with a
264 // mutex, so in theory there are no races.
266 // The first thing we need to do is to get a pointer to the task's internal
267 // mutex. This address will not be changing (because the task is allocated
268 // on the heap). We must have this handle separately because the task will
269 // have its ownership transferred to the given closure. We're guaranteed,
270 // however, that this memory will remain valid because *this* is the current
271 // task's execution thread.
273 // The next weird part is where ownership of the task actually goes. We
274 // relinquish it to the `f` blocking function, but upon returning this
275 // function needs to replace the task back in TLS. There is no communication
276 // from the wakeup thread back to this thread about the task pointer, and
277 // there's really no need to. In order to get around this, we cast the task
278 // to a `uint` which is then used at the end of this function to cast back
279 // to a `Box<Task>` object. Naturally, this looks like it violates
280 // ownership semantics in that there may be two `Box<Task>` objects.
282 // The fun part is that the wakeup half of this implementation knows to
283 // "forget" the task on the other end. This means that the awakening half of
284 // things silently relinquishes ownership back to this thread, but not in a
285 // way that the compiler can understand. The task's memory is always valid
286 // for both tasks because these operations are all done inside of a mutex.
288 // You'll also find that if blocking fails (the `f` function hands the
289 // BlockedTask back to us), we will `mem::forget` the handles. The
290 // reasoning for this is the same logic as above in that the task silently
291 // transfers ownership via the `uint`, not through normal compiler
294 // On a mildly unrelated note, it should also be pointed out that OS
295 // condition variables are susceptible to spurious wakeups, which we need to
296 // be ready for. In order to accommodate for this fact, we have an extra
297 // `awoken` field which indicates whether we were actually woken up via some
298 // invocation of `reawaken`. This flag is only ever accessed inside the
299 // lock, so there's no need to make it atomic.
300 pub fn deschedule(mut self: Box<Task>,
302 f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
304 let me = &mut *self as *mut Task;
305 let task = BlockedTask::block(self);
308 let guard = (*me).lock.lock();
309 (*me).awoken = false;
312 while !(*me).awoken {
316 Err(task) => { mem::forget(task.wake()); }
319 let iter = task.make_selectable(times);
320 let guard = (*me).lock.lock();
321 (*me).awoken = false;
323 // Apply the given closure to all of the "selectable tasks",
324 // bailing on the first one that produces an error. Note that
325 // care must be taken such that when an error is occurred, we
326 // may not own the task, so we may still have to wait for the
327 // task to become available. In other words, if task.wake()
328 // returns `None`, then someone else has ownership and we must
329 // wait for their signal.
330 match iter.map(f).filter_map(|a| a.err()).next() {
342 while !(*me).awoken {
346 // put the task back in TLS, and everything is as it once was.
347 Local::put(mem::transmute(me));
351 /// Wakes up a previously blocked task. This function can only be
352 /// called on tasks that were previously blocked in `deschedule`.
354 // See the comments on `deschedule` for why the task is forgotten here, and
355 // why it's valid to do so.
356 pub fn reawaken(mut self: Box<Task>) {
358 let me = &mut *self as *mut Task;
360 let guard = (*me).lock.lock();
366 /// Yields control of this task to another task. This function will
367 /// eventually return, but possibly not immediately. This is used as an
368 /// opportunity to allow other tasks a chance to run.
373 /// Returns the stack bounds for this task in (lo, hi) format. The stack
374 /// bounds may not be known for all tasks, so the return value may be
376 pub fn stack_bounds(&self) -> (uint, uint) {
380 /// Returns the stack guard for this task, if known.
381 pub fn stack_guard(&self) -> Option<uint> {
382 if self.stack_guard != 0 {
383 Some(self.stack_guard)
389 /// Consume this task, flagging it as a candidate for destruction.
391 /// This function is required to be invoked to destroy a task. A task
392 /// destroyed through a normal drop will abort.
393 pub fn drop(mut self) {
394 self.state = Destroyed;
400 rtdebug!("called drop for a task: {}", self as *mut Task as uint);
401 rtassert!(self.state != Armed);
406 pub fn new() -> TaskOpts {
407 TaskOpts { on_exit: None, name: None, stack_size: None }
411 impl Iterator<BlockedTask> for BlockedTasks {
412 fn next(&mut self) -> Option<BlockedTask> {
413 Some(Shared(self.inner.clone()))
418 /// Returns Some if the task was successfully woken; None if already killed.
419 pub fn wake(self) -> Option<Box<Task>> {
421 Owned(task) => Some(task),
423 match arc.swap(0, SeqCst) {
425 n => Some(unsafe { mem::transmute(n) }),
431 /// Reawakens this task if ownership is acquired. If finer-grained control
432 /// is desired, use `wake` instead.
433 pub fn reawaken(self) {
434 self.wake().map(|t| t.reawaken());
437 // This assertion has two flavours because the wake involves an atomic op.
438 // In the faster version, destructors will panic dramatically instead.
439 #[cfg(not(test))] pub fn trash(self) { }
440 #[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); }
442 /// Create a blocked task, unless the task was already killed.
443 pub fn block(task: Box<Task>) -> BlockedTask {
447 /// Converts one blocked task handle to a list of many handles to the same.
448 pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
449 let arc = match self {
451 let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
454 Shared(arc) => arc.clone(),
456 BlockedTasks{ inner: arc }.take(num_handles)
459 /// Convert to an unsafe uint value. Useful for storing in a pipe's state
462 pub unsafe fn cast_to_uint(self) -> uint {
465 let blocked_task_ptr: uint = mem::transmute(task);
466 rtassert!(blocked_task_ptr & 0x1 == 0);
470 let blocked_task_ptr: uint = mem::transmute(box arc);
471 rtassert!(blocked_task_ptr & 0x1 == 0);
472 blocked_task_ptr | 0x1
477 /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
480 pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
481 if blocked_task_ptr & 0x1 == 0 {
482 Owned(mem::transmute(blocked_task_ptr))
484 let ptr: Box<Arc<AtomicUint>> =
485 mem::transmute(blocked_task_ptr & !1);
492 pub fn new() -> Death {
493 Death { on_exit: None, marker: marker::NoCopy }
506 let result = task::try(proc()());
507 rtdebug!("trying first assert");
508 assert!(result.is_ok());
509 let result = task::try::<()>(proc() panic!());
510 rtdebug!("trying second assert");
511 assert!(result.is_err());
516 use std::rand::{StdRng, Rng};
517 let mut r = StdRng::new().ok().unwrap();
518 let _ = r.next_u32();
523 let (tx, rx) = channel();
525 assert!(rx.recv() == 10);
529 fn comm_shared_chan() {
530 let (tx, rx) = channel();
532 assert!(rx.recv() == 10);
537 fn test_begin_unwind() {
538 use unwind::begin_unwind;
539 begin_unwind("cause", &(file!(), line!()))
543 fn drop_new_task_ok() {
544 drop(Task::new(None, None));
547 // Task blocking tests
550 fn block_and_wake() {
551 let task = box Task::new(None, None);
552 let task = BlockedTask::block(task).wake().unwrap();