//! in order to spawn new tasks and deschedule the current task.
use std::any::Any;
-use std::cast;
+use std::mem;
use std::rt::bookkeeping;
-use std::rt::env;
use std::rt::local::Local;
+use std::rt::mutex::NativeMutex;
use std::rt::rtio;
use std::rt::stack;
-use std::rt::task::{Task, BlockedTask, SendMessage};
+use std::rt::task::{Task, BlockedTask, TaskOpts};
use std::rt::thread::Thread;
use std::rt;
-use std::task::TaskOpts;
-use std::unstable::mutex::NativeMutex;
use io;
use task;
+use std::task::{TaskBuilder, Spawner};
/// Creates a new Task which is ready to execute as a 1:1 task.
-pub fn new(stack_bounds: (uint, uint)) -> ~Task {
+pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
let mut task = box Task::new();
let mut ops = ops();
ops.stack_bounds = stack_bounds;
return task;
}
-fn ops() -> ~Ops {
+fn ops() -> Box<Ops> {
box Ops {
lock: unsafe { NativeMutex::new() },
awoken: false,
}
/// Spawns a function with the default configuration
+#[deprecated = "use the native method of NativeTaskBuilder instead"]
pub fn spawn(f: proc():Send) {
- spawn_opts(TaskOpts::new(), f)
+ spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
}
/// Spawns a new task given the configuration options and a procedure to run
/// inside the task.
+#[deprecated = "use the native method of NativeTaskBuilder instead"]
pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
- let TaskOpts {
- notify_chan, name, stack_size,
- stderr, stdout,
- } = opts;
+ let TaskOpts { name, stack_size, on_exit } = opts;
let mut task = box Task::new();
task.name = name;
- task.stderr = stderr;
- task.stdout = stdout;
- match notify_chan {
- Some(chan) => { task.death.on_exit = Some(SendMessage(chan)); }
- None => {}
- }
+ task.death.on_exit = on_exit;
- let stack = stack_size.unwrap_or(env::min_stack());
+ let stack = stack_size.unwrap_or(rt::min_stack());
let task = task;
let ops = ops();
// Note that this increment must happen *before* the spawn in order to
// guarantee that if this task exits it will always end up waiting for the
// spawned task to exit.
- bookkeeping::increment();
+ let token = bookkeeping::increment();
// Spawning a new OS thread guarantees that __morestack will never get
// triggered, but we must manually set up the actual stack bounds once this
// which our stack started).
Thread::spawn_stack(stack, proc() {
let something_around_the_top_of_the_stack = 1;
- let addr = &something_around_the_top_of_the_stack as *int;
+ let addr = &something_around_the_top_of_the_stack as *const int;
let my_stack = addr as uint;
unsafe {
stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
let mut f = Some(f);
let mut task = task;
task.put_runtime(ops);
- let t = task.run(|| { f.take_unwrap()() });
- drop(t);
- bookkeeping::decrement();
+ drop(task.run(|| { f.take_unwrap()() }).destroy());
+ drop(token);
})
}
+/// A spawner for native tasks
+pub struct NativeSpawner;
+
+impl Spawner for NativeSpawner {
+ fn spawn(self, opts: TaskOpts, f: proc():Send) {
+ spawn_opts(opts, f)
+ }
+}
+
+/// An extension trait adding a `native` configuration method to `TaskBuilder`.
+pub trait NativeTaskBuilder {
+ fn native(self) -> TaskBuilder<NativeSpawner>;
+}
+
+impl<S: Spawner> NativeTaskBuilder for TaskBuilder<S> {
+ fn native(self) -> TaskBuilder<NativeSpawner> {
+ self.spawner(NativeSpawner)
+ }
+}
+
// This structure is the glue between channels and the 1:1 scheduling mode. This
// structure is allocated once per task.
struct Ops {
}
impl rt::Runtime for Ops {
- fn yield_now(~self, mut cur_task: ~Task) {
+ fn yield_now(self: Box<Ops>, mut cur_task: Box<Task>) {
// put the task back in TLS and then invoke the OS thread yield
cur_task.put_runtime(self);
Local::put(cur_task);
Thread::yield_now();
}
- fn maybe_yield(~self, mut cur_task: ~Task) {
+ fn maybe_yield(self: Box<Ops>, mut cur_task: Box<Task>) {
// just put the task back in TLS, on OS threads we never need to
// opportunistically yield b/c the OS will do that for us (preemption)
cur_task.put_runtime(self);
Local::put(cur_task);
}
- fn wrap(~self) -> ~Any {
- self as ~Any
+ fn wrap(self: Box<Ops>) -> Box<Any> {
+ self as Box<Any>
}
fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
// from the wakeup thread back to this thread about the task pointer, and
// there's really no need to. In order to get around this, we cast the task
// to a `uint` which is then used at the end of this function to cast back
- // to a `~Task` object. Naturally, this looks like it violates ownership
- // semantics in that there may be two `~Task` objects.
+ // to a `Box<Task>` object. Naturally, this looks like it violates
+ // ownership semantics in that there may be two `Box<Task>` objects.
//
// The fun part is that the wakeup half of this implementation knows to
// "forget" the task on the other end. This means that the awakening half of
// for both tasks because these operations are all done inside of a mutex.
//
// You'll also find that if blocking fails (the `f` function hands the
- // BlockedTask back to us), we will `cast::forget` the handles. The
+ // BlockedTask back to us), we will `mem::forget` the handles. The
// reasoning for this is the same logic as above in that the task silently
// transfers ownership via the `uint`, not through normal compiler
// semantics.
//
// On a mildly unrelated note, it should also be pointed out that OS
// condition variables are susceptible to spurious wakeups, which we need to
- // be ready for. In order to accomodate for this fact, we have an extra
+ // be ready for. In order to accommodate for this fact, we have an extra
// `awoken` field which indicates whether we were actually woken up via some
// invocation of `reawaken`. This flag is only ever accessed inside the
// lock, so there's no need to make it atomic.
- fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
+ fn deschedule(mut self: Box<Ops>,
+ times: uint,
+ mut cur_task: Box<Task>,
f: |BlockedTask| -> Result<(), BlockedTask>) {
let me = &mut *self as *mut Ops;
cur_task.put_runtime(self);
unsafe {
- let cur_task_dupe = &*cur_task as *Task;
+ let cur_task_dupe = &mut *cur_task as *mut Task;
let task = BlockedTask::block(cur_task);
if times == 1 {
guard.wait();
}
}
- Err(task) => { cast::forget(task.wake()); }
+ Err(task) => { mem::forget(task.wake()); }
}
} else {
let iter = task.make_selectable(times);
Some(task) => {
match task.wake() {
Some(task) => {
- cast::forget(task);
+ mem::forget(task);
(*me).awoken = true;
}
None => {}
}
}
// re-acquire ownership of the task
- cur_task = cast::transmute(cur_task_dupe);
+ cur_task = mem::transmute(cur_task_dupe);
}
// put the task back in TLS, and everything is as it once was.
// See the comments on `deschedule` for why the task is forgotten here, and
// why it's valid to do so.
- fn reawaken(mut ~self, mut to_wake: ~Task) {
+ fn reawaken(mut self: Box<Ops>, mut to_wake: Box<Task>) {
unsafe {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self);
- cast::forget(to_wake);
+ mem::forget(to_wake);
let guard = (*me).lock.lock();
(*me).awoken = true;
guard.signal();
}
}
- fn spawn_sibling(~self, mut cur_task: ~Task, opts: TaskOpts, f: proc():Send) {
+ fn spawn_sibling(self: Box<Ops>,
+ mut cur_task: Box<Task>,
+ opts: TaskOpts,
+ f: proc():Send) {
cur_task.put_runtime(self);
Local::put(cur_task);
#[cfg(test)]
mod tests {
use std::rt::local::Local;
- use std::rt::task::Task;
+ use std::rt::task::{Task, TaskOpts};
use std::task;
- use std::task::TaskOpts;
- use super::{spawn, spawn_opts, Ops};
+ use std::task::TaskBuilder;
+ use super::{spawn, spawn_opts, Ops, NativeTaskBuilder};
#[test]
fn smoke() {
opts.name = Some("test".into_maybe_owned());
opts.stack_size = Some(20 * 4096);
let (tx, rx) = channel();
- opts.notify_chan = Some(tx);
+ opts.on_exit = Some(proc(r) tx.send(r));
spawn_opts(opts, proc() {});
assert!(rx.recv().is_ok());
}
fn smoke_opts_fail() {
let mut opts = TaskOpts::new();
let (tx, rx) = channel();
- opts.notify_chan = Some(tx);
+ opts.on_exit = Some(proc(r) tx.send(r));
spawn_opts(opts, proc() { fail!() });
assert!(rx.recv().is_err());
}
fn yield_test() {
let (tx, rx) = channel();
spawn(proc() {
- for _ in range(0, 10) { task::deschedule(); }
+ for _ in range(0u, 10) { task::deschedule(); }
tx.send(());
});
rx.recv();
let (tx, rx) = channel();
spawn(proc() {
spawn(proc() {
- let mut task: ~Task = Local::take();
+ let mut task: Box<Task> = Local::take();
match task.maybe_take_runtime::<Ops>() {
Some(ops) => {
task.put_runtime(ops);
});
rx.recv();
}
+
+ #[test]
+ fn test_native_builder() {
+ let res = TaskBuilder::new().native().try(proc() {
+ "Success!".to_string()
+ });
+ assert_eq!(res.ok().unwrap(), "Success!".to_string());
+ }
}