]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/tests.rs
Merge commit '7b73b60faca71d01d900e49831fcb84553e93019' into sync-rustfmt
[rust.git] / library / std / src / sync / mpsc / tests.rs
1 use super::*;
2 use crate::env;
3 use crate::thread;
4 use crate::time::{Duration, Instant};
5
6 pub fn stress_factor() -> usize {
7     match env::var("RUST_TEST_STRESS") {
8         Ok(val) => val.parse().unwrap(),
9         Err(..) => 1,
10     }
11 }
12
13 #[test]
14 fn smoke() {
15     let (tx, rx) = channel::<i32>();
16     tx.send(1).unwrap();
17     assert_eq!(rx.recv().unwrap(), 1);
18 }
19
20 #[test]
21 fn drop_full() {
22     let (tx, _rx) = channel::<Box<isize>>();
23     tx.send(Box::new(1)).unwrap();
24 }
25
26 #[test]
27 fn drop_full_shared() {
28     let (tx, _rx) = channel::<Box<isize>>();
29     drop(tx.clone());
30     drop(tx.clone());
31     tx.send(Box::new(1)).unwrap();
32 }
33
34 #[test]
35 fn smoke_shared() {
36     let (tx, rx) = channel::<i32>();
37     tx.send(1).unwrap();
38     assert_eq!(rx.recv().unwrap(), 1);
39     let tx = tx.clone();
40     tx.send(1).unwrap();
41     assert_eq!(rx.recv().unwrap(), 1);
42 }
43
44 #[test]
45 fn smoke_threads() {
46     let (tx, rx) = channel::<i32>();
47     let _t = thread::spawn(move || {
48         tx.send(1).unwrap();
49     });
50     assert_eq!(rx.recv().unwrap(), 1);
51 }
52
53 #[test]
54 fn smoke_port_gone() {
55     let (tx, rx) = channel::<i32>();
56     drop(rx);
57     assert!(tx.send(1).is_err());
58 }
59
60 #[test]
61 fn smoke_shared_port_gone() {
62     let (tx, rx) = channel::<i32>();
63     drop(rx);
64     assert!(tx.send(1).is_err())
65 }
66
67 #[test]
68 fn smoke_shared_port_gone2() {
69     let (tx, rx) = channel::<i32>();
70     drop(rx);
71     let tx2 = tx.clone();
72     drop(tx);
73     assert!(tx2.send(1).is_err());
74 }
75
76 #[test]
77 fn port_gone_concurrent() {
78     let (tx, rx) = channel::<i32>();
79     let _t = thread::spawn(move || {
80         rx.recv().unwrap();
81     });
82     while tx.send(1).is_ok() {}
83 }
84
85 #[test]
86 fn port_gone_concurrent_shared() {
87     let (tx, rx) = channel::<i32>();
88     let tx2 = tx.clone();
89     let _t = thread::spawn(move || {
90         rx.recv().unwrap();
91     });
92     while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
93 }
94
95 #[test]
96 fn smoke_chan_gone() {
97     let (tx, rx) = channel::<i32>();
98     drop(tx);
99     assert!(rx.recv().is_err());
100 }
101
102 #[test]
103 fn smoke_chan_gone_shared() {
104     let (tx, rx) = channel::<()>();
105     let tx2 = tx.clone();
106     drop(tx);
107     drop(tx2);
108     assert!(rx.recv().is_err());
109 }
110
111 #[test]
112 fn chan_gone_concurrent() {
113     let (tx, rx) = channel::<i32>();
114     let _t = thread::spawn(move || {
115         tx.send(1).unwrap();
116         tx.send(1).unwrap();
117     });
118     while rx.recv().is_ok() {}
119 }
120
121 #[test]
122 fn stress() {
123     let (tx, rx) = channel::<i32>();
124     let t = thread::spawn(move || {
125         for _ in 0..10000 {
126             tx.send(1).unwrap();
127         }
128     });
129     for _ in 0..10000 {
130         assert_eq!(rx.recv().unwrap(), 1);
131     }
132     t.join().ok().expect("thread panicked");
133 }
134
135 #[test]
136 fn stress_shared() {
137     const AMT: u32 = 10000;
138     const NTHREADS: u32 = 8;
139     let (tx, rx) = channel::<i32>();
140
141     let t = thread::spawn(move || {
142         for _ in 0..AMT * NTHREADS {
143             assert_eq!(rx.recv().unwrap(), 1);
144         }
145         match rx.try_recv() {
146             Ok(..) => panic!(),
147             _ => {}
148         }
149     });
150
151     for _ in 0..NTHREADS {
152         let tx = tx.clone();
153         thread::spawn(move || {
154             for _ in 0..AMT {
155                 tx.send(1).unwrap();
156             }
157         });
158     }
159     drop(tx);
160     t.join().ok().expect("thread panicked");
161 }
162
163 #[test]
164 fn send_from_outside_runtime() {
165     let (tx1, rx1) = channel::<()>();
166     let (tx2, rx2) = channel::<i32>();
167     let t1 = thread::spawn(move || {
168         tx1.send(()).unwrap();
169         for _ in 0..40 {
170             assert_eq!(rx2.recv().unwrap(), 1);
171         }
172     });
173     rx1.recv().unwrap();
174     let t2 = thread::spawn(move || {
175         for _ in 0..40 {
176             tx2.send(1).unwrap();
177         }
178     });
179     t1.join().ok().expect("thread panicked");
180     t2.join().ok().expect("thread panicked");
181 }
182
183 #[test]
184 fn recv_from_outside_runtime() {
185     let (tx, rx) = channel::<i32>();
186     let t = thread::spawn(move || {
187         for _ in 0..40 {
188             assert_eq!(rx.recv().unwrap(), 1);
189         }
190     });
191     for _ in 0..40 {
192         tx.send(1).unwrap();
193     }
194     t.join().ok().expect("thread panicked");
195 }
196
197 #[test]
198 fn no_runtime() {
199     let (tx1, rx1) = channel::<i32>();
200     let (tx2, rx2) = channel::<i32>();
201     let t1 = thread::spawn(move || {
202         assert_eq!(rx1.recv().unwrap(), 1);
203         tx2.send(2).unwrap();
204     });
205     let t2 = thread::spawn(move || {
206         tx1.send(1).unwrap();
207         assert_eq!(rx2.recv().unwrap(), 2);
208     });
209     t1.join().ok().expect("thread panicked");
210     t2.join().ok().expect("thread panicked");
211 }
212
213 #[test]
214 fn oneshot_single_thread_close_port_first() {
215     // Simple test of closing without sending
216     let (_tx, rx) = channel::<i32>();
217     drop(rx);
218 }
219
220 #[test]
221 fn oneshot_single_thread_close_chan_first() {
222     // Simple test of closing without sending
223     let (tx, _rx) = channel::<i32>();
224     drop(tx);
225 }
226
227 #[test]
228 fn oneshot_single_thread_send_port_close() {
229     // Testing that the sender cleans up the payload if receiver is closed
230     let (tx, rx) = channel::<Box<i32>>();
231     drop(rx);
232     assert!(tx.send(Box::new(0)).is_err());
233 }
234
235 #[test]
236 fn oneshot_single_thread_recv_chan_close() {
237     // Receiving on a closed chan will panic
238     let res = thread::spawn(move || {
239         let (tx, rx) = channel::<i32>();
240         drop(tx);
241         rx.recv().unwrap();
242     })
243     .join();
244     // What is our res?
245     assert!(res.is_err());
246 }
247
248 #[test]
249 fn oneshot_single_thread_send_then_recv() {
250     let (tx, rx) = channel::<Box<i32>>();
251     tx.send(Box::new(10)).unwrap();
252     assert!(*rx.recv().unwrap() == 10);
253 }
254
255 #[test]
256 fn oneshot_single_thread_try_send_open() {
257     let (tx, rx) = channel::<i32>();
258     assert!(tx.send(10).is_ok());
259     assert!(rx.recv().unwrap() == 10);
260 }
261
262 #[test]
263 fn oneshot_single_thread_try_send_closed() {
264     let (tx, rx) = channel::<i32>();
265     drop(rx);
266     assert!(tx.send(10).is_err());
267 }
268
269 #[test]
270 fn oneshot_single_thread_try_recv_open() {
271     let (tx, rx) = channel::<i32>();
272     tx.send(10).unwrap();
273     assert!(rx.recv() == Ok(10));
274 }
275
276 #[test]
277 fn oneshot_single_thread_try_recv_closed() {
278     let (tx, rx) = channel::<i32>();
279     drop(tx);
280     assert!(rx.recv().is_err());
281 }
282
283 #[test]
284 fn oneshot_single_thread_peek_data() {
285     let (tx, rx) = channel::<i32>();
286     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
287     tx.send(10).unwrap();
288     assert_eq!(rx.try_recv(), Ok(10));
289 }
290
291 #[test]
292 fn oneshot_single_thread_peek_close() {
293     let (tx, rx) = channel::<i32>();
294     drop(tx);
295     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
296     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
297 }
298
299 #[test]
300 fn oneshot_single_thread_peek_open() {
301     let (_tx, rx) = channel::<i32>();
302     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
303 }
304
305 #[test]
306 fn oneshot_multi_task_recv_then_send() {
307     let (tx, rx) = channel::<Box<i32>>();
308     let _t = thread::spawn(move || {
309         assert!(*rx.recv().unwrap() == 10);
310     });
311
312     tx.send(Box::new(10)).unwrap();
313 }
314
315 #[test]
316 fn oneshot_multi_task_recv_then_close() {
317     let (tx, rx) = channel::<Box<i32>>();
318     let _t = thread::spawn(move || {
319         drop(tx);
320     });
321     let res = thread::spawn(move || {
322         assert!(*rx.recv().unwrap() == 10);
323     })
324     .join();
325     assert!(res.is_err());
326 }
327
328 #[test]
329 fn oneshot_multi_thread_close_stress() {
330     for _ in 0..stress_factor() {
331         let (tx, rx) = channel::<i32>();
332         let _t = thread::spawn(move || {
333             drop(rx);
334         });
335         drop(tx);
336     }
337 }
338
339 #[test]
340 fn oneshot_multi_thread_send_close_stress() {
341     for _ in 0..stress_factor() {
342         let (tx, rx) = channel::<i32>();
343         let _t = thread::spawn(move || {
344             drop(rx);
345         });
346         let _ = thread::spawn(move || {
347             tx.send(1).unwrap();
348         })
349         .join();
350     }
351 }
352
353 #[test]
354 fn oneshot_multi_thread_recv_close_stress() {
355     for _ in 0..stress_factor() {
356         let (tx, rx) = channel::<i32>();
357         thread::spawn(move || {
358             let res = thread::spawn(move || {
359                 rx.recv().unwrap();
360             })
361             .join();
362             assert!(res.is_err());
363         });
364         let _t = thread::spawn(move || {
365             thread::spawn(move || {
366                 drop(tx);
367             });
368         });
369     }
370 }
371
372 #[test]
373 fn oneshot_multi_thread_send_recv_stress() {
374     for _ in 0..stress_factor() {
375         let (tx, rx) = channel::<Box<isize>>();
376         let _t = thread::spawn(move || {
377             tx.send(Box::new(10)).unwrap();
378         });
379         assert!(*rx.recv().unwrap() == 10);
380     }
381 }
382
383 #[test]
384 fn stream_send_recv_stress() {
385     for _ in 0..stress_factor() {
386         let (tx, rx) = channel();
387
388         send(tx, 0);
389         recv(rx, 0);
390
391         fn send(tx: Sender<Box<i32>>, i: i32) {
392             if i == 10 {
393                 return;
394             }
395
396             thread::spawn(move || {
397                 tx.send(Box::new(i)).unwrap();
398                 send(tx, i + 1);
399             });
400         }
401
402         fn recv(rx: Receiver<Box<i32>>, i: i32) {
403             if i == 10 {
404                 return;
405             }
406
407             thread::spawn(move || {
408                 assert!(*rx.recv().unwrap() == i);
409                 recv(rx, i + 1);
410             });
411         }
412     }
413 }
414
415 #[test]
416 fn oneshot_single_thread_recv_timeout() {
417     let (tx, rx) = channel();
418     tx.send(()).unwrap();
419     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
420     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
421     tx.send(()).unwrap();
422     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
423 }
424
425 #[test]
426 fn stress_recv_timeout_two_threads() {
427     let (tx, rx) = channel();
428     let stress = stress_factor() + 100;
429     let timeout = Duration::from_millis(100);
430
431     thread::spawn(move || {
432         for i in 0..stress {
433             if i % 2 == 0 {
434                 thread::sleep(timeout * 2);
435             }
436             tx.send(1usize).unwrap();
437         }
438     });
439
440     let mut recv_count = 0;
441     loop {
442         match rx.recv_timeout(timeout) {
443             Ok(n) => {
444                 assert_eq!(n, 1usize);
445                 recv_count += 1;
446             }
447             Err(RecvTimeoutError::Timeout) => continue,
448             Err(RecvTimeoutError::Disconnected) => break,
449         }
450     }
451
452     assert_eq!(recv_count, stress);
453 }
454
455 #[test]
456 fn recv_timeout_upgrade() {
457     let (tx, rx) = channel::<()>();
458     let timeout = Duration::from_millis(1);
459     let _tx_clone = tx.clone();
460
461     let start = Instant::now();
462     assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
463     assert!(Instant::now() >= start + timeout);
464 }
465
466 #[test]
467 fn stress_recv_timeout_shared() {
468     let (tx, rx) = channel();
469     let stress = stress_factor() + 100;
470
471     for i in 0..stress {
472         let tx = tx.clone();
473         thread::spawn(move || {
474             thread::sleep(Duration::from_millis(i as u64 * 10));
475             tx.send(1usize).unwrap();
476         });
477     }
478
479     drop(tx);
480
481     let mut recv_count = 0;
482     loop {
483         match rx.recv_timeout(Duration::from_millis(10)) {
484             Ok(n) => {
485                 assert_eq!(n, 1usize);
486                 recv_count += 1;
487             }
488             Err(RecvTimeoutError::Timeout) => continue,
489             Err(RecvTimeoutError::Disconnected) => break,
490         }
491     }
492
493     assert_eq!(recv_count, stress);
494 }
495
496 #[test]
497 fn very_long_recv_timeout_wont_panic() {
498     let (tx, rx) = channel::<()>();
499     let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
500     thread::sleep(Duration::from_secs(1));
501     assert!(tx.send(()).is_ok());
502     assert_eq!(join_handle.join().unwrap(), Ok(()));
503 }
504
505 #[test]
506 fn recv_a_lot() {
507     // Regression test that we don't run out of stack in scheduler context
508     let (tx, rx) = channel();
509     for _ in 0..10000 {
510         tx.send(()).unwrap();
511     }
512     for _ in 0..10000 {
513         rx.recv().unwrap();
514     }
515 }
516
517 #[test]
518 fn shared_recv_timeout() {
519     let (tx, rx) = channel();
520     let total = 5;
521     for _ in 0..total {
522         let tx = tx.clone();
523         thread::spawn(move || {
524             tx.send(()).unwrap();
525         });
526     }
527
528     for _ in 0..total {
529         rx.recv().unwrap();
530     }
531
532     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
533     tx.send(()).unwrap();
534     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
535 }
536
537 #[test]
538 fn shared_chan_stress() {
539     let (tx, rx) = channel();
540     let total = stress_factor() + 100;
541     for _ in 0..total {
542         let tx = tx.clone();
543         thread::spawn(move || {
544             tx.send(()).unwrap();
545         });
546     }
547
548     for _ in 0..total {
549         rx.recv().unwrap();
550     }
551 }
552
553 #[test]
554 fn test_nested_recv_iter() {
555     let (tx, rx) = channel::<i32>();
556     let (total_tx, total_rx) = channel::<i32>();
557
558     let _t = thread::spawn(move || {
559         let mut acc = 0;
560         for x in rx.iter() {
561             acc += x;
562         }
563         total_tx.send(acc).unwrap();
564     });
565
566     tx.send(3).unwrap();
567     tx.send(1).unwrap();
568     tx.send(2).unwrap();
569     drop(tx);
570     assert_eq!(total_rx.recv().unwrap(), 6);
571 }
572
573 #[test]
574 fn test_recv_iter_break() {
575     let (tx, rx) = channel::<i32>();
576     let (count_tx, count_rx) = channel();
577
578     let _t = thread::spawn(move || {
579         let mut count = 0;
580         for x in rx.iter() {
581             if count >= 3 {
582                 break;
583             } else {
584                 count += x;
585             }
586         }
587         count_tx.send(count).unwrap();
588     });
589
590     tx.send(2).unwrap();
591     tx.send(2).unwrap();
592     tx.send(2).unwrap();
593     let _ = tx.send(2);
594     drop(tx);
595     assert_eq!(count_rx.recv().unwrap(), 4);
596 }
597
598 #[test]
599 fn test_recv_try_iter() {
600     let (request_tx, request_rx) = channel();
601     let (response_tx, response_rx) = channel();
602
603     // Request `x`s until we have `6`.
604     let t = thread::spawn(move || {
605         let mut count = 0;
606         loop {
607             for x in response_rx.try_iter() {
608                 count += x;
609                 if count == 6 {
610                     return count;
611                 }
612             }
613             request_tx.send(()).unwrap();
614         }
615     });
616
617     for _ in request_rx.iter() {
618         if response_tx.send(2).is_err() {
619             break;
620         }
621     }
622
623     assert_eq!(t.join().unwrap(), 6);
624 }
625
626 #[test]
627 fn test_recv_into_iter_owned() {
628     let mut iter = {
629         let (tx, rx) = channel::<i32>();
630         tx.send(1).unwrap();
631         tx.send(2).unwrap();
632
633         rx.into_iter()
634     };
635     assert_eq!(iter.next().unwrap(), 1);
636     assert_eq!(iter.next().unwrap(), 2);
637     assert_eq!(iter.next().is_none(), true);
638 }
639
640 #[test]
641 fn test_recv_into_iter_borrowed() {
642     let (tx, rx) = channel::<i32>();
643     tx.send(1).unwrap();
644     tx.send(2).unwrap();
645     drop(tx);
646     let mut iter = (&rx).into_iter();
647     assert_eq!(iter.next().unwrap(), 1);
648     assert_eq!(iter.next().unwrap(), 2);
649     assert_eq!(iter.next().is_none(), true);
650 }
651
652 #[test]
653 fn try_recv_states() {
654     let (tx1, rx1) = channel::<i32>();
655     let (tx2, rx2) = channel::<()>();
656     let (tx3, rx3) = channel::<()>();
657     let _t = thread::spawn(move || {
658         rx2.recv().unwrap();
659         tx1.send(1).unwrap();
660         tx3.send(()).unwrap();
661         rx2.recv().unwrap();
662         drop(tx1);
663         tx3.send(()).unwrap();
664     });
665
666     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
667     tx2.send(()).unwrap();
668     rx3.recv().unwrap();
669     assert_eq!(rx1.try_recv(), Ok(1));
670     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
671     tx2.send(()).unwrap();
672     rx3.recv().unwrap();
673     assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
674 }
675
676 // This bug used to end up in a livelock inside of the Receiver destructor
677 // because the internal state of the Shared packet was corrupted
678 #[test]
679 fn destroy_upgraded_shared_port_when_sender_still_active() {
680     let (tx, rx) = channel();
681     let (tx2, rx2) = channel();
682     let _t = thread::spawn(move || {
683         rx.recv().unwrap(); // wait on a oneshot
684         drop(rx); // destroy a shared
685         tx2.send(()).unwrap();
686     });
687     // make sure the other thread has gone to sleep
688     for _ in 0..5000 {
689         thread::yield_now();
690     }
691
692     // upgrade to a shared chan and send a message
693     let t = tx.clone();
694     drop(tx);
695     t.send(()).unwrap();
696
697     // wait for the child thread to exit before we exit
698     rx2.recv().unwrap();
699 }
700
701 #[test]
702 fn issue_32114() {
703     let (tx, _) = channel();
704     let _ = tx.send(123);
705     assert_eq!(tx.send(123), Err(SendError(123)));
706 }