1 extern crate differential_dataflow;
5 use rand::{Rng, SeedableRng, StdRng};
7 use timely::dataflow::operators::*;
9 use differential_dataflow::input::InputSession;
10 use differential_dataflow::operators::*;
11 use differential_dataflow::AsCollection;
13 // mod loglikelihoodratio;
16 // define a new timely dataflow computation.
17 timely::execute_from_args(std::env::args().skip(6), move |worker| {
18 // capture parameters of the experiment.
19 let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
20 let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
21 let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
22 let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
23 let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";
25 let index = worker.index();
26 let peers = worker.peers();
28 let (input, probe) = worker.dataflow(|scope| {
29 // input of (user, item) collection.
30 let (input, occurrences) = scope.new_input();
31 let occurrences = occurrences.as_collection();
33 //TODO adjust code to only work with upper triangular half of cooccurrence matrix
35 /* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
36 let cooccurrences = occurrences
37 .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
38 .filter(|&(item_a, item_b)| item_a != item_b)
41 /* compute the rowsums of C indicating how often we encounter individual items. */
42 let row_sums = occurrences.map(|(_user, item)| item).count();
44 // row_sums.inspect(|record| println!("[row_sums] {:?}", record));
46 /* Join the cooccurrence pairs with the corresponding row sums. */
47 let mut cooccurrences_with_row_sums = cooccurrences
48 .map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
51 |&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
52 assert!(row_sum_a > 0);
53 (item_b, (item_a, num_cooccurrences, row_sum_a))
58 |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
59 assert!(row_sum_a > 0);
60 assert!(row_sum_b > 0);
61 (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
65 // cooccurrences_with_row_sums
66 // .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));
68 // //TODO compute top-k "similar items" per item
69 // /* Compute LLR scores for each item pair. */
70 // let llr_scores = cooccurrences_with_row_sums.map(
71 // |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {
74 // "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
75 // item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);
77 // let k11: isize = num_cooccurrences;
78 // let k12: isize = row_sum_a as isize - k11;
79 // let k21: isize = row_sum_b as isize - k11;
80 // let k22: isize = 10000 - k12 - k21 + k11;
82 // let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);
84 // ((item_a, item_b), llr_score)
88 cooccurrences_with_row_sums =
89 cooccurrences_with_row_sums.inspect(|x| println!("change: {:?}", x));
92 let probe = cooccurrences_with_row_sums.probe();
94 // produce the (item, item) collection
95 let cooccurrences = occurrences
96 .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
97 // count the occurrences of each item.
98 let counts = cooccurrences
99 .map(|(item_a,_)| item_a)
101 // produce ((item_a, item_b), count_item_a, count_item_b) tuples; the joint count of the pair is not included here
102 let cooccurrences_with_counts = cooccurrences
103 .join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
104 .join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
105 ((item_a, item_b), count_item_a, count_item_b)
107 let probe = cooccurrences_with_counts
108 .inspect(|x| println!("change: {:?}", x))
114 let seed: &[_] = &[1, 2, 3, index];
115 let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
116 let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions
118 let mut input = InputSession::from(input);
120 for count in 0..scale {
121 if count % peers == index {
122 let user = rng1.gen_range(0, users);
123 let item = rng1.gen_range(0, items);
124 // println!("[INITIAL INPUT] ({}, {})", user, item);
125 input.insert((user, item));
129 // load the initial data up!
130 while probe.less_than(input.time()) {
135 for element in (round * batch)..((round + 1) * batch) {
136 if element % peers == index {
137 // advance the input timestamp.
138 input.advance_to(round * batch);
139 // insert a new item.
140 let user = rng1.gen_range(0, users);
141 let item = rng1.gen_range(0, items);
143 println!("[INPUT: insert] ({}, {})", user, item);
145 input.insert((user, item));
146 // remove an old item.
147 let user = rng2.gen_range(0, users);
148 let item = rng2.gen_range(0, items);
150 println!("[INPUT: remove] ({}, {})", user, item);
152 input.remove((user, item));
156 input.advance_to(round * batch);
159 while probe.less_than(input.time()) {