export reducer;
export map_reduce;
- type putter<unique K, unique V> = fn(K, V);
+ type putter<uniq K, uniq V> = fn(K, V);
// FIXME: the first K1 parameter should probably be a -, but that
// doesn't parse at the moment.
- type mapper<unique K1, unique K2, unique V> = fn(K1, putter<K2, V>);
+ type mapper<uniq K1, uniq K2, uniq V> = fn(K1, putter<K2, V>);
- type getter<unique V> = fn() -> option<V>;
+ type getter<uniq V> = fn() -> option<V>;
- type reducer<unique K, unique V> = fn(K, getter<V>);
+ type reducer<uniq K, uniq V> = fn(K, getter<V>);
- tag ctrl_proto<unique K, unique V> {
+ tag ctrl_proto<uniq K, uniq V> {
find_reducer(K, chan<chan<reduce_proto<V>>>);
mapper_done;
}
- tag reduce_proto<unique V> { emit_val(V); done; ref; release; }
+ tag reduce_proto<uniq V> { emit_val(V); done; ref; release; }
- fn start_mappers<unique K1, unique K2,
- unique V>(map: mapper<K1, K2, V>,
+ fn start_mappers<uniq K1, uniq K2,
+ uniq V>(map: mapper<K1, K2, V>,
ctrl: chan<ctrl_proto<K2, V>>, inputs: [K1]) ->
[joinable_task] {
let tasks = [];
ret tasks;
}
- fn map_task<unique K1, unique K2,
- unique V>(-map: mapper<K1, K2, V>,
+ fn map_task<uniq K1, uniq K2,
+ uniq V>(-map: mapper<K1, K2, V>,
-ctrl: chan<ctrl_proto<K2, V>>,
-input: K1) {
// log_err "map_task " + input;
let intermediates = treemap::init();
- fn emit<unique K2,
- unique V>(im: treemap::treemap<K2, chan<reduce_proto<V>>>,
+ fn emit<uniq K2,
+ uniq V>(im: treemap::treemap<K2, chan<reduce_proto<V>>>,
ctrl: chan<ctrl_proto<K2, V>>, key: K2, val: V) {
let c;
alt treemap::find(im, key) {
map(input, bind emit(intermediates, ctrl, _, _));
- fn finish<unique K, unique V>(_k: K, v: chan<reduce_proto<V>>) {
+ fn finish<uniq K, uniq V>(_k: K, v: chan<reduce_proto<V>>) {
send(v, release);
}
treemap::traverse(intermediates, finish);
send(ctrl, mapper_done);
}
- fn reduce_task<unique K,
- unique V>(-reduce: reducer<K, V>, -key: K,
+ fn reduce_task<uniq K,
+ uniq V>(-reduce: reducer<K, V>, -key: K,
-out: chan<chan<reduce_proto<V>>>) {
let p = port();
let ref_count = 0;
let is_done = false;
- fn get<unique V>(p: port<reduce_proto<V>>,
+ fn get<uniq V>(p: port<reduce_proto<V>>,
&ref_count: int, &is_done: bool)
-> option<V> {
while !is_done || ref_count > 0 {
reduce(key, bind get(p, ref_count, is_done));
}
- fn map_reduce<unique K1, unique K2,
- unique V>(map: mapper<K1, K2, V>, reduce: reducer<K2, V>,
+ fn map_reduce<uniq K1, uniq K2,
+ uniq V>(map: mapper<K1, K2, V>, reduce: reducer<K2, V>,
inputs: [K1]) {
let ctrl = port();
}
}
- fn finish<unique K, unique V>(_k: K, v: chan<reduce_proto<V>>) {
+ fn finish<uniq K, uniq V>(_k: K, v: chan<reduce_proto<V>>) {
send(v, done);
}
treemap::traverse(reducers, finish);