use std::rt::thread::Thread;
use std::rt;
-use std::task::{TaskBuilder, Spawner};
-
-/// Creates a new Task which is ready to execute as a 1:1 task.
-pub fn new(stack_bounds: (uint, uint), stack_guard: uint) -> Box<Task> {
- let mut task = box Task::new();
- let mut ops = ops();
- ops.stack_bounds = stack_bounds;
- ops.stack_guard = stack_guard;
- task.put_runtime(ops);
- return task;
-}
-
-fn ops() -> Box<Ops> {
- box Ops {
- lock: unsafe { NativeMutex::new() },
- awoken: false,
- // these *should* get overwritten
- stack_bounds: (0, 0),
- stack_guard: 0
- }
-}
-
-/// A spawner for native tasks
-pub struct NativeSpawner;
-
-impl Spawner for NativeSpawner {
- fn spawn(self, opts: TaskOpts, f: proc():Send) {
- let TaskOpts { name, stack_size, on_exit } = opts;
-
- let mut task = box Task::new();
- task.name = name;
- task.death.on_exit = on_exit;
-
- let stack = stack_size.unwrap_or(rt::min_stack());
- let task = task;
- let ops = ops();
-
- // Note that this increment must happen *before* the spawn in order to
- // guarantee that if this task exits it will always end up waiting for
- // the spawned task to exit.
- let token = bookkeeping::increment();
-
- // Spawning a new OS thread guarantees that __morestack will never get
- // triggered, but we must manually set up the actual stack bounds once
- // this function starts executing. This raises the lower limit by a bit
- // because by the time that this function is executing we've already
- // consumed at least a little bit of stack (we don't know the exact byte
- // address at which our stack started).
- Thread::spawn_stack(stack, proc() {
- let something_around_the_top_of_the_stack = 1;
- let addr = &something_around_the_top_of_the_stack as *const int;
- let my_stack = addr as uint;
- unsafe {
- stack::record_os_managed_stack_bounds(my_stack - stack + 1024,
- my_stack);
- }
- let mut ops = ops;
- ops.stack_guard = rt::thread::current_guard_page();
- ops.stack_bounds = (my_stack - stack + 1024, my_stack);
-
- let mut f = Some(f);
- let mut task = task;
- task.put_runtime(ops);
- drop(task.run(|| { f.take().unwrap()() }).destroy());
- drop(token);
- })
- }
-}
-
-/// An extension trait adding a `native` configuration method to `TaskBuilder`.
-pub trait NativeTaskBuilder {
- fn native(self) -> TaskBuilder<NativeSpawner>;
-}
-
-impl<S: Spawner> NativeTaskBuilder for TaskBuilder<S> {
- fn native(self) -> TaskBuilder<NativeSpawner> {
- self.spawner(NativeSpawner)
- }
-}
-
-// This structure is the glue between channels and the 1:1 scheduling mode. This
-// structure is allocated once per task.
-struct Ops {
- lock: NativeMutex, // native synchronization
- awoken: bool, // used to prevent spurious wakeups
-
- // This field holds the known bounds of the stack in (lo, hi) form. Not all
- // native tasks necessarily know their precise bounds, hence this is
- // optional.
- stack_bounds: (uint, uint),
-
- stack_guard: uint
-}
-
-impl rt::Runtime for Ops {
- fn yield_now(self: Box<Ops>, mut cur_task: Box<Task>) {
- // put the task back in TLS and then invoke the OS thread yield
- cur_task.put_runtime(self);
- Local::put(cur_task);
- Thread::yield_now();
- }
-
- fn maybe_yield(self: Box<Ops>, mut cur_task: Box<Task>) {
- // just put the task back in TLS, on OS threads we never need to
- // opportunistically yield b/c the OS will do that for us (preemption)
- cur_task.put_runtime(self);
- Local::put(cur_task);
- }
-
- fn wrap(self: Box<Ops>) -> Box<Any+'static> {
- self as Box<Any+'static>
- }
-
- fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
-
- fn stack_guard(&self) -> Option<uint> {
- if self.stack_guard != 0 {
- Some(self.stack_guard)
- } else {
- None
- }
- }
-
- fn can_block(&self) -> bool { true }
-
- // This function gets a little interesting. There are a few safety and
- // ownership violations going on here, but this is all done in the name of
- // shared state. Additionally, all of the violations are protected with a
- // mutex, so in theory there are no races.
- //
- // The first thing we need to do is to get a pointer to the task's internal
- // mutex. This address will not be changing (because the task is allocated
- // on the heap). We must have this handle separately because the task will
- // have its ownership transferred to the given closure. We're guaranteed,
- // however, that this memory will remain valid because *this* is the current
- // task's execution thread.
- //
- // The next weird part is where ownership of the task actually goes. We
- // relinquish it to the `f` blocking function, but upon returning this
- // function needs to replace the task back in TLS. There is no communication
- // from the wakeup thread back to this thread about the task pointer, and
- // there's really no need to. In order to get around this, we cast the task
- // to a `uint` which is then used at the end of this function to cast back
- // to a `Box<Task>` object. Naturally, this looks like it violates
- // ownership semantics in that there may be two `Box<Task>` objects.
- //
- // The fun part is that the wakeup half of this implementation knows to
- // "forget" the task on the other end. This means that the awakening half of
- // things silently relinquishes ownership back to this thread, but not in a
- // way that the compiler can understand. The task's memory is always valid
- // for both tasks because these operations are all done inside of a mutex.
- //
- // You'll also find that if blocking fails (the `f` function hands the
- // BlockedTask back to us), we will `mem::forget` the handles. The
- // reasoning for this is the same logic as above in that the task silently
- // transfers ownership via the `uint`, not through normal compiler
- // semantics.
- //
- // On a mildly unrelated note, it should also be pointed out that OS
- // condition variables are susceptible to spurious wakeups, which we need to
- // be ready for. In order to accommodate for this fact, we have an extra
- // `awoken` field which indicates whether we were actually woken up via some
- // invocation of `reawaken`. This flag is only ever accessed inside the
- // lock, so there's no need to make it atomic.
- fn deschedule(mut self: Box<Ops>,
- times: uint,
- mut cur_task: Box<Task>,
- f: |BlockedTask| -> Result<(), BlockedTask>) {
- let me = &mut *self as *mut Ops;
- cur_task.put_runtime(self);
-
- unsafe {
- let cur_task_dupe = &mut *cur_task as *mut Task;
- let task = BlockedTask::block(cur_task);
-
- if times == 1 {
- let guard = (*me).lock.lock();
- (*me).awoken = false;
- match f(task) {
- Ok(()) => {
- while !(*me).awoken {
- guard.wait();
- }
- }
- Err(task) => { mem::forget(task.wake()); }
- }
- } else {
- let iter = task.make_selectable(times);
- let guard = (*me).lock.lock();
- (*me).awoken = false;
-
- // Apply the given closure to all of the "selectable tasks",
- // bailing on the first one that produces an error. Note that
- // care must be taken such that when an error is occurred, we
- // may not own the task, so we may still have to wait for the
- // task to become available. In other words, if task.wake()
- // returns `None`, then someone else has ownership and we must
- // wait for their signal.
- match iter.map(f).filter_map(|a| a.err()).next() {
- None => {}
- Some(task) => {
- match task.wake() {
- Some(task) => {
- mem::forget(task);
- (*me).awoken = true;
- }
- None => {}
- }
- }
- }
- while !(*me).awoken {
- guard.wait();
- }
- }
- // re-acquire ownership of the task
- cur_task = mem::transmute(cur_task_dupe);
- }
-
- // put the task back in TLS, and everything is as it once was.
- Local::put(cur_task);
- }
-
- // See the comments on `deschedule` for why the task is forgotten here, and
- // why it's valid to do so.
- fn reawaken(mut self: Box<Ops>, mut to_wake: Box<Task>) {
- unsafe {
- let me = &mut *self as *mut Ops;
- to_wake.put_runtime(self);
- mem::forget(to_wake);
- let guard = (*me).lock.lock();
- (*me).awoken = true;
- guard.signal();
- }
- }
-
- fn spawn_sibling(self: Box<Ops>,
- mut cur_task: Box<Task>,
- opts: TaskOpts,
- f: proc():Send) {
- cur_task.put_runtime(self);
- Local::put(cur_task);
-
- NativeSpawner.spawn(opts, f);
- }
-}
-
#[cfg(test)]
mod tests {
use std::rt::local::Local;
use self::TaskState::*;
use alloc::arc::Arc;
-use alloc::boxed::{BoxAny, Box};
+use alloc::boxed::Box;
use core::any::Any;
use core::atomic::{AtomicUint, SeqCst};
use core::iter::Take;
use core::mem;
use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some};
use core::prelude::{drop};
-use core::raw;
+use bookkeeping;
+use mutex::NativeMutex;
use local_data;
-use Runtime;
use local::Local;
+use thread::{mod, Thread};
+use stack;
use unwind;
use unwind::Unwinder;
use collections::str::SendStr;
/// State associated with Rust tasks.
///
-/// Rust tasks are primarily built with two separate components. One is this
-/// structure which handles standard services such as TLD, unwinding support,
-/// naming of a task, etc. The second component is the runtime of this task, a
-/// `Runtime` trait object.
-///
-/// The `Runtime` object instructs this task how it can perform critical
-/// operations such as blocking, rescheduling, I/O constructors, etc. The two
-/// halves are separately owned, but one is often found contained in the other.
-/// A task's runtime can be reflected upon with the `maybe_take_runtime` method,
-/// and otherwise its ownership is managed with `take_runtime` and
-/// `put_runtime`.
-///
-/// In general, this structure should not be used. This is meant to be an
-/// unstable internal detail of the runtime itself. From time-to-time, however,
-/// it is useful to manage tasks directly. An example of this would be
-/// interoperating with the Rust runtime from FFI callbacks or such. For this
-/// reason, there are two methods of note with the `Task` structure.
-///
-/// * `run` - This function will execute a closure inside the context of a task.
-/// Failure is caught and handled via the task's on_exit callback. If
-/// this panics, the task is still returned, but it can no longer be
-/// used, it is poisoned.
-///
-/// * `destroy` - This is a required function to call to destroy a task. If a
-/// task falls out of scope without calling `destroy`, its
-/// destructor bomb will go off, aborting the process.
-///
-/// With these two methods, tasks can be re-used to execute code inside of its
-/// context while having a point in the future where destruction is allowed.
-/// More information can be found on these specific methods.
-///
-/// # Example
-///
-/// ```no_run
-/// extern crate native;
-/// use std::uint;
-/// # fn main() {
-///
-/// // Create a task using a native runtime
-/// let task = native::task::new((0, uint::MAX), 0);
-///
-/// // Run some code, catching any possible panic
-/// let task = task.run(|| {
-/// // Run some code inside this task
-/// println!("Hello with a native runtime!");
-/// });
-///
-/// // Run some code again, catching the panic
-/// let task = task.run(|| {
-/// panic!("oh no, what to do!");
-/// });
-///
-/// // Now that the task has panicked, it can never be used again
-/// assert!(task.is_destroyed());
-///
-/// // Deallocate the resources associated with this task
-/// task.destroy();
-/// # }
-/// ```
+/// This structure is currently undergoing major changes, and is
+/// likely to be move/be merged with a `Thread` structure.
pub struct Task {
pub storage: LocalStorage,
pub unwinder: Unwinder,
pub name: Option<SendStr>,
state: TaskState,
- imp: Option<Box<Runtime + Send + 'static>>,
+ lock: NativeMutex, // native synchronization
+ awoken: bool, // used to prevent spurious wakeups
+
+ // This field holds the known bounds of the stack in (lo, hi) form. Not all
+ // native tasks necessarily know their precise bounds, hence this is
+ // optional.
+ stack_bounds: (uint, uint),
+
+ stack_guard: uint
}
// Once a task has entered the `Armed` state it must be destroyed via `drop`,
impl Task {
/// Creates a new uninitialized task.
- ///
- /// This method cannot be used to immediately invoke `run` because the task
- /// itself will likely require a runtime to be inserted via `put_runtime`.
- ///
- /// Note that you likely don't want to call this function, but rather the
- /// task creation functions through libnative or libgreen.
- pub fn new() -> Task {
+ pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Task {
Task {
storage: LocalStorage(None),
unwinder: Unwinder::new(),
death: Death::new(),
state: New,
name: None,
- imp: None,
+ lock: unsafe { NativeMutex::new() },
+ awoken: false,
+ // these *should* get overwritten
+ stack_bounds: stack_bounds.unwrap_or((0, 0)),
+ stack_guard: stack_guard.unwrap_or(0)
}
}
+ pub fn spawn(opts: TaskOpts, f: proc():Send) {
+ let TaskOpts { name, stack_size, on_exit } = opts;
+
+ let mut task = box Task::new(None, None);
+ task.name = name;
+ task.death.on_exit = on_exit;
+
+ // FIXME: change this back after moving rustrt into std
+ // let stack = stack_size.unwrap_or(rt::min_stack());
+ let stack = stack_size.unwrap_or(2 * 1024 * 1024);
+
+ // Note that this increment must happen *before* the spawn in order to
+ // guarantee that if this task exits it will always end up waiting for
+ // the spawned task to exit.
+ let token = bookkeeping::increment();
+
+ // Spawning a new OS thread guarantees that __morestack will never get
+ // triggered, but we must manually set up the actual stack bounds once
+ // this function starts executing. This raises the lower limit by a bit
+ // because by the time that this function is executing we've already
+ // consumed at least a little bit of stack (we don't know the exact byte
+ // address at which our stack started).
+ Thread::spawn_stack(stack, proc() {
+ let something_around_the_top_of_the_stack = 1;
+ let addr = &something_around_the_top_of_the_stack as *const int;
+ let my_stack = addr as uint;
+ unsafe {
+ stack::record_os_managed_stack_bounds(my_stack - stack + 1024,
+ my_stack);
+ }
+ task.stack_guard = thread::current_guard_page();
+ task.stack_bounds = (my_stack - stack + 1024, my_stack);
+
+ let mut f = Some(f);
+ drop(task.run(|| { f.take().unwrap()() }).destroy());
+ drop(token);
+ })
+ }
+
/// Consumes ownership of a task, runs some code, and returns the task back.
///
/// This function can be used as an emulated "try/catch" to interoperate
///
/// It is invalid to call this function with a task that has been previously
/// destroyed via a failed call to `run`.
- ///
- /// # Example
- ///
- /// ```no_run
- /// extern crate native;
- /// use std::uint;
- /// # fn main() {
- ///
- /// // Create a new native task
- /// let task = native::task::new((0, uint::MAX), 0);
- ///
- /// // Run some code once and then destroy this task
- /// task.run(|| {
- /// println!("Hello with a native runtime!");
- /// }).destroy();
- /// # }
- /// ```
pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
/// Queries whether this can be destroyed or not.
pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
- /// Inserts a runtime object into this task, transferring ownership to the
- /// task. It is illegal to replace a previous runtime object in this task
- /// with this argument.
- pub fn put_runtime(&mut self, ops: Box<Runtime + Send + 'static>) {
- assert!(self.imp.is_none());
- self.imp = Some(ops);
- }
-
- /// Removes the runtime from this task, transferring ownership to the
- /// caller.
- pub fn take_runtime(&mut self) -> Box<Runtime + Send + 'static> {
- assert!(self.imp.is_some());
- self.imp.take().unwrap()
- }
-
- /// Attempts to extract the runtime as a specific type. If the runtime does
- /// not have the provided type, then the runtime is not removed. If the
- /// runtime does have the specified type, then it is removed and returned
- /// (transfer of ownership).
- ///
- /// It is recommended to only use this method when *absolutely necessary*.
- /// This function may not be available in the future.
- pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
- // This is a terrible, terrible function. The general idea here is to
- // take the runtime, cast it to Box<Any>, check if it has the right
- // type, and then re-cast it back if necessary. The method of doing
- // this is pretty sketchy and involves shuffling vtables of trait
- // objects around, but it gets the job done.
- //
- // FIXME: This function is a serious code smell and should be avoided at
- // all costs. I have yet to think of a method to avoid this
- // function, and I would be saddened if more usage of the function
- // crops up.
- unsafe {
- let imp = self.imp.take().unwrap();
- let vtable = mem::transmute::<_, &raw::TraitObject>(&imp).vtable;
- match imp.wrap().downcast::<T>() {
- Ok(t) => Some(t),
- Err(t) => {
- let data = mem::transmute::<_, raw::TraitObject>(t).data;
- let obj: Box<Runtime + Send + 'static> =
- mem::transmute(raw::TraitObject {
- vtable: vtable,
- data: data,
- });
- self.put_runtime(obj);
- None
- }
- }
- }
- }
-
- /// Spawns a sibling to this task. The newly spawned task is configured with
- /// the `opts` structure and will run `f` as the body of its code.
- pub fn spawn_sibling(mut self: Box<Task>,
- opts: TaskOpts,
- f: proc(): Send) {
- let ops = self.imp.take().unwrap();
- ops.spawn_sibling(self, opts, f)
- }
-
/// Deschedules the current task, invoking `f` `amt` times. It is not
/// recommended to use this function directly, but rather communication
/// primitives in `std::comm` should be used.
+ //
+ // This function gets a little interesting. There are a few safety and
+ // ownership violations going on here, but this is all done in the name of
+ // shared state. Additionally, all of the violations are protected with a
+ // mutex, so in theory there are no races.
+ //
+ // The first thing we need to do is to get a pointer to the task's internal
+ // mutex. This address will not be changing (because the task is allocated
+ // on the heap). We must have this handle separately because the task will
+ // have its ownership transferred to the given closure. We're guaranteed,
+ // however, that this memory will remain valid because *this* is the current
+ // task's execution thread.
+ //
+ // The next weird part is where ownership of the task actually goes. We
+ // relinquish it to the `f` blocking function, but upon returning this
+ // function needs to replace the task back in TLS. There is no communication
+ // from the wakeup thread back to this thread about the task pointer, and
+ // there's really no need to. In order to get around this, we cast the task
+ // to a `uint` which is then used at the end of this function to cast back
+ // to a `Box<Task>` object. Naturally, this looks like it violates
+ // ownership semantics in that there may be two `Box<Task>` objects.
+ //
+ // The fun part is that the wakeup half of this implementation knows to
+ // "forget" the task on the other end. This means that the awakening half of
+ // things silently relinquishes ownership back to this thread, but not in a
+ // way that the compiler can understand. The task's memory is always valid
+ // for both tasks because these operations are all done inside of a mutex.
+ //
+ // You'll also find that if blocking fails (the `f` function hands the
+ // BlockedTask back to us), we will `mem::forget` the handles. The
+ // reasoning for this is the same logic as above in that the task silently
+ // transfers ownership via the `uint`, not through normal compiler
+ // semantics.
+ //
+ // On a mildly unrelated note, it should also be pointed out that OS
+ // condition variables are susceptible to spurious wakeups, which we need to
+ // be ready for. In order to accommodate for this fact, we have an extra
+ // `awoken` field which indicates whether we were actually woken up via some
+ // invocation of `reawaken`. This flag is only ever accessed inside the
+ // lock, so there's no need to make it atomic.
pub fn deschedule(mut self: Box<Task>,
- amt: uint,
+ times: uint,
f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
- let ops = self.imp.take().unwrap();
- ops.deschedule(amt, self, f)
+ unsafe {
+ let me = &mut *self as *mut Task;
+ let task = BlockedTask::block(self);
+
+ if times == 1 {
+ let guard = (*me).lock.lock();
+ (*me).awoken = false;
+ match f(task) {
+ Ok(()) => {
+ while !(*me).awoken {
+ guard.wait();
+ }
+ }
+ Err(task) => { mem::forget(task.wake()); }
+ }
+ } else {
+ let iter = task.make_selectable(times);
+ let guard = (*me).lock.lock();
+ (*me).awoken = false;
+
+ // Apply the given closure to all of the "selectable tasks",
+ // bailing on the first one that produces an error. Note that
+ // care must be taken such that when an error is occurred, we
+ // may not own the task, so we may still have to wait for the
+ // task to become available. In other words, if task.wake()
+ // returns `None`, then someone else has ownership and we must
+ // wait for their signal.
+ match iter.map(f).filter_map(|a| a.err()).next() {
+ None => {}
+ Some(task) => {
+ match task.wake() {
+ Some(task) => {
+ mem::forget(task);
+ (*me).awoken = true;
+ }
+ None => {}
+ }
+ }
+ }
+ while !(*me).awoken {
+ guard.wait();
+ }
+ }
+ // put the task back in TLS, and everything is as it once was.
+ Local::put(mem::transmute(me));
+ }
}
- /// Wakes up a previously blocked task, optionally specifying whether the
- /// current task can accept a change in scheduling. This function can only
- /// be called on tasks that were previously blocked in `deschedule`.
+ /// Wakes up a previously blocked task. This function can only be
+ /// called on tasks that were previously blocked in `deschedule`.
+ //
+ // See the comments on `deschedule` for why the task is forgotten here, and
+ // why it's valid to do so.
pub fn reawaken(mut self: Box<Task>) {
- let ops = self.imp.take().unwrap();
- ops.reawaken(self);
+ unsafe {
+ let me = &mut *self as *mut Task;
+ mem::forget(self);
+ let guard = (*me).lock.lock();
+ (*me).awoken = true;
+ guard.signal();
+ }
}
/// Yields control of this task to another task. This function will
/// eventually return, but possibly not immediately. This is used as an
/// opportunity to allow other tasks a chance to run.
- pub fn yield_now(mut self: Box<Task>) {
- let ops = self.imp.take().unwrap();
- ops.yield_now(self);
- }
-
- /// Similar to `yield_now`, except that this function may immediately return
- /// without yielding (depending on what the runtime decides to do).
- pub fn maybe_yield(mut self: Box<Task>) {
- let ops = self.imp.take().unwrap();
- ops.maybe_yield(self);
+ pub fn yield_now() {
+ Thread::yield_now();
}
/// Returns the stack bounds for this task in (lo, hi) format. The stack
/// bounds may not be known for all tasks, so the return value may be
/// `None`.
pub fn stack_bounds(&self) -> (uint, uint) {
- self.imp.as_ref().unwrap().stack_bounds()
+ self.stack_bounds
}
- /// Returns whether it is legal for this task to block the OS thread that it
- /// is running on.
- pub fn can_block(&self) -> bool {
- self.imp.as_ref().unwrap().can_block()
+ /// Returns the stack guard for this task, if known.
+ pub fn stack_guard(&self) -> Option<uint> {
+ if self.stack_guard != 0 {
+ Some(self.stack_guard)
+ } else {
+ None
+ }
}
/// Consume this task, flagging it as a candidate for destruction.