2 // compile-flags:--test
4 // ignore-sgx no thread sleep support
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::TryRecvError;
8 use std::sync::mpsc::RecvError;
9 use std::sync::mpsc::RecvTimeoutError;
11 use std::sync::atomic::AtomicUsize;
12 use std::sync::atomic::Ordering;
15 use std::time::Duration;
18 /// Simple thread synchronization utility
// NOTE(review): the declaration line these fields belong to is outside the visible lines.
20 // Not using mutex/condvar for precision
// Atomic counter held behind an `Arc`; handles created together each hold a clone of it.
21 shared: Arc<AtomicUsize>,
// Builds `count` barrier handles that all share one atomic counter starting at 0.
// Each handle also stores `count` itself.
26 fn new(count: usize) -> Vec<Barrier> {
27 let shared = Arc::new(AtomicUsize::new(0));
// One handle per index in `0..count`; every handle gets its own clone of the `Arc`.
28 (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
// Two-party convenience constructor: creates a barrier of size 2 and returns both handles.
31 fn new2() -> (Barrier, Barrier) {
32 let mut v = Barrier::new(2);
// `Barrier::new(2)` yields two handles, so both `pop` calls are expected to succeed.
33 (v.pop().unwrap(), v.pop().unwrap())
36 /// Returns when `count` threads enter `wait`
// Register this caller's arrival by incrementing the shared counter.
38 self.shared.fetch_add(1, Ordering::SeqCst);
// Keep re-reading the counter until it equals `count`; the loop body is outside the visible lines.
39 while self.shared.load(Ordering::SeqCst) != self.count {
// One iteration of the "shared sender closed" scenario: a spawned thread sends the single
// value 17, and draining the receiver must yield exactly `[17]`.
45 fn shared_close_sender_does_not_lose_messages_iter() {
// Two barrier handles; presumably one per thread so both sides start together -- their use
// is outside the visible lines, confirm.
46 let (tb, rb) = Barrier::new2();
48 let (tx, rx) = channel();
49 let _ = tx.clone(); // convert to shared
51 thread::spawn(move || {
// Sleep for one microsecond before sending.
53 thread::sleep(Duration::from_micros(1));
54 tx.send(17).expect("send");
// Turn the receiver into an iterator; it is collected into a `Vec` in the assertion below.
58 let i = rx.into_iter();
60 // Make sure it doesn't return disconnected before returning an element
61 assert_eq!(vec![17], i.collect::<Vec<_>>());
// Driver for `shared_close_sender_does_not_lose_messages_iter`. Any repetition or test
// attribute around the call is outside the visible lines.
65 fn shared_close_sender_does_not_lose_messages() {
67 shared_close_sender_does_not_lose_messages_iter();
72 // https://github.com/rust-lang/rust/issues/39364
// One iteration of the scenario from the issue linked above: `recv_timeout` is called on the
// receiver while a message is sent through a clone of the sender.
73 fn concurrent_recv_timeout_and_upgrade_iter() {
// Timeout handed to `recv_timeout`: 0 s + 1_000 ns.
75 let sleep = Duration::new(0, 1_000);
77 let (a, b) = Barrier::new2();
78 let (tx, rx) = channel();
79 let th = thread::spawn(move || {
// The match arms are outside the visible lines.
82 match rx.recv_timeout(sleep) {
// Send through a fresh clone of the sender; the function name suggests this is the
// "upgrade" being raced against `recv_timeout` -- TODO confirm.
92 tx.clone().send(()).expect("send");
// Driver for `concurrent_recv_timeout_and_upgrade_iter`; see the FIXME below for its status.
97 fn concurrent_recv_timeout_and_upgrade() {
98 // FIXME: fix and enable
101 // At the time of writing, this test fails like this:
102 // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
103 // left: `4561387584`,
104 // right: `0`', libstd/sync/mpsc/shared.rs:253:13
107 concurrent_recv_timeout_and_upgrade_iter();
// One iteration of the concurrent-writers scenario: each spawned thread sends PER_THR values
// through `tx`, and the receiving side checks the collected values and the channel's state.
112 fn concurrent_writes_iter() {
113 const THREADS: usize = 4;
114 const PER_THR: usize = 100;
// THREADS + 1 barrier handles: one is popped in each loop iteration and one more further down.
116 let mut bs = Barrier::new(THREADS + 1);
117 let (tx, rx) = channel();
119 let mut threads = Vec::new();
120 for j in 0..THREADS {
122 let b = bs.pop().unwrap();
123 threads.push(thread::spawn(move || {
125 for i in 0..PER_THR {
// Encode the writer index `j` and the sequence number `i` in a single value.
126 tx.send(j * 1000 + i).expect("send");
131 let b = bs.pop().unwrap();
// Take exactly THREADS * PER_THR messages from the channel.
134 let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
// NOTE(review): the index-based check below only holds if `v` is ordered by value;
// presumably it is sorted on a line outside the visible range -- confirm.
137 for j in 0..THREADS {
138 for i in 0..PER_THR {
139 assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
// 0 s + 1000 ns, i.e. a one-microsecond timeout.
147 let one_us = Duration::new(0, 1000);
// First the channel must report "empty" / "timeout" ...
149 assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
150 assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
// ... and afterwards "disconnected" from all three receive calls; whatever changes the
// channel state in between is outside the visible lines.
154 assert_eq!(RecvError, rx.recv().unwrap_err());
155 assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
156 assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
// Driver for `concurrent_writes_iter`. Any repetition or test attribute around the call is
// outside the visible lines.
160 fn concurrent_writes() {
162 concurrent_writes_iter();