]> git.lizzy.rs Git - rust.git/blob - src/libsync/task_pool.rs
7670e9cf50aa22d5d64e7623bb5691733488fc04
[rust.git] / src / libsync / task_pool.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #[allow(missing_doc)];
12
13 /// A task pool abstraction. Useful for achieving predictable CPU
14 /// parallelism.
15
16 use std::task;
17 use std::slice;
18
19 enum Msg<T> {
20     Execute(proc(&T)),
21     Quit
22 }
23
24 pub struct TaskPool<T> {
25     priv channels: ~[Sender<Msg<T>>],
26     priv next_index: uint,
27 }
28
29 #[unsafe_destructor]
30 impl<T> Drop for TaskPool<T> {
31     fn drop(&mut self) {
32         for channel in self.channels.mut_iter() {
33             channel.send(Quit);
34         }
35     }
36 }
37
38 impl<T> TaskPool<T> {
39     /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode`
40     /// is None, the tasks run on this scheduler; otherwise, they run on a
41     /// new scheduler with the given mode. The provided `init_fn_factory`
42     /// returns a function which, given the index of the task, should return
43     /// local data to be kept around in that task.
44     pub fn new(n_tasks: uint,
45                init_fn_factory: || -> proc(uint) -> T)
46                -> TaskPool<T> {
47         assert!(n_tasks >= 1);
48
49         let channels = slice::from_fn(n_tasks, |i| {
50             let (tx, rx) = channel::<Msg<T>>();
51             let init_fn = init_fn_factory();
52
53             let task_body: proc() = proc() {
54                 let local_data = init_fn(i);
55                 loop {
56                     match rx.recv() {
57                         Execute(f) => f(&local_data),
58                         Quit => break
59                     }
60                 }
61             };
62
63             // Run on this scheduler.
64             task::spawn(task_body);
65
66             tx
67         });
68
69         return TaskPool { channels: channels, next_index: 0 };
70     }
71
72     /// Executes the function `f` on a task in the pool. The function
73     /// receives a reference to the local data returned by the `init_fn`.
74     pub fn execute(&mut self, f: proc(&T)) {
75         self.channels[self.next_index].send(Execute(f));
76         self.next_index += 1;
77         if self.next_index == self.channels.len() { self.next_index = 0; }
78     }
79 }
80
81 #[test]
82 fn test_task_pool() {
83     let f: || -> proc(uint) -> uint = || {
84         let g: proc(uint) -> uint = proc(i) i;
85         g
86     };
87     let mut pool = TaskPool::new(4, f);
88     for _ in range(0, 8) {
89         pool.execute(proc(i) println!("Hello from thread {}!", *i));
90     }
91 }