1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Abstraction of a thread pool for basic parallelism.
16 use comm::{channel, Sender, Receiver};
17 use sync::{Arc, Mutex};
21 jobs: &'a Arc<Mutex<Receiver<Thunk>>>,
25 impl<'a> Sentinel<'a> {
26 fn new(jobs: &Arc<Mutex<Receiver<Thunk>>>) -> Sentinel {
33 // Cancel and destroy this sentinel.
40 impl<'a> Drop for Sentinel<'a> {
43 spawn_in_pool(self.jobs.clone())
48 /// A thread pool used to execute functions in parallel.
50 /// Spawns `n` worker threads and replenishes the pool if any worker threads
56 /// use std::sync::TaskPool;
57 /// use std::iter::AdditiveIterator;
58 /// use std::comm::channel;
60 /// let pool = TaskPool::new(4u);
62 /// let (tx, rx) = channel();
63 /// for _ in range(0, 8u) {
64 /// let tx = tx.clone();
65 /// pool.execute(move|| {
70 /// assert_eq!(rx.iter().take(8u).sum(), 8u);
73 // How the thread pool communicates with subthreads.
75 // This is the only such Sender, so when it is dropped all subthreads will
81 /// Spawns a new thread pool with `threads` threads.
85 /// This function will panic if `threads` is 0.
86 pub fn new(threads: uint) -> TaskPool {
87 assert!(threads >= 1);
89 let (tx, rx) = channel::<Thunk>();
90 let rx = Arc::new(Mutex::new(rx));
93 for _ in range(0, threads) {
94 spawn_in_pool(rx.clone());
100 /// Executes the function `job` on a thread in the pool.
101 pub fn execute<F>(&self, job: F)
102 where F : FnOnce(), F : Send
104 self.jobs.send(Thunk::new(job));
108 fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
109 Thread::spawn(move |:| {
110 // Will spawn a new thread on panic unless it is cancelled.
111 let sentinel = Sentinel::new(&jobs);
115 // Only lock jobs for the time it takes
116 // to get a job, not run it.
117 let lock = jobs.lock().unwrap();
122 Ok(job) => job.invoke(()),
124 // The TaskPool was dropped.
139 const TEST_TASKS: uint = 4u; // Thread count passed to `TaskPool::new` in the tests; also the number of jobs each test loop submits.
143 use iter::AdditiveIterator;
145 let pool = TaskPool::new(TEST_TASKS);
147 let (tx, rx) = channel();
148 for _ in range(0, TEST_TASKS) {
150 pool.execute(move|| {
155 assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
160 fn test_zero_tasks_panic() {
165 fn test_recovery_from_subtask_panic() {
166 use iter::AdditiveIterator;
168 let pool = TaskPool::new(TEST_TASKS);
170 // Panic all the existing threads.
171 for _ in range(0, TEST_TASKS) {
172 pool.execute(move|| -> () { panic!() });
175 // Ensure new threads were spawned to compensate.
176 let (tx, rx) = channel();
177 for _ in range(0, TEST_TASKS) {
179 pool.execute(move|| {
184 assert_eq!(rx.iter().take(TEST_TASKS).sum(), TEST_TASKS);
188 fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() {
189 use sync::{Arc, Barrier};
191 let pool = TaskPool::new(TEST_TASKS);
192 let waiter = Arc::new(Barrier::new(TEST_TASKS + 1));
194 // Panic all the existing threads in a bit.
195 for _ in range(0, TEST_TASKS) {
196 let waiter = waiter.clone();
197 pool.execute(move|| {
205 // Kick off the failure.