#[cfg(test)]
mod test {
- use borrow::to_uint;
- use rt::deque::BufferPool;
- use rt::basic;
- use rt::sched::{Scheduler};
- use rt::task::{GreenTask, Sched};
- use rt::thread::Thread;
- use rt::util;
- use task::TaskResult;
- use unstable::run_in_bare_thread;
+ use std::task::TaskOpts;
+ use std::rt::Runtime;
+ use std::rt::task::Task;
+ use std::rt::local::Local;
+
+ use basic;
+ use sched::TaskFromFriend;
+ use task::{GreenTask, HomeSched};
+ use PoolConfig;
+ use SchedPool;
+
+ fn pool() -> SchedPool {
+ SchedPool::new(PoolConfig {
+ threads: 1,
+ event_loop_factory: Some(basic::event_loop),
+ })
+ }
+
+ fn run(f: proc()) {
+ let mut pool = pool();
+ pool.spawn(TaskOpts::new(), f);
+ pool.shutdown();
+ }
+
+ fn sched_id() -> uint {
+ let mut task = Local::borrow(None::<Task>);
+ match task.get().maybe_take_runtime::<GreenTask>() {
+ Some(green) => {
+ let ret = green.sched.get_ref().sched_id();
+ task.get().put_runtime(green as ~Runtime);
+ return ret;
+ }
+ None => fail!()
+ }
+ }
#[test]
fn trivial_run_in_newsched_task_test() {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
- do run_in_newsched_task || {
+ do run {
unsafe { *task_ran_ptr = true };
rtdebug!("executed from the new scheduler")
}
let total = 10;
let mut task_run_count = 0;
let task_run_count_ptr: *mut uint = &mut task_run_count;
- do run_in_newsched_task || {
+ // with only one thread this is safe to run in without worries of
+ // contention.
+ do run {
for _ in range(0u, total) {
- do spawntask || {
+ do spawn || {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
}
}
fn multiple_task_nested_test() {
let mut task_run_count = 0;
let task_run_count_ptr: *mut uint = &mut task_run_count;
- do run_in_newsched_task || {
- do spawntask || {
+ do run {
+ do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
- do spawntask || {
+ do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
- do spawntask || {
+ do spawn {
unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
}
}
assert!(task_run_count == 3);
}
- // Confirm that a sched_id actually is the uint form of the
- // pointer to the scheduler struct.
- #[test]
- fn simple_sched_id_test() {
- do run_in_bare_thread {
- let sched = ~new_test_uv_sched();
- assert!(to_uint(sched) == sched.sched_id());
- }
- }
-
- // Compare two scheduler ids that are different, this should never
- // fail but may catch a mistake someday.
- #[test]
- fn compare_sched_id_test() {
- do run_in_bare_thread {
- let sched_one = ~new_test_uv_sched();
- let sched_two = ~new_test_uv_sched();
- assert!(sched_one.sched_id() != sched_two.sched_id());
- }
- }
-
-
// A very simple test that confirms that a task executing on the
// home scheduler notices that it is home.
#[test]
fn test_home_sched() {
- do run_in_bare_thread {
- let mut task_ran = false;
- let task_ran_ptr: *mut bool = &mut task_ran;
+ let mut pool = pool();
- let mut sched = ~new_test_uv_sched();
- let sched_handle = sched.make_handle();
+ let (dport, dchan) = Chan::new();
+ {
+ let (port, chan) = Chan::new();
+ let mut handle1 = pool.spawn_sched();
+ let mut handle2 = pool.spawn_sched();
- let mut task = ~do GreenTask::new_root_homed(&mut sched.stack_pool, None,
- Sched(sched_handle)) {
- unsafe { *task_ran_ptr = true };
- assert!(GreenTask::on_appropriate_sched());
- };
+ handle1.send(TaskFromFriend(do pool.task(TaskOpts::new()) {
+ chan.send(sched_id());
+ }));
+ let sched1_id = port.recv();
- let on_exit: proc(TaskResult) = proc(exit_status) {
- rtassert!(exit_status.is_ok())
+ let mut task = do pool.task(TaskOpts::new()) {
+ assert_eq!(sched_id(), sched1_id);
+ dchan.send(());
};
- task.death.on_exit = Some(on_exit);
-
- sched.bootstrap(task);
+ task.give_home(HomeSched(handle1));
+ handle2.send(TaskFromFriend(task));
}
+ dport.recv();
+
+ pool.shutdown();
}
// An advanced test that checks all four possible states that a
#[test]
fn test_schedule_home_states() {
- use rt::sleeper_list::SleeperList;
- use rt::sched::Shutdown;
- use borrow;
+ use sleeper_list::SleeperList;
+ use super::{Shutdown, Scheduler, SchedHandle};
+ use std::unstable::run_in_bare_thread;
+ use std::rt::thread::Thread;
+ use std::sync::deque::BufferPool;
do run_in_bare_thread {
-
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let (normal_worker, normal_stealer) = pool.deque();
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
+ 1,
basic::event_loop(),
normal_worker,
queues.clone(),
sleepers.clone());
let normal_handle = normal_sched.make_handle();
+ let friend_handle = normal_sched.make_handle();
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
+ 1,
basic::event_loop(),
special_worker,
queues.clone(),
// 3) task not homed, sched requeues
// 4) task not home, send home
- let task1 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
- Sched(t1_handle)) || {
- rtassert!(GreenTask::on_appropriate_sched());
+ // Grab both the scheduler and the task from TLS and check if the
+ // task is executing on an appropriate scheduler.
+ fn on_appropriate_sched() -> bool {
+ use task::{TypeGreen, TypeSched, HomeSched};
+ let task = GreenTask::convert(Local::take());
+ let sched_id = task.sched.get_ref().sched_id();
+ let run_any = task.sched.get_ref().run_anything;
+ let ret = match task.task_type {
+ TypeGreen(Some(AnySched)) => {
+ run_any
+ }
+ TypeGreen(Some(HomeSched(SchedHandle {
+ sched_id: ref id,
+ ..
+ }))) => {
+ *id == sched_id
+ }
+ TypeGreen(None) => { fail!("task without home"); }
+ TypeSched => { fail!("expected green task"); }
+ };
+ task.put();
+ ret
+ }
+
+ let task1 = do GreenTask::new_homed(&mut special_sched.stack_pool,
+ None, HomeSched(t1_handle)) {
+ rtassert!(on_appropriate_sched());
};
- rtdebug!("task1 id: **{}**", borrow::to_uint(task1));
- let task2 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
- rtassert!(GreenTask::on_appropriate_sched());
+ let task2 = do GreenTask::new(&mut normal_sched.stack_pool, None) {
+ rtassert!(on_appropriate_sched());
};
- let task3 = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
- rtassert!(GreenTask::on_appropriate_sched());
+ let task3 = do GreenTask::new(&mut normal_sched.stack_pool, None) {
+ rtassert!(on_appropriate_sched());
};
- let task4 = ~do GreenTask::new_root_homed(&mut special_sched.stack_pool, None,
- Sched(t4_handle)) {
- rtassert!(GreenTask::on_appropriate_sched());
+ let task4 = do GreenTask::new_homed(&mut special_sched.stack_pool,
+ None, HomeSched(t4_handle)) {
+ rtassert!(on_appropriate_sched());
};
- rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
// Signal from the special task that we are done.
let (port, chan) = Chan::<()>::new();
- let normal_task = ~do GreenTask::new_root(&mut normal_sched.stack_pool, None) {
- rtdebug!("*about to submit task2*");
- Scheduler::run_task(task2);
- rtdebug!("*about to submit task4*");
- Scheduler::run_task(task4);
- rtdebug!("*normal_task done*");
+ fn run(next: ~GreenTask) {
+ let mut task = GreenTask::convert(Local::take());
+ let sched = task.sched.take_unwrap();
+ sched.run_task(task, next)
+ }
+
+ let normal_task = do GreenTask::new(&mut normal_sched.stack_pool,
+ None) {
+ run(task2);
+ run(task4);
port.recv();
let mut nh = normal_handle;
nh.send(Shutdown);
sh.send(Shutdown);
};
- rtdebug!("normal task: {}", borrow::to_uint(normal_task));
- let special_task = ~do GreenTask::new_root(&mut special_sched.stack_pool, None) {
- rtdebug!("*about to submit task1*");
- Scheduler::run_task(task1);
- rtdebug!("*about to submit task3*");
- Scheduler::run_task(task3);
- rtdebug!("*done with special_task*");
+ let special_task = do GreenTask::new(&mut special_sched.stack_pool,
+ None) {
+ run(task1);
+ run(task3);
chan.send(());
};
- rtdebug!("special task: {}", borrow::to_uint(special_task));
let normal_sched = normal_sched;
let normal_thread = do Thread::start {
normal_sched.bootstrap(normal_task);
- rtdebug!("finished with normal_thread");
};
let special_sched = special_sched;
let special_thread = do Thread::start {
special_sched.bootstrap(special_task);
- rtdebug!("finished with special_sched");
};
normal_thread.join();
}
}
- #[test]
- fn test_stress_schedule_task_states() {
- if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
- let n = stress_factor() * 120;
- for _ in range(0, n as int) {
- test_schedule_home_states();
- }
- }
+ //#[test]
+ //fn test_stress_schedule_task_states() {
+ // if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
+ // let n = stress_factor() * 120;
+ // for _ in range(0, n as int) {
+ // test_schedule_home_states();
+ // }
+ //}
#[test]
fn test_io_callback() {
- use io::timer;
-
- // This is a regression test that when there are no schedulable tasks
- // in the work queue, but we are performing I/O, that once we do put
- // something in the work queue again the scheduler picks it up and doesn't
- // exit before emptying the work queue
- do run_in_uv_task {
- do spawntask {
+ use std::io::timer;
+
+ let mut pool = SchedPool::new(PoolConfig {
+ threads: 2,
+ event_loop_factory: None,
+ });
+
+ // This is a regression test that when there are no schedulable tasks in
+ // the work queue, but we are performing I/O, that once we do put
+ // something in the work queue again the scheduler picks it up and
+ // doesn't exit before emptying the work queue
+ do pool.spawn(TaskOpts::new()) {
+ do spawn {
timer::sleep(10);
}
}
+
+ pool.shutdown();
}
#[test]
- fn handle() {
- do run_in_bare_thread {
- let (port, chan) = Chan::new();
-
- let thread_one = do Thread::start {
- let chan = chan;
- do run_in_newsched_task_core {
- chan.send(());
- }
- };
-
- let thread_two = do Thread::start {
- let port = port;
- do run_in_newsched_task_core {
- port.recv();
- }
- };
+ fn wakeup_across_scheds() {
+ let (port1, chan1) = Chan::new();
+ let (port2, chan2) = Chan::new();
+
+ let mut pool1 = pool();
+ let mut pool2 = pool();
+
+ do pool1.spawn(TaskOpts::new()) {
+ let id = sched_id();
+ chan1.send(());
+ port2.recv();
+ assert_eq!(id, sched_id());
+ }
- thread_two.join();
- thread_one.join();
+ do pool2.spawn(TaskOpts::new()) {
+ let id = sched_id();
+ port1.recv();
+ assert_eq!(id, sched_id());
+ chan2.send(());
}
+
+ pool1.shutdown();
+ pool2.shutdown();
}
// A regression test that the final message is always handled.
// Used to deadlock because Shutdown was never recvd.
#[test]
fn no_missed_messages() {
- use rt::sleeper_list::SleeperList;
- use rt::stack::StackPool;
- use rt::sched::{Shutdown, TaskFromFriend};
+ let mut pool = pool();
- do run_in_bare_thread {
- stress_factor().times(|| {
- let sleepers = SleeperList::new();
- let mut pool = BufferPool::new();
- let (worker, stealer) = pool.deque();
-
- let mut sched = ~Scheduler::new(
- basic::event_loop(),
- worker,
- ~[stealer],
- sleepers.clone());
-
- let mut handle = sched.make_handle();
-
- let sched = sched;
- let thread = do Thread::start {
- let mut sched = sched;
- let bootstrap_task =
- ~GreenTask::new_root(&mut sched.stack_pool,
- None,
- proc()());
- sched.bootstrap(bootstrap_task);
- };
+ let task = pool.task(TaskOpts::new(), proc()());
+ pool.spawn_sched().send(TaskFromFriend(task));
- let mut stack_pool = StackPool::new();
- let task = ~GreenTask::new_root(&mut stack_pool, None, proc()());
- handle.send(TaskFromFriend(task));
-
- handle.send(Shutdown);
- drop(handle);
-
- thread.join();
- })
- }
+ pool.shutdown();
}
#[test]
fn multithreading() {
- use num::Times;
- use vec::OwnedVector;
- use container::Container;
-
- do run_in_mt_newsched_task {
+ do run {
let mut ports = ~[];
10.times(|| {
let (port, chan) = Chan::new();
- do spawntask_later {
+ do spawn {
chan.send(());
}
ports.push(port);
#[test]
fn thread_ring() {
- do run_in_mt_newsched_task {
+ do run {
let (end_port, end_chan) = Chan::new();
let n_tasks = 10;
let (next_p, ch) = Chan::new();
let imm_i = i;
let imm_p = p;
- do spawntask_random {
+ do spawn {
roundtrip(imm_i, n_tasks, &imm_p, &ch);
};
p = next_p;
i += 1;
}
let p = p;
- do spawntask_random {
+ do spawn {
roundtrip(1, n_tasks, &p, &ch1);
}
#[test]
fn start_closure_dtor() {
- use ops::Drop;
-
// Regression test that the `start` task entrypoint can
// contain dtors that use task resources
- do run_in_newsched_task {
+ do run {
struct S { field: () }
impl Drop for S {
let s = S { field: () };
- do spawntask {
+ do spawn {
let _ss = &s;
}
}
#[ignore]
#[test]
fn dont_starve_1() {
- stress_factor().times(|| {
- do run_in_mt_newsched_task {
- let (port, chan) = Chan::new();
-
- // This task should not be able to starve the sender;
- // The sender should get stolen to another thread.
- do spawntask {
- while port.try_recv().is_none() { }
- }
+ let mut pool = SchedPool::new(PoolConfig {
+ threads: 2, // this must be > 1
+ event_loop_factory: Some(basic::event_loop),
+ });
+ do pool.spawn(TaskOpts::new()) {
+ let (port, chan) = Chan::new();
- chan.send(());
+ // This task should not be able to starve the sender;
+ // The sender should get stolen to another thread.
+ do spawn {
+ while port.try_recv().is_none() { }
}
- })
+
+ chan.send(());
+ }
+ pool.shutdown();
}
#[test]
fn dont_starve_2() {
- stress_factor().times(|| {
- do run_in_newsched_task {
- let (port, chan) = Chan::new();
- let (_port2, chan2) = Chan::new();
+ do run {
+ let (port, chan) = Chan::new();
+ let (_port2, chan2) = Chan::new();
- // This task should not be able to starve the other task.
- // The sends should eventually yield.
- do spawntask {
- while port.try_recv().is_none() {
- chan2.send(());
- }
+ // This task should not be able to starve the other task.
+ // The sends should eventually yield.
+ do spawn {
+ while port.try_recv().is_none() {
+ chan2.send(());
}
-
- chan.send(());
}
- })
+
+ chan.send(());
+ }
}
- // Regression test for a logic bug that would cause single-threaded schedulers
- // to sleep forever after yielding and stealing another task.
+ // Regression test for a logic bug that would cause single-threaded
+ // schedulers to sleep forever after yielding and stealing another task.
#[test]
fn single_threaded_yield() {
- use task::{spawn, spawn_sched, SingleThreaded, deschedule};
- use num::Times;
-
- do spawn_sched(SingleThreaded) {
- 5.times(|| { deschedule(); })
+ use std::task::deschedule;
+ do run {
+ 5.times(deschedule);
}
- do spawn { }
- do spawn { }
}
}
}
}
- fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
+ fn reawaken(mut ~self, to_wake: ~Task) {
self.put_task(to_wake);
assert!(self.sched.is_none());
let mut running_task: ~Task = Local::take();
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
- let mut sched = running_green_task.sched.take_unwrap();
+ running_green_task.put_task(running_task);
+ let sched = running_green_task.sched.take_unwrap();
+
if sched.pool_id == self.pool_id {
- running_green_task.put_task(running_task);
- if can_resched {
- sched.run_task(running_green_task, self);
- } else {
- sched.enqueue_task(self);
- running_green_task.put_with_sched(sched);
- }
+ sched.run_task(running_green_task, self);
} else {
self.reawaken_remotely();
// put that thing back where it came from!
- running_task.put_runtime(running_green_task as ~Runtime);
- Local::put(running_task);
+ running_green_task.put_with_sched(sched);
}
}
None => {
}
#[cfg(test)]
-mod test {
+mod tests {
+ use std::rt::Runtime;
+ use std::rt::local::Local;
+ use std::rt::task::Task;
+ use std::task;
+ use std::task::TaskOpts;
- #[test]
- fn local_heap() {
- do run_in_newsched_task() {
- let a = @5;
- let b = a;
- assert!(*a == 5);
- assert!(*b == 5);
- }
+ use super::super::{PoolConfig, SchedPool};
+ use super::GreenTask;
+
+ fn spawn_opts(opts: TaskOpts, f: proc()) {
+ let mut pool = SchedPool::new(PoolConfig {
+ threads: 1,
+ event_loop_factory: None,
+ });
+ pool.spawn(opts, f);
+ pool.shutdown();
}
#[test]
- fn tls() {
- use std::local_data;
- do run_in_newsched_task() {
- local_data_key!(key: @~str)
- local_data::set(key, @~"data");
- assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data");
- local_data_key!(key2: @~str)
- local_data::set(key2, @~"data");
- assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data");
+ fn smoke() {
+ let (p, c) = Chan::new();
+ do spawn_opts(TaskOpts::new()) {
+ c.send(());
}
+ p.recv();
}
#[test]
- fn unwind() {
- do run_in_newsched_task() {
- let result = spawntask_try(proc()());
- rtdebug!("trying first assert");
- assert!(result.is_ok());
- let result = spawntask_try(proc() fail!());
- rtdebug!("trying second assert");
- assert!(result.is_err());
+ fn smoke_fail() {
+ let (p, c) = Chan::<()>::new();
+ do spawn_opts(TaskOpts::new()) {
+ let _c = c;
+ fail!()
}
+ assert_eq!(p.recv_opt(), None);
}
#[test]
- fn rng() {
- do run_in_uv_task() {
- use std::rand::{rng, Rng};
- let mut r = rng();
- let _ = r.next_u32();
- }
+ fn smoke_opts() {
+ let mut opts = TaskOpts::new();
+ opts.name = Some(SendStrStatic("test"));
+ opts.stack_size = Some(20 * 4096);
+ let (p, c) = Chan::new();
+ opts.notify_chan = Some(c);
+ spawn_opts(opts, proc() {});
+ assert!(p.recv().is_ok());
}
#[test]
- fn logging() {
- do run_in_uv_task() {
- info!("here i am. logging in a newsched task");
- }
+ fn smoke_opts_fail() {
+ let mut opts = TaskOpts::new();
+ let (p, c) = Chan::new();
+ opts.notify_chan = Some(c);
+ spawn_opts(opts, proc() { fail!() });
+ assert!(p.recv().is_err());
}
#[test]
- fn comm_stream() {
- do run_in_newsched_task() {
- let (port, chan) = Chan::new();
- chan.send(10);
- assert!(port.recv() == 10);
+ fn yield_test() {
+ let (p, c) = Chan::new();
+ do spawn_opts(TaskOpts::new()) {
+ 10.times(task::deschedule);
+ c.send(());
}
+ p.recv();
}
#[test]
- fn comm_shared_chan() {
- do run_in_newsched_task() {
- let (port, chan) = SharedChan::new();
- chan.send(10);
- assert!(port.recv() == 10);
+ fn spawn_children() {
+ let (p, c) = Chan::new();
+ do spawn_opts(TaskOpts::new()) {
+ let (p, c2) = Chan::new();
+ do spawn {
+ let (p, c3) = Chan::new();
+ do spawn {
+ c3.send(());
+ }
+ p.recv();
+ c2.send(());
+ }
+ p.recv();
+ c.send(());
}
+ p.recv();
}
- //#[test]
- //fn heap_cycles() {
- // use std::option::{Option, Some, None};
-
- // do run_in_newsched_task {
- // struct List {
- // next: Option<@mut List>,
- // }
-
- // let a = @mut List { next: None };
- // let b = @mut List { next: Some(a) };
-
- // a.next = Some(b);
- // }
- //}
-
#[test]
- #[should_fail]
- fn test_begin_unwind() { begin_unwind("cause", file!(), line!()) }
+ fn spawn_inherits() {
+ let (p, c) = Chan::new();
+ do spawn_opts(TaskOpts::new()) {
+ let c = c;
+ do spawn {
+ let mut task: ~Task = Local::take();
+ match task.maybe_take_runtime::<GreenTask>() {
+ Some(ops) => {
+ task.put_runtime(ops as ~Runtime);
+ }
+ None => fail!(),
+ }
+ Local::put(task);
+ c.send(());
+ }
+ }
+ p.recv();
+ }
}