]> git.lizzy.rs Git - rust.git/blob - src/test/ui/mpsc_stress.rs
Merge commit 'ff0993c5e9162ddaea78e83d0f0161e68bd4ea73' into clippy
[rust.git] / src / test / ui / mpsc_stress.rs
1 // run-pass
2 // compile-flags:--test
3 // ignore-emscripten
4 // ignore-sgx no thread sleep support
5
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::TryRecvError;
8 use std::sync::mpsc::RecvError;
9 use std::sync::mpsc::RecvTimeoutError;
10 use std::sync::Arc;
11 use std::sync::atomic::AtomicUsize;
12 use std::sync::atomic::Ordering;
13
14 use std::thread;
15 use std::time::Duration;
16
17
18 /// Simple thread synchronization utility
19 struct Barrier {
20     // Not using mutex/condvar for precision
21     shared: Arc<AtomicUsize>,
22     count: usize,
23 }
24
25 impl Barrier {
26     fn new(count: usize) -> Vec<Barrier> {
27         let shared = Arc::new(AtomicUsize::new(0));
28         (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
29     }
30
31     fn new2() -> (Barrier, Barrier) {
32         let mut v = Barrier::new(2);
33         (v.pop().unwrap(), v.pop().unwrap())
34     }
35
36     /// Returns when `count` threads enter `wait`
37     fn wait(self) {
38         self.shared.fetch_add(1, Ordering::SeqCst);
39         while self.shared.load(Ordering::SeqCst) != self.count {
40         }
41     }
42 }
43
44
45 fn shared_close_sender_does_not_lose_messages_iter() {
46     let (tb, rb) = Barrier::new2();
47
48     let (tx, rx) = channel();
49     let _ = tx.clone(); // convert to shared
50
51     thread::spawn(move || {
52         tb.wait();
53         thread::sleep(Duration::from_micros(1));
54         tx.send(17).expect("send");
55         drop(tx);
56     });
57
58     let i = rx.into_iter();
59     rb.wait();
60     // Make sure it doesn't return disconnected before returning an element
61     assert_eq!(vec![17], i.collect::<Vec<_>>());
62 }
63
64 #[test]
65 fn shared_close_sender_does_not_lose_messages() {
66     for _ in 0..10000 {
67         shared_close_sender_does_not_lose_messages_iter();
68     }
69 }
70
71
72 // https://github.com/rust-lang/rust/issues/39364
73 fn concurrent_recv_timeout_and_upgrade_iter() {
74     // 1 us
75     let sleep = Duration::new(0, 1_000);
76
77     let (a, b) = Barrier::new2();
78     let (tx, rx) = channel();
79     let th = thread::spawn(move || {
80         a.wait();
81         loop {
82             match rx.recv_timeout(sleep) {
83                 Ok(_) => {
84                     break;
85                 },
86                 Err(_) => {},
87             }
88         }
89     });
90     b.wait();
91     thread::sleep(sleep);
92     tx.clone().send(()).expect("send");
93     th.join().unwrap();
94 }
95
96 #[test]
97 fn concurrent_recv_timeout_and_upgrade() {
98     // FIXME: fix and enable
99     if true { return }
100
101     // at the moment of writing this test fails like this:
102     // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
103     //  left: `4561387584`,
104     // right: `0`', libstd/sync/mpsc/shared.rs:253:13
105
106     for _ in 0..10000 {
107         concurrent_recv_timeout_and_upgrade_iter();
108     }
109 }
110
111
112 fn concurrent_writes_iter() {
113     const THREADS: usize = 4;
114     const PER_THR: usize = 100;
115
116     let mut bs = Barrier::new(THREADS + 1);
117     let (tx, rx) = channel();
118
119     let mut threads = Vec::new();
120     for j in 0..THREADS {
121         let tx = tx.clone();
122         let b = bs.pop().unwrap();
123         threads.push(thread::spawn(move || {
124             b.wait();
125             for i in 0..PER_THR {
126                 tx.send(j * 1000 + i).expect("send");
127             }
128         }));
129     }
130
131     let b = bs.pop().unwrap();
132     b.wait();
133
134     let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
135     v.sort();
136
137     for j in 0..THREADS {
138         for i in 0..PER_THR {
139             assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
140         }
141     }
142
143     for t in threads {
144         t.join().unwrap();
145     }
146
147     let one_us = Duration::new(0, 1000);
148
149     assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
150     assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
151
152     drop(tx);
153
154     assert_eq!(RecvError, rx.recv().unwrap_err());
155     assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
156     assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
157 }
158
159 #[test]
160 fn concurrent_writes() {
161     for _ in 0..100 {
162         concurrent_writes_iter();
163     }
164 }