]> git.lizzy.rs Git - rust.git/blob - src/test/ui/mpsc_stress.rs
Auto merge of #85556 - FabianWolff:issue-85071, r=estebank,jackh726
[rust.git] / src / test / ui / mpsc_stress.rs
1 // run-pass
2 // compile-flags:--test
3 // ignore-emscripten
4
5 use std::sync::mpsc::channel;
6 use std::sync::mpsc::TryRecvError;
7 use std::sync::mpsc::RecvError;
8 use std::sync::mpsc::RecvTimeoutError;
9 use std::sync::Arc;
10 use std::sync::atomic::AtomicUsize;
11 use std::sync::atomic::Ordering;
12
13 use std::thread;
14 use std::time::Duration;
15
16
17 /// Simple thread synchronization utility
18 struct Barrier {
19     // Not using mutex/condvar for precision
20     shared: Arc<AtomicUsize>,
21     count: usize,
22 }
23
24 impl Barrier {
25     fn new(count: usize) -> Vec<Barrier> {
26         let shared = Arc::new(AtomicUsize::new(0));
27         (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
28     }
29
30     fn new2() -> (Barrier, Barrier) {
31         let mut v = Barrier::new(2);
32         (v.pop().unwrap(), v.pop().unwrap())
33     }
34
35     /// Returns when `count` threads enter `wait`
36     fn wait(self) {
37         self.shared.fetch_add(1, Ordering::SeqCst);
38         while self.shared.load(Ordering::SeqCst) != self.count {
39             #[cfg(target_env = "sgx")]
40             thread::yield_now();
41         }
42     }
43 }
44
45
46 fn shared_close_sender_does_not_lose_messages_iter() {
47     let (tb, rb) = Barrier::new2();
48
49     let (tx, rx) = channel();
50     let _ = tx.clone(); // convert to shared
51
52     thread::spawn(move || {
53         tb.wait();
54         thread::sleep(Duration::from_micros(1));
55         tx.send(17).expect("send");
56         drop(tx);
57     });
58
59     let i = rx.into_iter();
60     rb.wait();
61     // Make sure it doesn't return disconnected before returning an element
62     assert_eq!(vec![17], i.collect::<Vec<_>>());
63 }
64
65 #[test]
66 fn shared_close_sender_does_not_lose_messages() {
67     for _ in 0..10000 {
68         shared_close_sender_does_not_lose_messages_iter();
69     }
70 }
71
72
73 // https://github.com/rust-lang/rust/issues/39364
74 fn concurrent_recv_timeout_and_upgrade_iter() {
75     // 1 us
76     let sleep = Duration::new(0, 1_000);
77
78     let (a, b) = Barrier::new2();
79     let (tx, rx) = channel();
80     let th = thread::spawn(move || {
81         a.wait();
82         loop {
83             match rx.recv_timeout(sleep) {
84                 Ok(_) => {
85                     break;
86                 },
87                 Err(_) => {},
88             }
89         }
90     });
91     b.wait();
92     thread::sleep(sleep);
93     tx.clone().send(()).expect("send");
94     th.join().unwrap();
95 }
96
97 #[test]
98 fn concurrent_recv_timeout_and_upgrade() {
99     // FIXME: fix and enable
100     if true { return }
101
102     // at the moment of writing this test fails like this:
103     // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
104     //  left: `4561387584`,
105     // right: `0`', libstd/sync/mpsc/shared.rs:253:13
106
107     for _ in 0..10000 {
108         concurrent_recv_timeout_and_upgrade_iter();
109     }
110 }
111
112
113 fn concurrent_writes_iter() {
114     const THREADS: usize = 4;
115     const PER_THR: usize = 100;
116
117     let mut bs = Barrier::new(THREADS + 1);
118     let (tx, rx) = channel();
119
120     let mut threads = Vec::new();
121     for j in 0..THREADS {
122         let tx = tx.clone();
123         let b = bs.pop().unwrap();
124         threads.push(thread::spawn(move || {
125             b.wait();
126             for i in 0..PER_THR {
127                 tx.send(j * 1000 + i).expect("send");
128             }
129         }));
130     }
131
132     let b = bs.pop().unwrap();
133     b.wait();
134
135     let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
136     v.sort();
137
138     for j in 0..THREADS {
139         for i in 0..PER_THR {
140             assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
141         }
142     }
143
144     for t in threads {
145         t.join().unwrap();
146     }
147
148     let one_us = Duration::new(0, 1000);
149
150     assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
151     assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
152
153     drop(tx);
154
155     assert_eq!(RecvError, rx.recv().unwrap_err());
156     assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
157     assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
158 }
159
160 #[test]
161 fn concurrent_writes() {
162     for _ in 0..100 {
163         concurrent_writes_iter();
164     }
165 }