]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/tests.rs
add test case for rust-lang#39364
[rust.git] / library / std / src / sync / mpsc / tests.rs
1 use super::*;
2 use crate::env;
3 use crate::thread;
4 use crate::time::{Duration, Instant};
5
6 pub fn stress_factor() -> usize {
7     match env::var("RUST_TEST_STRESS") {
8         Ok(val) => val.parse().unwrap(),
9         Err(..) => 1,
10     }
11 }
12
13 #[test]
14 fn smoke() {
15     let (tx, rx) = channel::<i32>();
16     tx.send(1).unwrap();
17     assert_eq!(rx.recv().unwrap(), 1);
18 }
19
20 #[test]
21 fn drop_full() {
22     let (tx, _rx) = channel::<Box<isize>>();
23     tx.send(Box::new(1)).unwrap();
24 }
25
26 #[test]
27 fn drop_full_shared() {
28     let (tx, _rx) = channel::<Box<isize>>();
29     drop(tx.clone());
30     drop(tx.clone());
31     tx.send(Box::new(1)).unwrap();
32 }
33
34 #[test]
35 fn smoke_shared() {
36     let (tx, rx) = channel::<i32>();
37     tx.send(1).unwrap();
38     assert_eq!(rx.recv().unwrap(), 1);
39     let tx = tx.clone();
40     tx.send(1).unwrap();
41     assert_eq!(rx.recv().unwrap(), 1);
42 }
43
44 #[test]
45 fn smoke_threads() {
46     let (tx, rx) = channel::<i32>();
47     let _t = thread::spawn(move || {
48         tx.send(1).unwrap();
49     });
50     assert_eq!(rx.recv().unwrap(), 1);
51 }
52
53 #[test]
54 fn smoke_port_gone() {
55     let (tx, rx) = channel::<i32>();
56     drop(rx);
57     assert!(tx.send(1).is_err());
58 }
59
60 #[test]
61 fn smoke_shared_port_gone() {
62     let (tx, rx) = channel::<i32>();
63     drop(rx);
64     assert!(tx.send(1).is_err())
65 }
66
67 #[test]
68 fn smoke_shared_port_gone2() {
69     let (tx, rx) = channel::<i32>();
70     drop(rx);
71     let tx2 = tx.clone();
72     drop(tx);
73     assert!(tx2.send(1).is_err());
74 }
75
76 #[test]
77 fn port_gone_concurrent() {
78     let (tx, rx) = channel::<i32>();
79     let _t = thread::spawn(move || {
80         rx.recv().unwrap();
81     });
82     while tx.send(1).is_ok() {}
83 }
84
85 #[test]
86 fn port_gone_concurrent_shared() {
87     let (tx, rx) = channel::<i32>();
88     let tx2 = tx.clone();
89     let _t = thread::spawn(move || {
90         rx.recv().unwrap();
91     });
92     while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
93 }
94
95 #[test]
96 fn smoke_chan_gone() {
97     let (tx, rx) = channel::<i32>();
98     drop(tx);
99     assert!(rx.recv().is_err());
100 }
101
102 #[test]
103 fn smoke_chan_gone_shared() {
104     let (tx, rx) = channel::<()>();
105     let tx2 = tx.clone();
106     drop(tx);
107     drop(tx2);
108     assert!(rx.recv().is_err());
109 }
110
111 #[test]
112 fn chan_gone_concurrent() {
113     let (tx, rx) = channel::<i32>();
114     let _t = thread::spawn(move || {
115         tx.send(1).unwrap();
116         tx.send(1).unwrap();
117     });
118     while rx.recv().is_ok() {}
119 }
120
121 #[test]
122 fn stress() {
123     let count = if cfg!(miri) { 100 } else { 10000 };
124     let (tx, rx) = channel::<i32>();
125     let t = thread::spawn(move || {
126         for _ in 0..count {
127             tx.send(1).unwrap();
128         }
129     });
130     for _ in 0..count {
131         assert_eq!(rx.recv().unwrap(), 1);
132     }
133     t.join().ok().expect("thread panicked");
134 }
135
136 #[test]
137 fn stress_shared() {
138     const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
139     const NTHREADS: u32 = 8;
140     let (tx, rx) = channel::<i32>();
141
142     let t = thread::spawn(move || {
143         for _ in 0..AMT * NTHREADS {
144             assert_eq!(rx.recv().unwrap(), 1);
145         }
146         match rx.try_recv() {
147             Ok(..) => panic!(),
148             _ => {}
149         }
150     });
151
152     for _ in 0..NTHREADS {
153         let tx = tx.clone();
154         thread::spawn(move || {
155             for _ in 0..AMT {
156                 tx.send(1).unwrap();
157             }
158         });
159     }
160     drop(tx);
161     t.join().ok().expect("thread panicked");
162 }
163
164 #[test]
165 fn send_from_outside_runtime() {
166     let (tx1, rx1) = channel::<()>();
167     let (tx2, rx2) = channel::<i32>();
168     let t1 = thread::spawn(move || {
169         tx1.send(()).unwrap();
170         for _ in 0..40 {
171             assert_eq!(rx2.recv().unwrap(), 1);
172         }
173     });
174     rx1.recv().unwrap();
175     let t2 = thread::spawn(move || {
176         for _ in 0..40 {
177             tx2.send(1).unwrap();
178         }
179     });
180     t1.join().ok().expect("thread panicked");
181     t2.join().ok().expect("thread panicked");
182 }
183
184 #[test]
185 fn recv_from_outside_runtime() {
186     let (tx, rx) = channel::<i32>();
187     let t = thread::spawn(move || {
188         for _ in 0..40 {
189             assert_eq!(rx.recv().unwrap(), 1);
190         }
191     });
192     for _ in 0..40 {
193         tx.send(1).unwrap();
194     }
195     t.join().ok().expect("thread panicked");
196 }
197
198 #[test]
199 fn no_runtime() {
200     let (tx1, rx1) = channel::<i32>();
201     let (tx2, rx2) = channel::<i32>();
202     let t1 = thread::spawn(move || {
203         assert_eq!(rx1.recv().unwrap(), 1);
204         tx2.send(2).unwrap();
205     });
206     let t2 = thread::spawn(move || {
207         tx1.send(1).unwrap();
208         assert_eq!(rx2.recv().unwrap(), 2);
209     });
210     t1.join().ok().expect("thread panicked");
211     t2.join().ok().expect("thread panicked");
212 }
213
214 #[test]
215 fn oneshot_single_thread_close_port_first() {
216     // Simple test of closing without sending
217     let (_tx, rx) = channel::<i32>();
218     drop(rx);
219 }
220
221 #[test]
222 fn oneshot_single_thread_close_chan_first() {
223     // Simple test of closing without sending
224     let (tx, _rx) = channel::<i32>();
225     drop(tx);
226 }
227
228 #[test]
229 fn oneshot_single_thread_send_port_close() {
230     // Testing that the sender cleans up the payload if receiver is closed
231     let (tx, rx) = channel::<Box<i32>>();
232     drop(rx);
233     assert!(tx.send(Box::new(0)).is_err());
234 }
235
236 #[test]
237 fn oneshot_single_thread_recv_chan_close() {
238     // Receiving on a closed chan will panic
239     let res = thread::spawn(move || {
240         let (tx, rx) = channel::<i32>();
241         drop(tx);
242         rx.recv().unwrap();
243     })
244     .join();
245     // What is our res?
246     assert!(res.is_err());
247 }
248
249 #[test]
250 fn oneshot_single_thread_send_then_recv() {
251     let (tx, rx) = channel::<Box<i32>>();
252     tx.send(Box::new(10)).unwrap();
253     assert!(*rx.recv().unwrap() == 10);
254 }
255
256 #[test]
257 fn oneshot_single_thread_try_send_open() {
258     let (tx, rx) = channel::<i32>();
259     assert!(tx.send(10).is_ok());
260     assert!(rx.recv().unwrap() == 10);
261 }
262
263 #[test]
264 fn oneshot_single_thread_try_send_closed() {
265     let (tx, rx) = channel::<i32>();
266     drop(rx);
267     assert!(tx.send(10).is_err());
268 }
269
270 #[test]
271 fn oneshot_single_thread_try_recv_open() {
272     let (tx, rx) = channel::<i32>();
273     tx.send(10).unwrap();
274     assert!(rx.recv() == Ok(10));
275 }
276
277 #[test]
278 fn oneshot_single_thread_try_recv_closed() {
279     let (tx, rx) = channel::<i32>();
280     drop(tx);
281     assert!(rx.recv().is_err());
282 }
283
284 #[test]
285 fn oneshot_single_thread_peek_data() {
286     let (tx, rx) = channel::<i32>();
287     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
288     tx.send(10).unwrap();
289     assert_eq!(rx.try_recv(), Ok(10));
290 }
291
292 #[test]
293 fn oneshot_single_thread_peek_close() {
294     let (tx, rx) = channel::<i32>();
295     drop(tx);
296     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
297     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
298 }
299
300 #[test]
301 fn oneshot_single_thread_peek_open() {
302     let (_tx, rx) = channel::<i32>();
303     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
304 }
305
306 #[test]
307 fn oneshot_multi_task_recv_then_send() {
308     let (tx, rx) = channel::<Box<i32>>();
309     let _t = thread::spawn(move || {
310         assert!(*rx.recv().unwrap() == 10);
311     });
312
313     tx.send(Box::new(10)).unwrap();
314 }
315
316 #[test]
317 fn oneshot_multi_task_recv_then_close() {
318     let (tx, rx) = channel::<Box<i32>>();
319     let _t = thread::spawn(move || {
320         drop(tx);
321     });
322     let res = thread::spawn(move || {
323         assert!(*rx.recv().unwrap() == 10);
324     })
325     .join();
326     assert!(res.is_err());
327 }
328
329 #[test]
330 fn oneshot_multi_thread_close_stress() {
331     for _ in 0..stress_factor() {
332         let (tx, rx) = channel::<i32>();
333         let _t = thread::spawn(move || {
334             drop(rx);
335         });
336         drop(tx);
337     }
338 }
339
340 #[test]
341 fn oneshot_multi_thread_send_close_stress() {
342     for _ in 0..stress_factor() {
343         let (tx, rx) = channel::<i32>();
344         let _t = thread::spawn(move || {
345             drop(rx);
346         });
347         let _ = thread::spawn(move || {
348             tx.send(1).unwrap();
349         })
350         .join();
351     }
352 }
353
354 #[test]
355 fn oneshot_multi_thread_recv_close_stress() {
356     for _ in 0..stress_factor() {
357         let (tx, rx) = channel::<i32>();
358         thread::spawn(move || {
359             let res = thread::spawn(move || {
360                 rx.recv().unwrap();
361             })
362             .join();
363             assert!(res.is_err());
364         });
365         let _t = thread::spawn(move || {
366             thread::spawn(move || {
367                 drop(tx);
368             });
369         });
370     }
371 }
372
373 #[test]
374 fn oneshot_multi_thread_send_recv_stress() {
375     for _ in 0..stress_factor() {
376         let (tx, rx) = channel::<Box<isize>>();
377         let _t = thread::spawn(move || {
378             tx.send(Box::new(10)).unwrap();
379         });
380         assert!(*rx.recv().unwrap() == 10);
381     }
382 }
383
384 #[test]
385 fn stream_send_recv_stress() {
386     for _ in 0..stress_factor() {
387         let (tx, rx) = channel();
388
389         send(tx, 0);
390         recv(rx, 0);
391
392         fn send(tx: Sender<Box<i32>>, i: i32) {
393             if i == 10 {
394                 return;
395             }
396
397             thread::spawn(move || {
398                 tx.send(Box::new(i)).unwrap();
399                 send(tx, i + 1);
400             });
401         }
402
403         fn recv(rx: Receiver<Box<i32>>, i: i32) {
404             if i == 10 {
405                 return;
406             }
407
408             thread::spawn(move || {
409                 assert!(*rx.recv().unwrap() == i);
410                 recv(rx, i + 1);
411             });
412         }
413     }
414 }
415
416 #[test]
417 fn oneshot_single_thread_recv_timeout() {
418     let (tx, rx) = channel();
419     tx.send(()).unwrap();
420     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
421     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
422     tx.send(()).unwrap();
423     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
424 }
425
426 #[test]
427 fn stress_recv_timeout_two_threads() {
428     let (tx, rx) = channel();
429     let stress = stress_factor() + 100;
430     let timeout = Duration::from_millis(100);
431
432     thread::spawn(move || {
433         for i in 0..stress {
434             if i % 2 == 0 {
435                 thread::sleep(timeout * 2);
436             }
437             tx.send(1usize).unwrap();
438         }
439     });
440
441     let mut recv_count = 0;
442     loop {
443         match rx.recv_timeout(timeout) {
444             Ok(n) => {
445                 assert_eq!(n, 1usize);
446                 recv_count += 1;
447             }
448             Err(RecvTimeoutError::Timeout) => continue,
449             Err(RecvTimeoutError::Disconnected) => break,
450         }
451     }
452
453     assert_eq!(recv_count, stress);
454 }
455
456 #[test]
457 fn recv_timeout_upgrade() {
458     let (tx, rx) = channel::<()>();
459     let timeout = Duration::from_millis(1);
460     let _tx_clone = tx.clone();
461
462     let start = Instant::now();
463     assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
464     assert!(Instant::now() >= start + timeout);
465 }
466
467 #[test]
468 fn stress_recv_timeout_shared() {
469     let (tx, rx) = channel();
470     let stress = stress_factor() + 100;
471
472     for i in 0..stress {
473         let tx = tx.clone();
474         thread::spawn(move || {
475             thread::sleep(Duration::from_millis(i as u64 * 10));
476             tx.send(1usize).unwrap();
477         });
478     }
479
480     drop(tx);
481
482     let mut recv_count = 0;
483     loop {
484         match rx.recv_timeout(Duration::from_millis(10)) {
485             Ok(n) => {
486                 assert_eq!(n, 1usize);
487                 recv_count += 1;
488             }
489             Err(RecvTimeoutError::Timeout) => continue,
490             Err(RecvTimeoutError::Disconnected) => break,
491         }
492     }
493
494     assert_eq!(recv_count, stress);
495 }
496
497 #[test]
498 fn very_long_recv_timeout_wont_panic() {
499     let (tx, rx) = channel::<()>();
500     let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
501     thread::sleep(Duration::from_secs(1));
502     assert!(tx.send(()).is_ok());
503     assert_eq!(join_handle.join().unwrap(), Ok(()));
504 }
505
506 #[test]
507 fn recv_a_lot() {
508     let count = if cfg!(miri) { 1000 } else { 10000 };
509     // Regression test that we don't run out of stack in scheduler context
510     let (tx, rx) = channel();
511     for _ in 0..count {
512         tx.send(()).unwrap();
513     }
514     for _ in 0..count {
515         rx.recv().unwrap();
516     }
517 }
518
519 #[test]
520 fn shared_recv_timeout() {
521     let (tx, rx) = channel();
522     let total = 5;
523     for _ in 0..total {
524         let tx = tx.clone();
525         thread::spawn(move || {
526             tx.send(()).unwrap();
527         });
528     }
529
530     for _ in 0..total {
531         rx.recv().unwrap();
532     }
533
534     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
535     tx.send(()).unwrap();
536     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
537 }
538
539 #[test]
540 fn shared_chan_stress() {
541     let (tx, rx) = channel();
542     let total = stress_factor() + 100;
543     for _ in 0..total {
544         let tx = tx.clone();
545         thread::spawn(move || {
546             tx.send(()).unwrap();
547         });
548     }
549
550     for _ in 0..total {
551         rx.recv().unwrap();
552     }
553 }
554
555 #[test]
556 fn test_nested_recv_iter() {
557     let (tx, rx) = channel::<i32>();
558     let (total_tx, total_rx) = channel::<i32>();
559
560     let _t = thread::spawn(move || {
561         let mut acc = 0;
562         for x in rx.iter() {
563             acc += x;
564         }
565         total_tx.send(acc).unwrap();
566     });
567
568     tx.send(3).unwrap();
569     tx.send(1).unwrap();
570     tx.send(2).unwrap();
571     drop(tx);
572     assert_eq!(total_rx.recv().unwrap(), 6);
573 }
574
575 #[test]
576 fn test_recv_iter_break() {
577     let (tx, rx) = channel::<i32>();
578     let (count_tx, count_rx) = channel();
579
580     let _t = thread::spawn(move || {
581         let mut count = 0;
582         for x in rx.iter() {
583             if count >= 3 {
584                 break;
585             } else {
586                 count += x;
587             }
588         }
589         count_tx.send(count).unwrap();
590     });
591
592     tx.send(2).unwrap();
593     tx.send(2).unwrap();
594     tx.send(2).unwrap();
595     let _ = tx.send(2);
596     drop(tx);
597     assert_eq!(count_rx.recv().unwrap(), 4);
598 }
599
600 #[test]
601 fn test_recv_try_iter() {
602     let (request_tx, request_rx) = channel();
603     let (response_tx, response_rx) = channel();
604
605     // Request `x`s until we have `6`.
606     let t = thread::spawn(move || {
607         let mut count = 0;
608         loop {
609             for x in response_rx.try_iter() {
610                 count += x;
611                 if count == 6 {
612                     return count;
613                 }
614             }
615             request_tx.send(()).unwrap();
616         }
617     });
618
619     for _ in request_rx.iter() {
620         if response_tx.send(2).is_err() {
621             break;
622         }
623     }
624
625     assert_eq!(t.join().unwrap(), 6);
626 }
627
628 #[test]
629 fn test_recv_into_iter_owned() {
630     let mut iter = {
631         let (tx, rx) = channel::<i32>();
632         tx.send(1).unwrap();
633         tx.send(2).unwrap();
634
635         rx.into_iter()
636     };
637     assert_eq!(iter.next().unwrap(), 1);
638     assert_eq!(iter.next().unwrap(), 2);
639     assert_eq!(iter.next().is_none(), true);
640 }
641
642 #[test]
643 fn test_recv_into_iter_borrowed() {
644     let (tx, rx) = channel::<i32>();
645     tx.send(1).unwrap();
646     tx.send(2).unwrap();
647     drop(tx);
648     let mut iter = (&rx).into_iter();
649     assert_eq!(iter.next().unwrap(), 1);
650     assert_eq!(iter.next().unwrap(), 2);
651     assert_eq!(iter.next().is_none(), true);
652 }
653
654 #[test]
655 fn try_recv_states() {
656     let (tx1, rx1) = channel::<i32>();
657     let (tx2, rx2) = channel::<()>();
658     let (tx3, rx3) = channel::<()>();
659     let _t = thread::spawn(move || {
660         rx2.recv().unwrap();
661         tx1.send(1).unwrap();
662         tx3.send(()).unwrap();
663         rx2.recv().unwrap();
664         drop(tx1);
665         tx3.send(()).unwrap();
666     });
667
668     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
669     tx2.send(()).unwrap();
670     rx3.recv().unwrap();
671     assert_eq!(rx1.try_recv(), Ok(1));
672     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
673     tx2.send(()).unwrap();
674     rx3.recv().unwrap();
675     assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
676 }
677
678 // This bug used to end up in a livelock inside of the Receiver destructor
679 // because the internal state of the Shared packet was corrupted
680 #[test]
681 fn destroy_upgraded_shared_port_when_sender_still_active() {
682     let (tx, rx) = channel();
683     let (tx2, rx2) = channel();
684     let _t = thread::spawn(move || {
685         rx.recv().unwrap(); // wait on a oneshot
686         drop(rx); // destroy a shared
687         tx2.send(()).unwrap();
688     });
689     // make sure the other thread has gone to sleep
690     for _ in 0..5000 {
691         thread::yield_now();
692     }
693
694     // upgrade to a shared chan and send a message
695     let t = tx.clone();
696     drop(tx);
697     t.send(()).unwrap();
698
699     // wait for the child thread to exit before we exit
700     rx2.recv().unwrap();
701 }
702
703 #[test]
704 fn issue_32114() {
705     let (tx, _) = channel();
706     let _ = tx.send(123);
707     assert_eq!(tx.send(123), Err(SendError(123)));
708 }
709
710 #[test]
711 fn issue_39364() {
712     let (tx, rx) = channel::<()>();
713     let t = thread::spawn(move || {
714         thread::sleep(Duration::from_millis(300));
715         let _ = tx.clone();
716         crate::mem::forget(tx);
717     });
718
719     let _ = rx.recv_timeout(Duration::from_millis(500));
720     t.join().unwrap();
721     let _ = rx.recv_timeout(Duration::from_millis(500));
722 }