1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Abstraction of a thread pool for basic parallelism.
13 #![unstable = "the semantics of a failing task and whether a thread is \
14 re-attached to a thread pool are somewhat unclear, and the \
15 utility of this type in `std::sync` is questionable with \
16 respect to the jobs of other primitives"]
20 use sync::{Arc, Mutex};
21 use sync::mpsc::{channel, Sender, Receiver};
26 jobs: &'a Arc<Mutex<Receiver<Thunk>>>,
30 impl<'a> Sentinel<'a> {
31 fn new(jobs: &Arc<Mutex<Receiver<Thunk>>>) -> Sentinel {
38 // Cancel and destroy this sentinel.
45 impl<'a> Drop for Sentinel<'a> {
48 spawn_in_pool(self.jobs.clone())
53 /// A thread pool used to execute functions in parallel.
55 /// Spawns `n` worker threads and replenishes the pool if any worker threads
61 /// use std::sync::TaskPool;
62 /// use std::iter::AdditiveIterator;
63 /// use std::sync::mpsc::channel;
65 /// let pool = TaskPool::new(4u);
67 /// let (tx, rx) = channel();
68 /// for _ in range(0, 8u) {
69 /// let tx = tx.clone();
70 /// pool.execute(move|| {
71 /// tx.send(1u).unwrap();
75 /// assert_eq!(rx.iter().take(8u).sum(), 8u);
78 // How the thread pool communicates with subthreads.
80 // This is the only such Sender, so when it is dropped all subthreads will
86 /// Spawns a new thread pool with `threads` threads.
90 /// This function will panic if `threads` is 0.
91 pub fn new(threads: uint) -> TaskPool {
92 assert!(threads >= 1);
94 let (tx, rx) = channel::<Thunk>();
95 let rx = Arc::new(Mutex::new(rx));
98 for _ in range(0, threads) {
99 spawn_in_pool(rx.clone());
102 TaskPool { jobs: tx }
105 /// Executes the function `job` on a thread in the pool.
106 pub fn execute<F>(&self, job: F)
107 where F : FnOnce(), F : Send
109 self.jobs.send(Thunk::new(job)).unwrap();
113 fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
114 Thread::spawn(move |:| {
115 // Will spawn a new thread on panic unless it is cancelled.
116 let sentinel = Sentinel::new(&jobs);
120 // Only lock jobs for the time it takes
121 // to get a job, not run it.
122 let lock = jobs.lock().unwrap();
127 Ok(job) => job.invoke(()),
129 // The TaskPool was dropped.
142 use sync::mpsc::channel;
144 const TEST_TASKS: uint = 4u;
148 use iter::AdditiveIterator;
150 let pool = TaskPool::new(TEST_TASKS);
152 let (tx, rx) = channel();
153 for _ in range(0, TEST_TASKS) {
155 pool.execute(move|| {
156 tx.send(1u).unwrap();
160 assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
165 fn test_zero_tasks_panic() {
170 fn test_recovery_from_subtask_panic() {
171 use iter::AdditiveIterator;
173 let pool = TaskPool::new(TEST_TASKS);
175 // Panic all the existing threads.
176 for _ in range(0, TEST_TASKS) {
177 pool.execute(move|| -> () { panic!() });
180 // Ensure new threads were spawned to compensate.
181 let (tx, rx) = channel();
182 for _ in range(0, TEST_TASKS) {
184 pool.execute(move|| {
185 tx.send(1u).unwrap();
189 assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
193 fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() {
194 use sync::{Arc, Barrier};
196 let pool = TaskPool::new(TEST_TASKS);
197 let waiter = Arc::new(Barrier::new(TEST_TASKS + 1));
199 // Panic all the existing threads in a bit.
200 for _ in range(0, TEST_TASKS) {
201 let waiter = waiter.clone();
202 pool.execute(move|| {
210 // Kick off the failure.