4 use crate::time::{Duration, Instant};
6 pub fn stress_factor() -> usize {
7 match env::var("RUST_TEST_STRESS") {
8 Ok(val) => val.parse().unwrap(),
15 let (tx, rx) = channel::<i32>();
17 assert_eq!(rx.recv().unwrap(), 1);
22 let (tx, _rx) = channel::<Box<isize>>();
23 tx.send(Box::new(1)).unwrap();
27 fn drop_full_shared() {
28 let (tx, _rx) = channel::<Box<isize>>();
31 tx.send(Box::new(1)).unwrap();
36 let (tx, rx) = channel::<i32>();
38 assert_eq!(rx.recv().unwrap(), 1);
41 assert_eq!(rx.recv().unwrap(), 1);
46 let (tx, rx) = channel::<i32>();
47 let _t = thread::spawn(move || {
50 assert_eq!(rx.recv().unwrap(), 1);
54 fn smoke_port_gone() {
55 let (tx, rx) = channel::<i32>();
57 assert!(tx.send(1).is_err());
61 fn smoke_shared_port_gone() {
62 let (tx, rx) = channel::<i32>();
64 assert!(tx.send(1).is_err())
68 fn smoke_shared_port_gone2() {
69 let (tx, rx) = channel::<i32>();
73 assert!(tx2.send(1).is_err());
77 fn port_gone_concurrent() {
78 let (tx, rx) = channel::<i32>();
79 let _t = thread::spawn(move || {
82 while tx.send(1).is_ok() {}
86 fn port_gone_concurrent_shared() {
87 let (tx, rx) = channel::<i32>();
89 let _t = thread::spawn(move || {
92 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
96 fn smoke_chan_gone() {
97 let (tx, rx) = channel::<i32>();
99 assert!(rx.recv().is_err());
103 fn smoke_chan_gone_shared() {
104 let (tx, rx) = channel::<()>();
105 let tx2 = tx.clone();
108 assert!(rx.recv().is_err());
112 fn chan_gone_concurrent() {
113 let (tx, rx) = channel::<i32>();
114 let _t = thread::spawn(move || {
118 while rx.recv().is_ok() {}
123 let count = if cfg!(miri) { 100 } else { 10000 };
124 let (tx, rx) = channel::<i32>();
125 let t = thread::spawn(move || {
131 assert_eq!(rx.recv().unwrap(), 1);
133 t.join().ok().expect("thread panicked");
138 const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
139 const NTHREADS: u32 = 8;
140 let (tx, rx) = channel::<i32>();
142 let t = thread::spawn(move || {
143 for _ in 0..AMT * NTHREADS {
144 assert_eq!(rx.recv().unwrap(), 1);
146 match rx.try_recv() {
152 for _ in 0..NTHREADS {
154 thread::spawn(move || {
161 t.join().ok().expect("thread panicked");
165 fn send_from_outside_runtime() {
166 let (tx1, rx1) = channel::<()>();
167 let (tx2, rx2) = channel::<i32>();
168 let t1 = thread::spawn(move || {
169 tx1.send(()).unwrap();
171 assert_eq!(rx2.recv().unwrap(), 1);
175 let t2 = thread::spawn(move || {
177 tx2.send(1).unwrap();
180 t1.join().ok().expect("thread panicked");
181 t2.join().ok().expect("thread panicked");
185 fn recv_from_outside_runtime() {
186 let (tx, rx) = channel::<i32>();
187 let t = thread::spawn(move || {
189 assert_eq!(rx.recv().unwrap(), 1);
195 t.join().ok().expect("thread panicked");
200 let (tx1, rx1) = channel::<i32>();
201 let (tx2, rx2) = channel::<i32>();
202 let t1 = thread::spawn(move || {
203 assert_eq!(rx1.recv().unwrap(), 1);
204 tx2.send(2).unwrap();
206 let t2 = thread::spawn(move || {
207 tx1.send(1).unwrap();
208 assert_eq!(rx2.recv().unwrap(), 2);
210 t1.join().ok().expect("thread panicked");
211 t2.join().ok().expect("thread panicked");
215 fn oneshot_single_thread_close_port_first() {
216 // Simple test of closing without sending
217 let (_tx, rx) = channel::<i32>();
222 fn oneshot_single_thread_close_chan_first() {
223 // Simple test of closing without sending
224 let (tx, _rx) = channel::<i32>();
229 fn oneshot_single_thread_send_port_close() {
230 // Testing that the sender cleans up the payload if receiver is closed
231 let (tx, rx) = channel::<Box<i32>>();
233 assert!(tx.send(Box::new(0)).is_err());
237 fn oneshot_single_thread_recv_chan_close() {
238 // Receiving on a closed chan returns `Err`; the spawned thread is expected to panic on it
239 let res = thread::spawn(move || {
240 let (tx, rx) = channel::<i32>();
246 assert!(res.is_err());
250 fn oneshot_single_thread_send_then_recv() {
251 let (tx, rx) = channel::<Box<i32>>();
252 tx.send(Box::new(10)).unwrap();
253 assert!(*rx.recv().unwrap() == 10);
257 fn oneshot_single_thread_try_send_open() {
258 let (tx, rx) = channel::<i32>();
259 assert!(tx.send(10).is_ok());
260 assert!(rx.recv().unwrap() == 10);
264 fn oneshot_single_thread_try_send_closed() {
265 let (tx, rx) = channel::<i32>();
267 assert!(tx.send(10).is_err());
271 fn oneshot_single_thread_try_recv_open() {
272 let (tx, rx) = channel::<i32>();
273 tx.send(10).unwrap();
274 assert!(rx.recv() == Ok(10));
278 fn oneshot_single_thread_try_recv_closed() {
279 let (tx, rx) = channel::<i32>();
281 assert!(rx.recv().is_err());
285 fn oneshot_single_thread_peek_data() {
286 let (tx, rx) = channel::<i32>();
287 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
288 tx.send(10).unwrap();
289 assert_eq!(rx.try_recv(), Ok(10));
293 fn oneshot_single_thread_peek_close() {
294 let (tx, rx) = channel::<i32>();
296 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
297 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
301 fn oneshot_single_thread_peek_open() {
302 let (_tx, rx) = channel::<i32>();
303 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
307 fn oneshot_multi_task_recv_then_send() {
308 let (tx, rx) = channel::<Box<i32>>();
309 let _t = thread::spawn(move || {
310 assert!(*rx.recv().unwrap() == 10);
313 tx.send(Box::new(10)).unwrap();
317 fn oneshot_multi_task_recv_then_close() {
318 let (tx, rx) = channel::<Box<i32>>();
319 let _t = thread::spawn(move || {
322 let res = thread::spawn(move || {
323 assert!(*rx.recv().unwrap() == 10);
326 assert!(res.is_err());
330 fn oneshot_multi_thread_close_stress() {
331 for _ in 0..stress_factor() {
332 let (tx, rx) = channel::<i32>();
333 let _t = thread::spawn(move || {
341 fn oneshot_multi_thread_send_close_stress() {
342 for _ in 0..stress_factor() {
343 let (tx, rx) = channel::<i32>();
344 let _t = thread::spawn(move || {
347 let _ = thread::spawn(move || {
355 fn oneshot_multi_thread_recv_close_stress() {
356 for _ in 0..stress_factor() {
357 let (tx, rx) = channel::<i32>();
358 thread::spawn(move || {
359 let res = thread::spawn(move || {
363 assert!(res.is_err());
365 let _t = thread::spawn(move || {
366 thread::spawn(move || {
374 fn oneshot_multi_thread_send_recv_stress() {
375 for _ in 0..stress_factor() {
376 let (tx, rx) = channel::<Box<isize>>();
377 let _t = thread::spawn(move || {
378 tx.send(Box::new(10)).unwrap();
380 assert!(*rx.recv().unwrap() == 10);
385 fn stream_send_recv_stress() {
386 for _ in 0..stress_factor() {
387 let (tx, rx) = channel();
392 fn send(tx: Sender<Box<i32>>, i: i32) {
397 thread::spawn(move || {
398 tx.send(Box::new(i)).unwrap();
403 fn recv(rx: Receiver<Box<i32>>, i: i32) {
408 thread::spawn(move || {
409 assert!(*rx.recv().unwrap() == i);
417 fn oneshot_single_thread_recv_timeout() {
418 let (tx, rx) = channel();
419 tx.send(()).unwrap();
420 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
421 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
422 tx.send(()).unwrap();
423 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
427 fn stress_recv_timeout_two_threads() {
428 let (tx, rx) = channel();
429 let stress = stress_factor() + 100;
430 let timeout = Duration::from_millis(100);
432 thread::spawn(move || {
435 thread::sleep(timeout * 2);
437 tx.send(1usize).unwrap();
441 let mut recv_count = 0;
443 match rx.recv_timeout(timeout) {
445 assert_eq!(n, 1usize);
448 Err(RecvTimeoutError::Timeout) => continue,
449 Err(RecvTimeoutError::Disconnected) => break,
453 assert_eq!(recv_count, stress);
457 fn recv_timeout_upgrade() {
458 let (tx, rx) = channel::<()>();
459 let timeout = Duration::from_millis(1);
460 let _tx_clone = tx.clone();
462 let start = Instant::now();
463 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
464 assert!(Instant::now() >= start + timeout);
468 fn stress_recv_timeout_shared() {
469 let (tx, rx) = channel();
470 let stress = stress_factor() + 100;
474 thread::spawn(move || {
475 thread::sleep(Duration::from_millis(i as u64 * 10));
476 tx.send(1usize).unwrap();
482 let mut recv_count = 0;
484 match rx.recv_timeout(Duration::from_millis(10)) {
486 assert_eq!(n, 1usize);
489 Err(RecvTimeoutError::Timeout) => continue,
490 Err(RecvTimeoutError::Disconnected) => break,
494 assert_eq!(recv_count, stress);
498 fn very_long_recv_timeout_wont_panic() {
499 let (tx, rx) = channel::<()>();
500 let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
501 thread::sleep(Duration::from_secs(1));
502 assert!(tx.send(()).is_ok());
503 assert_eq!(join_handle.join().unwrap(), Ok(()));
508 let count = if cfg!(miri) { 1000 } else { 10000 };
509 // Regression test that we don't run out of stack in scheduler context
510 let (tx, rx) = channel();
512 tx.send(()).unwrap();
520 fn shared_recv_timeout() {
521 let (tx, rx) = channel();
525 thread::spawn(move || {
526 tx.send(()).unwrap();
534 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
535 tx.send(()).unwrap();
536 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
540 fn shared_chan_stress() {
541 let (tx, rx) = channel();
542 let total = stress_factor() + 100;
545 thread::spawn(move || {
546 tx.send(()).unwrap();
556 fn test_nested_recv_iter() {
557 let (tx, rx) = channel::<i32>();
558 let (total_tx, total_rx) = channel::<i32>();
560 let _t = thread::spawn(move || {
565 total_tx.send(acc).unwrap();
572 assert_eq!(total_rx.recv().unwrap(), 6);
576 fn test_recv_iter_break() {
577 let (tx, rx) = channel::<i32>();
578 let (count_tx, count_rx) = channel();
580 let _t = thread::spawn(move || {
589 count_tx.send(count).unwrap();
597 assert_eq!(count_rx.recv().unwrap(), 4);
601 fn test_recv_try_iter() {
602 let (request_tx, request_rx) = channel();
603 let (response_tx, response_rx) = channel();
605 // Request `x`s until we have `6`.
606 let t = thread::spawn(move || {
609 for x in response_rx.try_iter() {
615 request_tx.send(()).unwrap();
619 for _ in request_rx.iter() {
620 if response_tx.send(2).is_err() {
625 assert_eq!(t.join().unwrap(), 6);
629 fn test_recv_into_iter_owned() {
631 let (tx, rx) = channel::<i32>();
637 assert_eq!(iter.next().unwrap(), 1);
638 assert_eq!(iter.next().unwrap(), 2);
639 assert_eq!(iter.next().is_none(), true);
643 fn test_recv_into_iter_borrowed() {
644 let (tx, rx) = channel::<i32>();
648 let mut iter = (&rx).into_iter();
649 assert_eq!(iter.next().unwrap(), 1);
650 assert_eq!(iter.next().unwrap(), 2);
651 assert_eq!(iter.next().is_none(), true);
655 fn try_recv_states() {
656 let (tx1, rx1) = channel::<i32>();
657 let (tx2, rx2) = channel::<()>();
658 let (tx3, rx3) = channel::<()>();
659 let _t = thread::spawn(move || {
661 tx1.send(1).unwrap();
662 tx3.send(()).unwrap();
665 tx3.send(()).unwrap();
668 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
669 tx2.send(()).unwrap();
671 assert_eq!(rx1.try_recv(), Ok(1));
672 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
673 tx2.send(()).unwrap();
675 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
678 // This bug used to end up in a livelock inside of the Receiver destructor
679 // because the internal state of the Shared packet was corrupted
681 fn destroy_upgraded_shared_port_when_sender_still_active() {
682 let (tx, rx) = channel();
683 let (tx2, rx2) = channel();
684 let _t = thread::spawn(move || {
685 rx.recv().unwrap(); // wait on a oneshot
686 drop(rx); // destroy a shared
687 tx2.send(()).unwrap();
689 // make sure the other thread has gone to sleep
694 // upgrade to a shared chan and send a message
699 // wait for the child thread to exit before we exit
705 let (tx, _) = channel();
706 let _ = tx.send(123);
707 assert_eq!(tx.send(123), Err(SendError(123)));
712 let (tx, rx) = channel::<()>();
713 let t = thread::spawn(move || {
714 thread::sleep(Duration::from_millis(300));
716 crate::mem::forget(tx);
719 let _ = rx.recv_timeout(Duration::from_millis(500));
721 let _ = rx.recv_timeout(Duration::from_millis(500));