}
struct BasicLoop {
- work: ~[proc():Send], // pending work
+ work: Vec<proc():Send>, // pending work
idle: Option<*mut BasicPausable>, // only one is allowed
- remotes: ~[(uint, ~Callback:Send)],
+ remotes: Vec<(uint, ~Callback:Send)>,
next_remote: uint,
- messages: Exclusive<~[Message]>,
+ messages: Exclusive<Vec<Message>>,
}
enum Message { RunRemote(uint), RemoveRemote(uint) }
impl BasicLoop {
fn new() -> BasicLoop {
BasicLoop {
- work: ~[],
+ work: vec![],
idle: None,
next_remote: 0,
- remotes: ~[],
- messages: Exclusive::new(~[]),
+ remotes: vec![],
+ messages: Exclusive::new(vec![]),
}
}
/// Process everything in the work queue (continually)
fn work(&mut self) {
while self.work.len() > 0 {
- for work in replace(&mut self.work, ~[]).move_iter() {
+ for work in replace(&mut self.work, vec![]).move_iter() {
work();
}
}
let messages = unsafe {
self.messages.with(|messages| {
if messages.len() > 0 {
- Some(replace(messages, ~[]))
+ Some(replace(messages, vec![]))
} else {
None
}
})
};
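The `replace(&mut self.work, vec![])` calls above drain a queue by swapping in an empty vector, so the drained contents can be consumed by value while new work may still be enqueued. A minimal sketch of the same pattern in today's Rust, with `std::mem::take` standing in for `replace` (the `Job` alias is illustrative, not from the patch):

    use std::mem;

    type Job = Box<dyn FnOnce() + Send>;

    struct BasicLoop {
        work: Vec<Job>, // pending work
    }

    impl BasicLoop {
        /// Process everything in the work queue (continually).
        fn work(&mut self) {
            while !self.work.is_empty() {
                // Swap the queue for an empty Vec and consume the old one
                // by value; jobs pushed while running are picked up on the
                // next pass of the outer loop.
                for job in mem::take(&mut self.work) {
                    job();
                }
            }
        }
    }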
struct BasicRemote {
- queue: Exclusive<~[Message]>,
+ queue: Exclusive<Vec<Message>>,
id: uint,
}
impl BasicRemote {
- fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
+ fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
BasicRemote { queue: queue, id: id }
}
}
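`Exclusive` is the old runtime's internal lock wrapper, which is why the `with` call above needs an `unsafe` block. The shape being migrated here is a shared, mutex-guarded `Vec<Message>` that remote handles push into; under today's std that would look roughly like this (a sketch, not the patch's API; the `fire` method name is illustrative):

    use std::sync::{Arc, Mutex};

    enum Message { RunRemote(usize), RemoveRemote(usize) }

    struct BasicRemote {
        queue: Arc<Mutex<Vec<Message>>>, // shared with the event loop
        id: usize,
    }

    impl BasicRemote {
        fn fire(&self) {
            // Enqueue a message for the loop to drain on its next pass.
            self.queue.lock().unwrap().push(Message::RunRemote(self.id));
        }
    }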
// NB this does *not* include globs, please keep it that way.
#![feature(macro_rules, phase)]
#![allow(visible_private_types)]
+#![deny(deprecated_owned_vector)]
#[cfg(test)] #[phase(syntax, link)] extern crate log;
#[cfg(test)] extern crate rustuv;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
use std::sync::deque;
use std::task::TaskOpts;
-use std::slice;
use std::sync::arc::UnsafeArc;
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
/// used to keep the pool alive and also reap the status from the pool.
pub struct SchedPool {
id: uint,
- threads: ~[Thread<()>],
- handles: ~[SchedHandle],
- stealers: ~[deque::Stealer<~task::GreenTask>],
+ threads: Vec<Thread<()>>,
+ handles: Vec<SchedHandle>,
+ stealers: Vec<deque::Stealer<~task::GreenTask>>,
next_friend: uint,
stack_pool: StackPool,
deque_pool: deque::BufferPool<~task::GreenTask>,
// The pool of schedulers that will be returned from this function
let (p, state) = TaskState::new();
let mut pool = SchedPool {
- threads: ~[],
- handles: ~[],
- stealers: ~[],
+ threads: vec![],
+ handles: vec![],
+ stealers: vec![],
id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
sleepers: SleeperList::new(),
stack_pool: StackPool::new(),
// Create a work queue for each scheduler, nscheds times. Create an extra
// one for the main thread if that flag is set. We won't steal from it.
- let arr = slice::from_fn(nscheds, |_| pool.deque_pool.deque());
- let (workers, stealers) = slice::unzip(arr.move_iter());
+ let mut workers = Vec::with_capacity(nscheds);
+ let mut stealers = Vec::with_capacity(nscheds);
+
+ for _ in range(0, nscheds) {
+ let (w, s) = pool.deque_pool.deque();
+ workers.push(w);
+ stealers.push(s);
+ }
pool.stealers = stealers;
// Now that we've got all our work queues, create one scheduler per
}
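The hand-rolled loop replaces `slice::unzip`, which disappears along with `~[]`. Modern iterators offer the same split via `Iterator::unzip`; a self-contained sketch with stand-in types (the real code pairs a deque `Worker` with its `Stealer`):

    // Stand-ins for the patch's deque Worker/Stealer pair.
    struct Worker(usize);
    struct Stealer(usize);

    fn make_deque(i: usize) -> (Worker, Stealer) {
        (Worker(i), Stealer(i))
    }

    fn main() {
        let nscheds = 4;
        // unzip splits an iterator of pairs into two collections,
        // which is exactly what the manual push loop does by hand.
        let (workers, stealers): (Vec<Worker>, Vec<Stealer>) =
            (0..nscheds).map(make_deque).unzip();
        assert_eq!(workers.len(), nscheds);
        assert_eq!(stealers.len(), nscheds);
    }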
// Jettison the task away!
- self.handles[idx].send(TaskFromFriend(task));
+ self.handles.get_mut(idx).send(TaskFromFriend(task));
}
/// Spawns a new scheduler into this M:N pool. A handle is returned to the
/// This only waits for all tasks in *this pool* of schedulers to exit; any
/// native tasks or extern pools will not be waited on.
pub fn shutdown(mut self) {
- self.stealers = ~[];
+ self.stealers = vec![];
// Wait for everyone to exit. We may have reached a 0-task count
// multiple times in the past, meaning there could be several buffered
}
// Now that everyone's gone, tell everything to shut down.
- for mut handle in replace(&mut self.handles, ~[]).move_iter() {
+ for mut handle in replace(&mut self.handles, vec![]).move_iter() {
handle.send(Shutdown);
}
- for thread in replace(&mut self.threads, ~[]).move_iter() {
+ for thread in replace(&mut self.threads, vec![]).move_iter() {
thread.join();
}
}
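Both drains in `shutdown` use `replace` so the vectors can be moved out of `self` and consumed by value (`join` consumes its handle). The same two-phase teardown, signal every worker first and only then join, in today's std (a sketch with `std::thread`, not the green scheduler's types):

    use std::mem;
    use std::sync::mpsc::Sender;
    use std::thread::JoinHandle;

    struct Pool {
        handles: Vec<Sender<()>>,     // per-worker shutdown channels
        threads: Vec<JoinHandle<()>>, // joined after all signals go out
    }

    impl Pool {
        fn shutdown(mut self) {
            // Phase 1: tell every worker to stop.
            for handle in mem::take(&mut self.handles) {
                let _ = handle.send(());
            }
            // Phase 2: wait for each one to exit.
            for thread in mem::take(&mut self.threads) {
                thread.join().unwrap();
            }
        }
    }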
work_queue: deque::Worker<~GreenTask>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
- work_queues: ~[deque::Stealer<~GreenTask>],
+ work_queues: Vec<deque::Stealer<~GreenTask>>,
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
pub fn new(pool_id: uint,
event_loop: ~EventLoop:Send,
work_queue: deque::Worker<~GreenTask>,
- work_queues: ~[deque::Stealer<~GreenTask>],
+ work_queues: Vec<deque::Stealer<~GreenTask>>,
sleeper_list: SleeperList,
state: TaskState)
-> Scheduler {
pub fn new_special(pool_id: uint,
event_loop: ~EventLoop:Send,
work_queue: deque::Worker<~GreenTask>,
- work_queues: ~[deque::Stealer<~GreenTask>],
+ work_queues: Vec<deque::Stealer<~GreenTask>>,
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>,
let len = work_queues.len();
let start_index = self.rng.gen_range(0, len);
for index in range(0, len).map(|i| (i + start_index) % len) {
- match work_queues[index].steal() {
+ match work_queues.get_mut(index).steal() {
deque::Data(task) => {
rtdebug!("found task by stealing");
return Some(task)
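The steal loop starts at a random victim and then walks every queue round-robin, which spreads contention across schedulers instead of always hammering queue 0. A sketch of just the index walk (the deque types themselves are omitted):

    // Visit every index exactly once, starting at a random offset and
    // wrapping around: the (i + start_index) % len walk from the patch.
    fn visit_order(len: usize, start_index: usize) -> Vec<usize> {
        (0..len).map(|i| (i + start_index) % len).collect()
    }

    fn main() {
        assert_eq!(visit_order(4, 2), vec![2, 3, 0, 1]);
    }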
let mut pool = BufferPool::new();
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
- let queues = ~[normal_stealer, special_stealer];
+ let queues = vec![normal_stealer, special_stealer];
let (_p, state) = TaskState::new();
// Our normal scheduler
#[test]
fn multithreading() {
run(proc() {
- let mut rxs = ~[];
+ let mut rxs = vec![];
for _ in range(0, 10) {
let (tx, rx) = channel();
spawn(proc() {
pub struct StackPool {
// Ideally this would be some datastructure that preserved ordering on
// Stack.min_size.
- stacks: ~[Stack],
+ stacks: Vec<Stack>,
}
impl StackPool {
pub fn new() -> StackPool {
StackPool {
- stacks: ~[],
+ stacks: vec![],
}
}
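As the comment notes, the pool is just an unordered free list of cached stacks. A minimal sketch of that shape in current Rust, with a stand-in `Stack` and hypothetical `take_stack`/`give_stack` methods:

    // Hypothetical stand-in for the runtime's Stack type.
    struct Stack { min_size: usize }

    struct StackPool {
        stacks: Vec<Stack>,
    }

    impl StackPool {
        fn new() -> StackPool {
            StackPool { stacks: vec![] }
        }

        // Reuse any cached stack that is large enough, else make a new one.
        fn take_stack(&mut self, min_size: usize) -> Stack {
            match self.stacks.iter().position(|s| s.min_size >= min_size) {
                Some(i) => self.stacks.swap_remove(i),
                None => Stack { min_size },
            }
        }

        fn give_stack(&mut self, stack: Stack) {
            self.stacks.push(stack);
        }
    }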