git.lizzy.rs Git - rust.git/blob - src/tools/rustfmt/tests/target/issue-2896.rs
Rollup merge of #85361 - bjorn3:rustdoc_target_json_path_canonicalize, r=jyn514
[rust.git] / src / tools / rustfmt / tests / target / issue-2896.rs
1 extern crate differential_dataflow;
2 extern crate rand;
3 extern crate timely;
4
5 use rand::{Rng, SeedableRng, StdRng};
6
7 use timely::dataflow::operators::*;
8
9 use differential_dataflow::input::InputSession;
10 use differential_dataflow::operators::*;
11 use differential_dataflow::AsCollection;
12
13 // mod loglikelihoodratio;
14
15 fn main() {
16     // define a new timely dataflow computation.
17     timely::execute_from_args(std::env::args().skip(6), move |worker| {
18         // capture parameters of the experiment.
19         let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
20         let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
21         let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
22         let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
23         let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";
24
25         let index = worker.index();
26         let peers = worker.peers();
27
28         let (input, probe) = worker.dataflow(|scope| {
29             // input of (user, item) collection.
30             let (input, occurrences) = scope.new_input();
31             let occurrences = occurrences.as_collection();
32
33             //TODO adjust code to only work with upper triangular half of cooccurrence matrix
34
35             /* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
36             let cooccurrences = occurrences
37                 .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
38                 .filter(|&(item_a, item_b)| item_a != item_b)
39                 .count();
40
41             /* compute the rowsums of C indicating how often we encounter individual items. */
42             let row_sums = occurrences.map(|(_user, item)| item).count();
43
44             // row_sums.inspect(|record| println!("[row_sums] {:?}", record));
45
46             /* Join the cooccurrence pairs with the corresponding row sums. */
47             let mut cooccurrences_with_row_sums = cooccurrences
48                 .map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
49                 .join_map(
50                     &row_sums,
51                     |&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
52                         assert!(row_sum_a > 0);
53                         (item_b, (item_a, num_cooccurrences, row_sum_a))
54                     },
55                 )
56                 .join_map(
57                     &row_sums,
58                     |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
59                         assert!(row_sum_a > 0);
60                         assert!(row_sum_b > 0);
61                         (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
62                     },
63                 );
64
65             // cooccurrences_with_row_sums
66             //     .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));
67
68             // //TODO compute top-k "similar items" per item
69             // /* Compute LLR scores for each item pair. */
70             // let llr_scores = cooccurrences_with_row_sums.map(
71             //   |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {
72
73             //     println!(
74             //       "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
75             //       item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);
76
77             //     let k11: isize = num_cooccurrences;
78             //     let k12: isize = row_sum_a as isize - k11;
79             //     let k21: isize = row_sum_b as isize - k11;
80             //     let k22: isize = 10000 - k12 - k21 + k11;
81
82             //     let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);
83
84             //     ((item_a, item_b), llr_score)
85             //   });
86
87             if noisy {
88                 cooccurrences_with_row_sums =
89                     cooccurrences_with_row_sums.inspect(|x| println!("change: {:?}", x));
90             }
91
92             let probe = cooccurrences_with_row_sums.probe();
93             /*
94                   // produce the (item, item) collection
95                   let cooccurrences = occurrences
96                     .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
97                   // count the occurrences of each item.
98                   let counts = cooccurrences
99                     .map(|(item_a,_)| item_a)
100                     .count();
101                   // produce ((item1, item2), count1, count2, count12) tuples
102                   let cooccurrences_with_counts = cooccurrences
103                     .join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
104                     .join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
105                       ((item_a, item_b), count_item_a, count_item_b)
106                     });
107                   let probe = cooccurrences_with_counts
108                     .inspect(|x| println!("change: {:?}", x))
109                     .probe();
110             */
111             (input, probe)
112         });
113
114         let seed: &[_] = &[1, 2, 3, index];
115         let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
116         let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions
117
118         let mut input = InputSession::from(input);
119
120         for count in 0..scale {
121             if count % peers == index {
122                 let user = rng1.gen_range(0, users);
123                 let item = rng1.gen_range(0, items);
124                 // println!("[INITIAL INPUT] ({}, {})", user, item);
125                 input.insert((user, item));
126             }
127         }
128
129         // load the initial data up!
130         while probe.less_than(input.time()) {
131             worker.step();
132         }
133
134         for round in 1.. {
135             for element in (round * batch)..((round + 1) * batch) {
136                 if element % peers == index {
137                     // advance the input timestamp.
138                     input.advance_to(round * batch);
139                     // insert a new item.
140                     let user = rng1.gen_range(0, users);
141                     let item = rng1.gen_range(0, items);
142                     if noisy {
143                         println!("[INPUT: insert] ({}, {})", user, item);
144                     }
145                     input.insert((user, item));
146                     // remove an old item.
147                     let user = rng2.gen_range(0, users);
148                     let item = rng2.gen_range(0, items);
149                     if noisy {
150                         println!("[INPUT: remove] ({}, {})", user, item);
151                     }
152                     input.remove((user, item));
153                 }
154             }
155
156             input.advance_to(round * batch);
157             input.flush();
158
159             while probe.less_than(input.time()) {
160                 worker.step();
161             }
162         }
163     })
164     .unwrap();
165 }