2 // compile-flags:--test
5 use std::sync::mpsc::channel;
6 use std::sync::mpsc::TryRecvError;
7 use std::sync::mpsc::RecvError;
8 use std::sync::mpsc::RecvTimeoutError;
10 use std::sync::atomic::AtomicUsize;
11 use std::sync::atomic::Ordering;
14 use std::time::Duration;
17 /// Simple thread synchronization utility: callers rendezvous by counting
/// arrivals on a shared atomic counter.
19 // Not using mutex/condvar for precision
// Arrival counter; the same `Arc` is cloned into every handle produced by one
// call to `new`, and `wait` increments it once per call.
20 shared: Arc<AtomicUsize>,
/// Creates `count` barrier handles that all share a single arrival counter.
///
/// The counter starts at zero; every returned handle also stores `count`,
/// which is the value `wait` compares the counter against.
25 fn new(count: usize) -> Vec<Barrier> {
26 let shared = Arc::new(AtomicUsize::new(0));
// One handle per participant, each holding its own clone of the `Arc`.
27 (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
/// Convenience wrapper around `new(2)`: returns the two handles of a
/// two-party barrier as a tuple.
30 fn new2() -> (Barrier, Barrier) {
31 let mut v = Barrier::new(2);
// `new(2)` collects exactly two handles, so neither `pop` can yield `None`.
32 (v.pop().unwrap(), v.pop().unwrap())
35 /// Returns when `count` threads enter `wait`
// Record this caller's arrival on the shared counter.
37 self.shared.fetch_add(1, Ordering::SeqCst);
// Re-read the counter until it equals `count`.
// NOTE(review): the comparison is `!=`, so this relies on exactly `count`
// calls to `wait` per group of handles, and the counter is not reset in the
// code shown — confirm the barrier is intended to be single-use.
38 while self.shared.load(Ordering::SeqCst) != self.count {
// The statement following this attribute is compiled only for SGX targets.
39 #[cfg(target_env = "sgx")]
// One round of the scenario: a spawned thread sends the single value 17 and
// the receiving side must collect exactly `[17]` from the channel iterator.
46 fn shared_close_sender_does_not_lose_messages_iter() {
// Two-party barrier handles.
// NOTE(review): presumably one handle goes to the sender thread and one stays
// with the receiver so both start together — confirm against the `wait` calls.
47 let (tb, rb) = Barrier::new2();
49 let (tx, rx) = channel();
// The clone is bound to `_` and therefore dropped immediately; it exists only
// for its effect on the channel, as described by the trailing comment.
50 let _ = tx.clone(); // convert to shared
// Sender thread; `move` transfers ownership of `tx` into the closure.
52 thread::spawn(move || {
54 thread::sleep(Duration::from_micros(1));
55 tx.send(17).expect("send");
// Consuming iterator over the receiver; `collect` below drains it until the
// channel reports that no more values will arrive.
59 let i = rx.into_iter();
61 // Make sure it doesn't return disconnected before returning an element
62 assert_eq!(vec![17], i.collect::<Vec<_>>());
// Entry point for the scenario; delegates to the `_iter` helper, which
// performs a single round.
// NOTE(review): the helper's `_iter` suffix suggests it is called repeatedly
// from here — confirm the repetition count.
66 fn shared_close_sender_does_not_lose_messages() {
68 shared_close_sender_does_not_lose_messages_iter();
// One round of the scenario reported in:
73 // https://github.com/rust-lang/rust/issues/39364
74 fn concurrent_recv_timeout_and_upgrade_iter() {
// 0 s + 1_000 ns, i.e. a one-microsecond timeout for `recv_timeout`.
76 let sleep = Duration::new(0, 1_000);
// Two-party barrier handles.
// NOTE(review): presumably used to line up the receiver thread with the
// sending side — confirm against the `wait` calls.
78 let (a, b) = Barrier::new2();
79 let (tx, rx) = channel();
// Receiver thread; `move` transfers ownership of `rx` into the closure and
// the join handle is kept in `th`.
80 let th = thread::spawn(move || {
// Timed receive using the one-microsecond timeout defined above.
83 match rx.recv_timeout(sleep) {
// Clone the sender and send through the clone.
// NOTE(review): going by the function name, this clone is the "upgrade" that
// is meant to race with `recv_timeout` — confirm.
93 tx.clone().send(()).expect("send");
// Entry point for the recv_timeout/upgrade scenario; delegates to the `_iter`
// helper, which performs a single round.
98 fn concurrent_recv_timeout_and_upgrade() {
99 // FIXME: fix the failure described below and enable this test
102 // At the time of writing, this test fails like this:
103 // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
104 // left: `4561387584`,
105 // right: `0`', libstd/sync/mpsc/shared.rs:253:13
108 concurrent_recv_timeout_and_upgrade_iter();
// One round of the concurrent-writers scenario: THREADS writer threads each
// send PER_THR values, the receiver collects all of them, checks their
// contents, and then checks the errors returned by each receive method.
113 fn concurrent_writes_iter() {
114 const THREADS: usize = 4;
115 const PER_THR: usize = 100;
// THREADS + 1 handles: one is popped per writer thread in the loop below and
// one more is popped afterwards.
117 let mut bs = Barrier::new(THREADS + 1);
118 let (tx, rx) = channel();
// Join handles of the spawned writer threads.
120 let mut threads = Vec::new();
121 for j in 0..THREADS {
123 let b = bs.pop().unwrap();
124 threads.push(thread::spawn(move || {
126 for i in 0..PER_THR {
// Values are `j * 1000 + i`; because `i < PER_THR = 100`, values sent by
// different writers never collide.
127 tx.send(j * 1000 + i).expect("send");
// The remaining barrier handle, taken after the spawn loop.
132 let b = bs.pop().unwrap();
// Take exactly as many messages as the writers send in total.
135 let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
// The check below expects `v[j * PER_THR + i] == j * 1000 + i`, i.e. `v` in
// ascending order.
// NOTE(review): `v` is declared `mut`, so it is presumably sorted before this
// loop — confirm.
138 for j in 0..THREADS {
139 for i in 0..PER_THR {
140 assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
// 0 s + 1000 ns, i.e. a one-microsecond timeout.
148 let one_us = Duration::new(0, 1000);
// Expect the channel to be drained but still connected: `Empty` / `Timeout`.
150 assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
151 assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
// Expect every receive method to report a disconnected channel.
// NOTE(review): this requires all senders to have been dropped between the
// previous two assertions and here — confirm how that is arranged.
155 assert_eq!(RecvError, rx.recv().unwrap_err());
156 assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
157 assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
// Entry point for the concurrent-writers scenario; delegates to the `_iter`
// helper, which performs a single round.
// NOTE(review): the helper's `_iter` suffix suggests it is called repeatedly
// from here — confirm the repetition count.
161 fn concurrent_writes() {
163 concurrent_writes_iter();