]> git.lizzy.rs Git - rust.git/blob - src/libextra/task_pool.rs
f1bf9e81c7251238224989d37622efa4afe6770f
[rust.git] / src / libextra / task_pool.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #[allow(missing_doc)];
12
13 /// A task pool abstraction. Useful for achieving predictable CPU
14 /// parallelism.
15
16
17 use std::comm::{Chan, GenericChan, GenericPort};
18 use std::comm;
19 use std::task::SchedMode;
20 use std::task;
21 use std::vec;
22
23 #[cfg(test)] use std::task::SingleThreaded;
24
25 enum Msg<T> {
26     Execute(~fn(&T)),
27     Quit
28 }
29
30 pub struct TaskPool<T> {
31     channels: ~[Chan<Msg<T>>],
32     next_index: uint,
33 }
34
35 #[unsafe_destructor]
36 impl<T> Drop for TaskPool<T> {
37     fn drop(&self) {
38         for channel in self.channels.iter() {
39             channel.send(Quit);
40         }
41     }
42 }
43
44 impl<T> TaskPool<T> {
45     /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode`
46     /// is None, the tasks run on this scheduler; otherwise, they run on a
47     /// new scheduler with the given mode. The provided `init_fn_factory`
48     /// returns a function which, given the index of the task, should return
49     /// local data to be kept around in that task.
50     pub fn new(n_tasks: uint,
51                opt_sched_mode: Option<SchedMode>,
52                init_fn_factory: ~fn() -> ~fn(uint) -> T)
53                -> TaskPool<T> {
54         assert!(n_tasks >= 1);
55
56         let channels = do vec::from_fn(n_tasks) |i| {
57             let (port, chan) = comm::stream::<Msg<T>>();
58             let init_fn = init_fn_factory();
59
60             let task_body: ~fn() = || {
61                 let local_data = init_fn(i);
62                 loop {
63                     match port.recv() {
64                         Execute(f) => f(&local_data),
65                         Quit => break
66                     }
67                 }
68             };
69
70             // Start the task.
71             match opt_sched_mode {
72                 None => {
73                     // Run on this scheduler.
74                     task::spawn(task_body);
75                 }
76                 Some(sched_mode) => {
77                     let mut task = task::task();
78                     task.sched_mode(sched_mode);
79                     task.spawn(task_body);
80                 }
81             }
82
83             chan
84         };
85
86         return TaskPool { channels: channels, next_index: 0 };
87     }
88
89     /// Executes the function `f` on a task in the pool. The function
90     /// receives a reference to the local data returned by the `init_fn`.
91     pub fn execute(&mut self, f: ~fn(&T)) {
92         self.channels[self.next_index].send(Execute(f));
93         self.next_index += 1;
94         if self.next_index == self.channels.len() { self.next_index = 0; }
95     }
96 }
97
98 #[test]
99 fn test_task_pool() {
100     let f: ~fn() -> ~fn(uint) -> uint = || {
101         let g: ~fn(uint) -> uint = |i| i;
102         g
103     };
104     let mut pool = TaskPool::new(4, Some(SingleThreaded), f);
105     do 8.times {
106         pool.execute(|i| printfln!("Hello from thread %u!", *i));
107     }
108 }