]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/task_pool.rs
Doc says to avoid mixing allocator instead of forbiding it
[rust.git] / src / libstd / sync / task_pool.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Abstraction of a task pool for basic parallelism.
12
13 use core::prelude::*;
14
15 use task;
16 use task::spawn;
17 use vec::Vec;
18 use comm::{channel, Sender};
19
20 enum Msg<T> {
21     Execute(proc(&T):Send),
22     Quit
23 }
24
25 /// A task pool used to execute functions in parallel.
26 pub struct TaskPool<T> {
27     channels: Vec<Sender<Msg<T>>>,
28     next_index: uint,
29 }
30
31 #[unsafe_destructor]
32 impl<T> Drop for TaskPool<T> {
33     fn drop(&mut self) {
34         for channel in self.channels.mut_iter() {
35             channel.send(Quit);
36         }
37     }
38 }
39
40 impl<T> TaskPool<T> {
41     /// Spawns a new task pool with `n_tasks` tasks. The provided
42     /// `init_fn_factory` returns a function which, given the index of the
43     /// task, should return local data to be kept around in that task.
44     ///
45     /// # Failure
46     ///
47     /// This function will fail if `n_tasks` is less than 1.
48     pub fn new(n_tasks: uint,
49                init_fn_factory: || -> proc(uint):Send -> T)
50                -> TaskPool<T> {
51         assert!(n_tasks >= 1);
52
53         let channels = Vec::from_fn(n_tasks, |i| {
54             let (tx, rx) = channel::<Msg<T>>();
55             let init_fn = init_fn_factory();
56
57             let task_body = proc() {
58                 let local_data = init_fn(i);
59                 loop {
60                     match rx.recv() {
61                         Execute(f) => f(&local_data),
62                         Quit => break
63                     }
64                 }
65             };
66
67             // Run on this scheduler.
68             task::spawn(task_body);
69
70             tx
71         });
72
73         return TaskPool {
74             channels: channels,
75             next_index: 0,
76         };
77     }
78
79     /// Executes the function `f` on a task in the pool. The function
80     /// receives a reference to the local data returned by the `init_fn`.
81     pub fn execute(&mut self, f: proc(&T):Send) {
82         self.channels.get(self.next_index).send(Execute(f));
83         self.next_index += 1;
84         if self.next_index == self.channels.len() { self.next_index = 0; }
85     }
86 }
87
88 #[test]
89 fn test_task_pool() {
90     let f: || -> proc(uint):Send -> uint = || { proc(i) i };
91     let mut pool = TaskPool::new(4, f);
92     for _ in range(0u, 8) {
93         pool.execute(proc(i) println!("Hello from thread {}!", *i));
94     }
95 }
96
97 #[test]
98 #[should_fail]
99 fn test_zero_tasks_failure() {
100     let f: || -> proc(uint):Send -> uint = || { proc(i) i };
101     TaskPool::new(0, f);
102 }