//! Task death: asynchronous killing, linked failure, exit code propagation.
+use cast;
use cell::Cell;
use option::{Option, Some, None};
use prelude::*;
+use rt::task::Task;
+use unstable::atomics::{AtomicUint, SeqCst};
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
use util;
+static KILLED_MSG: &'static str = "killed by linked failure";
+
+// State values for the 'killed' and 'unkillable' atomic flags below.
+static KILL_RUNNING: uint = 0;
+static KILL_KILLED: uint = 1;
+static KILL_UNKILLABLE: uint = 2;
+
// FIXME(#7544)(bblum): think about the cache efficiency of this
struct KillHandleInner {
- // ((more fields to be added in a future commit))
+ // Is the task running, blocked, or killed? Possible values:
+ // * KILL_RUNNING - Not unkillable, no kill pending.
+ // * KILL_KILLED - Kill pending.
+ // * <ptr> - A transmuted blocked ~Task pointer.
+ // This flag is refcounted because it may also be referenced by a blocking
+ // concurrency primitive, used to wake the task normally, whose reference
+ // may outlive the handle's if the task is killed.
+ killed: UnsafeAtomicRcBox<AtomicUint>,
+ // Has the task deferred kill signals? This flag guards the above one.
+ // Possible values:
+ // * KILL_RUNNING - Not unkillable, no kill pending.
+ // * KILL_KILLED - Kill pending.
+ // * KILL_UNKILLABLE - Kill signals deferred.
+ unkillable: AtomicUint,
// Shared state between task and children for exit code propagation. These
// are here so we can re-use the kill handle to implement watched children
// Action to be done with the exit code. If set, also makes the task wait
// until all its watched children exit before collecting the status.
on_exit: Option<~fn(bool)>,
+ // nesting level counter for task::unkillable calls (0 == killable).
+ unkillable: int,
+ // nesting level counter for task::atomically calls (0 == can yield).
+ wont_sleep: int,
}
impl KillHandle {
pub fn new() -> KillHandle {
KillHandle(UnsafeAtomicRcBox::new(KillHandleInner {
// Linked failure fields
- // ((none yet))
+ killed: UnsafeAtomicRcBox::new(AtomicUint::new(KILL_RUNNING)),
+ unkillable: AtomicUint::new(KILL_RUNNING),
// Exit code propagation fields
any_child_failed: false,
child_tombstones: None,
}))
}
+ // Will begin unwinding if a kill signal was received, unless already_failing.
+ // This can't be used recursively, because a task which sees a KILLED
+ // signal must fail immediately, which an already-unkillable task can't do.
+ #[inline]
+ pub fn inhibit_kill(&mut self, already_failing: bool) {
+ let inner = unsafe { &mut *self.get() };
+ // Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
+ // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
+ match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
+ KILL_RUNNING => { }, // normal case
+ KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
+ _ => rtabort!("inhibit_kill: task already unkillable"),
+ }
+ }
+
+ // Will begin unwinding if a kill signal was received, unless already_failing.
+ #[inline]
+ pub fn allow_kill(&mut self, already_failing: bool) {
+ let inner = unsafe { &mut *self.get() };
+ // Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
+ // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
+ match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
+ KILL_UNKILLABLE => { }, // normal case
+ KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
+ _ => rtabort!("allow_kill: task already killable"),
+ }
+ }
+
+ // Send a kill signal to the handle's owning task. Returns the task itself
+ // if it was blocked and needs punted awake. To be called by other tasks.
+ pub fn kill(&mut self) -> Option<~Task> {
+ let inner = unsafe { &mut *self.get() };
+ if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
+ // Got in. Allowed to try to punt the task awake.
+ let flag = unsafe { &mut *inner.killed.get() };
+ match flag.swap(KILL_KILLED, SeqCst) {
+ // Task either not blocked or already taken care of.
+ KILL_RUNNING | KILL_KILLED => None,
+ // Got ownership of the blocked task.
+ task_ptr => Some(unsafe { cast::transmute(task_ptr) }),
+ }
+ } else {
+ // Otherwise it was either unkillable or already killed. Somebody
+ // else was here first who will deal with the kill signal.
+ None
+ }
+ }
+
pub fn notify_immediate_failure(&mut self) {
// A benign data race may happen here if there are failing sibling
// tasks that were also spawned-watched. The refcount's write barriers
}
// NB: Takes a pthread mutex -- 'blk' not allowed to reschedule.
+ #[inline]
fn add_lazy_tombstone(parent: &mut KillHandle,
blk: &fn(Option<~fn() -> bool>) -> ~fn() -> bool) {
kill_handle: Some(KillHandle::new()),
watching_parent: None,
on_exit: None,
+ unkillable: 0,
+ wont_sleep: 0,
}
}
kill_handle: Some(KillHandle::new()),
watching_parent: self.kill_handle.clone(),
on_exit: None,
+ unkillable: 0,
+ wont_sleep: 0,
}
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, mut success: bool) {
+ // This may run after the task has already failed, so even though the
+ // task appears to need to be killed, the scheduler should not fail us
+ // when we block to unwrap.
+ // (XXX: Another less-elegant reason for doing this is so that the use
+ // of the LittleLock in reparent_children_to doesn't need to access the
+ // unkillable flag in the kill_handle, since we'll have removed it.)
+ rtassert!(self.unkillable == 0);
+ self.unkillable = 1;
+
// Step 1. Decide if we need to collect child failures synchronously.
do self.on_exit.take_map |on_exit| {
if success {
parent_handle.notify_immediate_failure();
}
};
+
+ // Can't use allow_kill directly; that would require the kill handle.
+ rtassert!(self.unkillable == 1);
+ self.unkillable = 0;
+ }
+
+ /// Enter a possibly-nested unkillable section of code.
+ /// All calls must be paired with a subsequent call to allow_kill.
+ #[inline]
+ pub fn inhibit_kill(&mut self, already_failing: bool) {
+ if self.unkillable == 0 {
+ rtassert!(self.kill_handle.is_some());
+ self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
+ }
+ self.unkillable += 1;
+ }
+
+ /// Exit a possibly-nested unkillable section of code.
+ /// All calls must be paired with a preceding call to inhibit_kill.
+ #[inline]
+ pub fn allow_kill(&mut self, already_failing: bool) {
+ rtassert!(self.unkillable != 0);
+ self.unkillable -= 1;
+ if self.unkillable == 0 {
+ rtassert!(self.kill_handle.is_some());
+ self.kill_handle.get_mut_ref().allow_kill(already_failing);
+ }
+ }
+
+ /// Enter a possibly-nested "atomic" section of code. Just for assertions.
+ /// All calls must be paired with a subsequent call to allow_yield.
+ #[inline]
+ pub fn inhibit_yield(&mut self) {
+ self.wont_sleep += 1;
+ }
+
+ /// Exit a possibly-nested "atomic" section of code. Just for assertions.
+ /// All calls must be paired with a preceding call to inhibit_yield.
+ #[inline]
+ pub fn allow_yield(&mut self) {
+ rtassert!(self.wont_sleep != 0);
+ self.wont_sleep -= 1;
+ }
+
+ /// Ensure that the task is allowed to become descheduled.
+ #[inline]
+ pub fn assert_may_sleep(&self) {
+ if self.wont_sleep != 0 {
+ rtabort!("illegal atomic-sleep: can't deschedule inside atomically()");
+ }
+ }
+}
+
+impl Drop for Death {
+ fn drop(&self) {
+ // Mustn't be in an atomic or unkillable section at task death.
+ rtassert!(self.unkillable == 0);
+ rtassert!(self.wont_sleep == 0);
}
}
use comm::{stream, Chan, GenericChan, GenericPort, Port};
use result::Result;
use result;
-use rt::{context, OldTaskContext};
+use rt::{context, OldTaskContext, TaskContext};
+use rt::local::Local;
use task::rt::{task_id, sched_id};
use unstable::finally::Finally;
use util::replace;
pub fn failing() -> bool {
//! True if the running task has failed
- use rt::{context, OldTaskContext};
- use rt::local::Local;
use rt::task::Task;
match context() {
* ~~~
*/
pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
- if context() == OldTaskContext {
- let t = rt::rust_get_task();
- do (|| {
- rt::rust_task_inhibit_kill(t);
- f()
- }).finally {
- rt::rust_task_allow_kill(t);
+ use rt::task::Task;
+
+ match context() {
+ OldTaskContext => {
+ let t = rt::rust_get_task();
+ do (|| {
+ rt::rust_task_inhibit_kill(t);
+ f()
+ }).finally {
+ rt::rust_task_allow_kill(t);
+ }
+ }
+ TaskContext => {
+ // The inhibits/allows might fail and need to borrow the task.
+ let t = Local::unsafe_borrow::<Task>();
+ do (|| {
+ (*t).death.inhibit_kill((*t).unwinder.unwinding);
+ f()
+ }).finally {
+ (*t).death.allow_kill((*t).unwinder.unwinding);
+ }
}
- } else {
- // FIXME #6377
- f()
+ // FIXME(#3095): This should be an rtabort as soon as the scheduler
+ // no longer uses a workqueue implemented with an Exclusive.
+ _ => f()
}
}
/// The inverse of unkillable. Only ever to be used nested in unkillable().
pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
- if context() == OldTaskContext {
- let t = rt::rust_get_task();
- do (|| {
- rt::rust_task_allow_kill(t);
- f()
- }).finally {
- rt::rust_task_inhibit_kill(t);
+ use rt::task::Task;
+
+ match context() {
+ OldTaskContext => {
+ let t = rt::rust_get_task();
+ do (|| {
+ rt::rust_task_allow_kill(t);
+ f()
+ }).finally {
+ rt::rust_task_inhibit_kill(t);
+ }
}
- } else {
- // FIXME #6377
- f()
+ TaskContext => {
+ let t = Local::unsafe_borrow::<Task>();
+ do (|| {
+ (*t).death.allow_kill((*t).unwinder.unwinding);
+ f()
+ }).finally {
+ (*t).death.inhibit_kill((*t).unwinder.unwinding);
+ }
+ }
+ // FIXME(#3095): As in unkillable().
+ _ => f()
}
}
* For use with exclusive ARCs, which use pthread mutexes directly.
*/
pub unsafe fn atomically<U>(f: &fn() -> U) -> U {
- if context() == OldTaskContext {
- let t = rt::rust_get_task();
- do (|| {
- rt::rust_task_inhibit_kill(t);
- rt::rust_task_inhibit_yield(t);
- f()
- }).finally {
- rt::rust_task_allow_yield(t);
- rt::rust_task_allow_kill(t);
+ use rt::task::Task;
+
+ match context() {
+ OldTaskContext => {
+ let t = rt::rust_get_task();
+ do (|| {
+ rt::rust_task_inhibit_kill(t);
+ rt::rust_task_inhibit_yield(t);
+ f()
+ }).finally {
+ rt::rust_task_allow_yield(t);
+ rt::rust_task_allow_kill(t);
+ }
+ }
+ TaskContext => {
+ let t = Local::unsafe_borrow::<Task>();
+ do (|| {
+ // It's important to inhibit kill after inhibiting yield, because
+ // inhibit-kill might fail if we were already killed, and the
+ // inhibit-yield must happen to match the finally's allow-yield.
+ (*t).death.inhibit_yield();
+ (*t).death.inhibit_kill((*t).unwinder.unwinding);
+ f()
+ }).finally {
+ (*t).death.allow_kill((*t).unwinder.unwinding);
+ (*t).death.allow_yield();
+ }
}
- } else {
- // FIXME #6377
- f()
+ // FIXME(#3095): As in unkillable().
+ _ => f()
}
}