]> git.lizzy.rs Git - rust.git/blob - tests/source/issue-2896.rs
Prevent duplicate comma when formatting struct pattern with ".."
[rust.git] / tests / source / issue-2896.rs
1 extern crate rand;
2 extern crate timely;
3 extern crate differential_dataflow;
4
5 use rand::{Rng, SeedableRng, StdRng};
6
7 use timely::dataflow::operators::*;
8
9 use differential_dataflow::AsCollection;
10 use differential_dataflow::operators::*;
11 use differential_dataflow::input::InputSession;
12
13 // mod loglikelihoodratio;
14
15 fn main() {
16
17   // define a new timely dataflow computation. 
18   timely::execute_from_args(std::env::args().skip(6), move |worker| {
19
20     // capture parameters of the experiment.
21     let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
22     let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
23     let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
24     let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
25     let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";
26
27     let index = worker.index();
28     let peers = worker.peers();
29
30     let (input, probe) = worker.dataflow(|scope| {
31
32       // input of (user, item) collection.
33       let (input, occurrences) = scope.new_input();
34       let occurrences = occurrences.as_collection();
35
36       //TODO adjust code to only work with upper triangular half of cooccurrence matrix
37
38       /* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
39       let cooccurrences = 
40       occurrences
41         .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
42         .filter(|&(item_a, item_b)| item_a != item_b)
43         .count();
44
45       /* compute the rowsums of C indicating how often we encounter individual items. */
46       let row_sums = 
47       occurrences
48         .map(|(_user, item)| item)
49         .count();
50
51       // row_sums.inspect(|record| println!("[row_sums] {:?}", record));
52
53       /* Join the cooccurrence pairs with the corresponding row sums. */
54       let mut cooccurrences_with_row_sums = cooccurrences
55         .map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
56         .join_map(&row_sums, |&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
57           assert!(row_sum_a > 0);
58           (item_b, (item_a, num_cooccurrences, row_sum_a))
59         })
60         .join_map(&row_sums, |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
61           assert!(row_sum_a > 0);
62           assert!(row_sum_b > 0);
63           (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
64         });
65
66       // cooccurrences_with_row_sums
67       //     .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));
68
69       // //TODO compute top-k "similar items" per item
70       // /* Compute LLR scores for each item pair. */
71       // let llr_scores = cooccurrences_with_row_sums.map(
72       //   |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {
73
74       //     println!(
75       //       "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
76       //       item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);
77
78       //     let k11: isize = num_cooccurrences;
79       //     let k12: isize = row_sum_a as isize - k11;
80       //     let k21: isize = row_sum_b as isize - k11;
81       //     let k22: isize = 10000 - k12 - k21 + k11;
82
83       //     let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);
84
85       //     ((item_a, item_b), llr_score)
86       //   });
87
88       if noisy {
89         cooccurrences_with_row_sums = 
90         cooccurrences_with_row_sums
91           .inspect(|x| println!("change: {:?}", x));
92       }
93
94       let probe = 
95       cooccurrences_with_row_sums
96           .probe();
97 /*
98       // produce the (item, item) collection
99       let cooccurrences = occurrences
100         .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
101       // count the occurrences of each item.
102       let counts = cooccurrences
103         .map(|(item_a,_)| item_a)
104         .count();
105       // produce ((item1, item2), count1, count2, count12) tuples
106       let cooccurrences_with_counts = cooccurrences
107         .join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
108         .join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
109           ((item_a, item_b), count_item_a, count_item_b)
110         });
111       let probe = cooccurrences_with_counts
112         .inspect(|x| println!("change: {:?}", x))
113         .probe();
114 */
115       (input, probe)
116     });
117
118     let seed: &[_] = &[1, 2, 3, index];
119     let mut rng1: StdRng = SeedableRng::from_seed(seed);  // rng for edge additions
120     let mut rng2: StdRng = SeedableRng::from_seed(seed);  // rng for edge deletions
121
122     let mut input = InputSession::from(input);
123
124     for count in 0 .. scale {
125       if count % peers == index {
126         let user = rng1.gen_range(0, users);
127         let item = rng1.gen_range(0, items);
128         // println!("[INITIAL INPUT] ({}, {})", user, item);
129         input.insert((user, item));
130       }
131     }
132
133     // load the initial data up!
134     while probe.less_than(input.time()) { worker.step(); }
135
136     for round in 1 .. {
137
138       for element in (round * batch) .. ((round + 1) * batch) {
139         if element % peers == index {
140           // advance the input timestamp.
141           input.advance_to(round * batch);
142           // insert a new item.
143           let user = rng1.gen_range(0, users);
144           let item = rng1.gen_range(0, items);
145           if noisy { println!("[INPUT: insert] ({}, {})", user, item); }
146           input.insert((user, item));
147           // remove an old item.
148           let user = rng2.gen_range(0, users);
149           let item = rng2.gen_range(0, items);
150           if noisy { println!("[INPUT: remove] ({}, {})", user, item); }
151           input.remove((user, item));
152         }
153       }
154
155       input.advance_to(round * batch);
156       input.flush();
157
158       while probe.less_than(input.time()) { worker.step(); }
159     }
160   }).unwrap();
161 }