]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/sync_tests.rs
Rollup merge of #106835 - compiler-errors:new-solver-gat-rebase-oops, r=lcnr
[rust.git] / library / std / src / sync / mpsc / sync_tests.rs
1 use super::*;
2 use crate::env;
3 use crate::sync::mpmc::SendTimeoutError;
4 use crate::thread;
5 use crate::time::Duration;
6
7 pub fn stress_factor() -> usize {
8     match env::var("RUST_TEST_STRESS") {
9         Ok(val) => val.parse().unwrap(),
10         Err(..) => 1,
11     }
12 }
13
14 #[test]
15 fn smoke() {
16     let (tx, rx) = sync_channel::<i32>(1);
17     tx.send(1).unwrap();
18     assert_eq!(rx.recv().unwrap(), 1);
19 }
20
21 #[test]
22 fn drop_full() {
23     let (tx, _rx) = sync_channel::<Box<isize>>(1);
24     tx.send(Box::new(1)).unwrap();
25 }
26
27 #[test]
28 fn smoke_shared() {
29     let (tx, rx) = sync_channel::<i32>(1);
30     tx.send(1).unwrap();
31     assert_eq!(rx.recv().unwrap(), 1);
32     let tx = tx.clone();
33     tx.send(1).unwrap();
34     assert_eq!(rx.recv().unwrap(), 1);
35 }
36
37 #[test]
38 fn recv_timeout() {
39     let (tx, rx) = sync_channel::<i32>(1);
40     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
41     tx.send(1).unwrap();
42     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
43 }
44
45 #[test]
46 fn send_timeout() {
47     let (tx, _rx) = sync_channel::<i32>(1);
48     assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
49     assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
50 }
51
52 #[test]
53 fn smoke_threads() {
54     let (tx, rx) = sync_channel::<i32>(0);
55     let _t = thread::spawn(move || {
56         tx.send(1).unwrap();
57     });
58     assert_eq!(rx.recv().unwrap(), 1);
59 }
60
61 #[test]
62 fn smoke_port_gone() {
63     let (tx, rx) = sync_channel::<i32>(0);
64     drop(rx);
65     assert!(tx.send(1).is_err());
66 }
67
68 #[test]
69 fn smoke_shared_port_gone2() {
70     let (tx, rx) = sync_channel::<i32>(0);
71     drop(rx);
72     let tx2 = tx.clone();
73     drop(tx);
74     assert!(tx2.send(1).is_err());
75 }
76
77 #[test]
78 fn port_gone_concurrent() {
79     let (tx, rx) = sync_channel::<i32>(0);
80     let _t = thread::spawn(move || {
81         rx.recv().unwrap();
82     });
83     while tx.send(1).is_ok() {}
84 }
85
86 #[test]
87 fn port_gone_concurrent_shared() {
88     let (tx, rx) = sync_channel::<i32>(0);
89     let tx2 = tx.clone();
90     let _t = thread::spawn(move || {
91         rx.recv().unwrap();
92     });
93     while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
94 }
95
96 #[test]
97 fn smoke_chan_gone() {
98     let (tx, rx) = sync_channel::<i32>(0);
99     drop(tx);
100     assert!(rx.recv().is_err());
101 }
102
103 #[test]
104 fn smoke_chan_gone_shared() {
105     let (tx, rx) = sync_channel::<()>(0);
106     let tx2 = tx.clone();
107     drop(tx);
108     drop(tx2);
109     assert!(rx.recv().is_err());
110 }
111
112 #[test]
113 fn chan_gone_concurrent() {
114     let (tx, rx) = sync_channel::<i32>(0);
115     thread::spawn(move || {
116         tx.send(1).unwrap();
117         tx.send(1).unwrap();
118     });
119     while rx.recv().is_ok() {}
120 }
121
122 #[test]
123 fn stress() {
124     let count = if cfg!(miri) { 100 } else { 10000 };
125     let (tx, rx) = sync_channel::<i32>(0);
126     thread::spawn(move || {
127         for _ in 0..count {
128             tx.send(1).unwrap();
129         }
130     });
131     for _ in 0..count {
132         assert_eq!(rx.recv().unwrap(), 1);
133     }
134 }
135
136 #[test]
137 fn stress_recv_timeout_two_threads() {
138     let count = if cfg!(miri) { 100 } else { 10000 };
139     let (tx, rx) = sync_channel::<i32>(0);
140
141     thread::spawn(move || {
142         for _ in 0..count {
143             tx.send(1).unwrap();
144         }
145     });
146
147     let mut recv_count = 0;
148     loop {
149         match rx.recv_timeout(Duration::from_millis(1)) {
150             Ok(v) => {
151                 assert_eq!(v, 1);
152                 recv_count += 1;
153             }
154             Err(RecvTimeoutError::Timeout) => continue,
155             Err(RecvTimeoutError::Disconnected) => break,
156         }
157     }
158
159     assert_eq!(recv_count, count);
160 }
161
162 #[test]
163 fn stress_recv_timeout_shared() {
164     const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
165     const NTHREADS: u32 = 8;
166     let (tx, rx) = sync_channel::<i32>(0);
167     let (dtx, drx) = sync_channel::<()>(0);
168
169     thread::spawn(move || {
170         let mut recv_count = 0;
171         loop {
172             match rx.recv_timeout(Duration::from_millis(10)) {
173                 Ok(v) => {
174                     assert_eq!(v, 1);
175                     recv_count += 1;
176                 }
177                 Err(RecvTimeoutError::Timeout) => continue,
178                 Err(RecvTimeoutError::Disconnected) => break,
179             }
180         }
181
182         assert_eq!(recv_count, AMT * NTHREADS);
183         assert!(rx.try_recv().is_err());
184
185         dtx.send(()).unwrap();
186     });
187
188     for _ in 0..NTHREADS {
189         let tx = tx.clone();
190         thread::spawn(move || {
191             for _ in 0..AMT {
192                 tx.send(1).unwrap();
193             }
194         });
195     }
196
197     drop(tx);
198
199     drx.recv().unwrap();
200 }
201
202 #[test]
203 fn stress_shared() {
204     const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
205     const NTHREADS: u32 = 8;
206     let (tx, rx) = sync_channel::<i32>(0);
207     let (dtx, drx) = sync_channel::<()>(0);
208
209     thread::spawn(move || {
210         for _ in 0..AMT * NTHREADS {
211             assert_eq!(rx.recv().unwrap(), 1);
212         }
213         match rx.try_recv() {
214             Ok(..) => panic!(),
215             _ => {}
216         }
217         dtx.send(()).unwrap();
218     });
219
220     for _ in 0..NTHREADS {
221         let tx = tx.clone();
222         thread::spawn(move || {
223             for _ in 0..AMT {
224                 tx.send(1).unwrap();
225             }
226         });
227     }
228     drop(tx);
229     drx.recv().unwrap();
230 }
231
232 #[test]
233 fn oneshot_single_thread_close_port_first() {
234     // Simple test of closing without sending
235     let (_tx, rx) = sync_channel::<i32>(0);
236     drop(rx);
237 }
238
239 #[test]
240 fn oneshot_single_thread_close_chan_first() {
241     // Simple test of closing without sending
242     let (tx, _rx) = sync_channel::<i32>(0);
243     drop(tx);
244 }
245
246 #[test]
247 fn oneshot_single_thread_send_port_close() {
248     // Testing that the sender cleans up the payload if receiver is closed
249     let (tx, rx) = sync_channel::<Box<i32>>(0);
250     drop(rx);
251     assert!(tx.send(Box::new(0)).is_err());
252 }
253
254 #[test]
255 fn oneshot_single_thread_recv_chan_close() {
256     // Receiving on a closed chan will panic
257     let res = thread::spawn(move || {
258         let (tx, rx) = sync_channel::<i32>(0);
259         drop(tx);
260         rx.recv().unwrap();
261     })
262     .join();
263     // What is our res?
264     assert!(res.is_err());
265 }
266
267 #[test]
268 fn oneshot_single_thread_send_then_recv() {
269     let (tx, rx) = sync_channel::<Box<i32>>(1);
270     tx.send(Box::new(10)).unwrap();
271     assert!(*rx.recv().unwrap() == 10);
272 }
273
274 #[test]
275 fn oneshot_single_thread_try_send_open() {
276     let (tx, rx) = sync_channel::<i32>(1);
277     assert_eq!(tx.try_send(10), Ok(()));
278     assert!(rx.recv().unwrap() == 10);
279 }
280
281 #[test]
282 fn oneshot_single_thread_try_send_closed() {
283     let (tx, rx) = sync_channel::<i32>(0);
284     drop(rx);
285     assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
286 }
287
288 #[test]
289 fn oneshot_single_thread_try_send_closed2() {
290     let (tx, _rx) = sync_channel::<i32>(0);
291     assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
292 }
293
294 #[test]
295 fn oneshot_single_thread_try_recv_open() {
296     let (tx, rx) = sync_channel::<i32>(1);
297     tx.send(10).unwrap();
298     assert!(rx.recv() == Ok(10));
299 }
300
301 #[test]
302 fn oneshot_single_thread_try_recv_closed() {
303     let (tx, rx) = sync_channel::<i32>(0);
304     drop(tx);
305     assert!(rx.recv().is_err());
306 }
307
308 #[test]
309 fn oneshot_single_thread_try_recv_closed_with_data() {
310     let (tx, rx) = sync_channel::<i32>(1);
311     tx.send(10).unwrap();
312     drop(tx);
313     assert_eq!(rx.try_recv(), Ok(10));
314     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
315 }
316
317 #[test]
318 fn oneshot_single_thread_peek_data() {
319     let (tx, rx) = sync_channel::<i32>(1);
320     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
321     tx.send(10).unwrap();
322     assert_eq!(rx.try_recv(), Ok(10));
323 }
324
325 #[test]
326 fn oneshot_single_thread_peek_close() {
327     let (tx, rx) = sync_channel::<i32>(0);
328     drop(tx);
329     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
330     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
331 }
332
333 #[test]
334 fn oneshot_single_thread_peek_open() {
335     let (_tx, rx) = sync_channel::<i32>(0);
336     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
337 }
338
339 #[test]
340 fn oneshot_multi_task_recv_then_send() {
341     let (tx, rx) = sync_channel::<Box<i32>>(0);
342     let _t = thread::spawn(move || {
343         assert!(*rx.recv().unwrap() == 10);
344     });
345
346     tx.send(Box::new(10)).unwrap();
347 }
348
349 #[test]
350 fn oneshot_multi_task_recv_then_close() {
351     let (tx, rx) = sync_channel::<Box<i32>>(0);
352     let _t = thread::spawn(move || {
353         drop(tx);
354     });
355     let res = thread::spawn(move || {
356         assert!(*rx.recv().unwrap() == 10);
357     })
358     .join();
359     assert!(res.is_err());
360 }
361
362 #[test]
363 fn oneshot_multi_thread_close_stress() {
364     for _ in 0..stress_factor() {
365         let (tx, rx) = sync_channel::<i32>(0);
366         let _t = thread::spawn(move || {
367             drop(rx);
368         });
369         drop(tx);
370     }
371 }
372
373 #[test]
374 fn oneshot_multi_thread_send_close_stress() {
375     for _ in 0..stress_factor() {
376         let (tx, rx) = sync_channel::<i32>(0);
377         let _t = thread::spawn(move || {
378             drop(rx);
379         });
380         let _ = thread::spawn(move || {
381             tx.send(1).unwrap();
382         })
383         .join();
384     }
385 }
386
387 #[test]
388 fn oneshot_multi_thread_recv_close_stress() {
389     for _ in 0..stress_factor() {
390         let (tx, rx) = sync_channel::<i32>(0);
391         let _t = thread::spawn(move || {
392             let res = thread::spawn(move || {
393                 rx.recv().unwrap();
394             })
395             .join();
396             assert!(res.is_err());
397         });
398         let _t = thread::spawn(move || {
399             thread::spawn(move || {
400                 drop(tx);
401             });
402         });
403     }
404 }
405
406 #[test]
407 fn oneshot_multi_thread_send_recv_stress() {
408     for _ in 0..stress_factor() {
409         let (tx, rx) = sync_channel::<Box<i32>>(0);
410         let _t = thread::spawn(move || {
411             tx.send(Box::new(10)).unwrap();
412         });
413         assert!(*rx.recv().unwrap() == 10);
414     }
415 }
416
417 #[test]
418 fn stream_send_recv_stress() {
419     for _ in 0..stress_factor() {
420         let (tx, rx) = sync_channel::<Box<i32>>(0);
421
422         send(tx, 0);
423         recv(rx, 0);
424
425         fn send(tx: SyncSender<Box<i32>>, i: i32) {
426             if i == 10 {
427                 return;
428             }
429
430             thread::spawn(move || {
431                 tx.send(Box::new(i)).unwrap();
432                 send(tx, i + 1);
433             });
434         }
435
436         fn recv(rx: Receiver<Box<i32>>, i: i32) {
437             if i == 10 {
438                 return;
439             }
440
441             thread::spawn(move || {
442                 assert!(*rx.recv().unwrap() == i);
443                 recv(rx, i + 1);
444             });
445         }
446     }
447 }
448
449 #[test]
450 fn recv_a_lot() {
451     let count = if cfg!(miri) { 1000 } else { 10000 };
452     // Regression test that we don't run out of stack in scheduler context
453     let (tx, rx) = sync_channel(count);
454     for _ in 0..count {
455         tx.send(()).unwrap();
456     }
457     for _ in 0..count {
458         rx.recv().unwrap();
459     }
460 }
461
462 #[test]
463 fn shared_chan_stress() {
464     let (tx, rx) = sync_channel(0);
465     let total = stress_factor() + 100;
466     for _ in 0..total {
467         let tx = tx.clone();
468         thread::spawn(move || {
469             tx.send(()).unwrap();
470         });
471     }
472
473     for _ in 0..total {
474         rx.recv().unwrap();
475     }
476 }
477
478 #[test]
479 fn test_nested_recv_iter() {
480     let (tx, rx) = sync_channel::<i32>(0);
481     let (total_tx, total_rx) = sync_channel::<i32>(0);
482
483     let _t = thread::spawn(move || {
484         let mut acc = 0;
485         for x in rx.iter() {
486             acc += x;
487         }
488         total_tx.send(acc).unwrap();
489     });
490
491     tx.send(3).unwrap();
492     tx.send(1).unwrap();
493     tx.send(2).unwrap();
494     drop(tx);
495     assert_eq!(total_rx.recv().unwrap(), 6);
496 }
497
498 #[test]
499 fn test_recv_iter_break() {
500     let (tx, rx) = sync_channel::<i32>(0);
501     let (count_tx, count_rx) = sync_channel(0);
502
503     let _t = thread::spawn(move || {
504         let mut count = 0;
505         for x in rx.iter() {
506             if count >= 3 {
507                 break;
508             } else {
509                 count += x;
510             }
511         }
512         count_tx.send(count).unwrap();
513     });
514
515     tx.send(2).unwrap();
516     tx.send(2).unwrap();
517     tx.send(2).unwrap();
518     let _ = tx.try_send(2);
519     drop(tx);
520     assert_eq!(count_rx.recv().unwrap(), 4);
521 }
522
523 #[test]
524 fn try_recv_states() {
525     let (tx1, rx1) = sync_channel::<i32>(1);
526     let (tx2, rx2) = sync_channel::<()>(1);
527     let (tx3, rx3) = sync_channel::<()>(1);
528     let _t = thread::spawn(move || {
529         rx2.recv().unwrap();
530         tx1.send(1).unwrap();
531         tx3.send(()).unwrap();
532         rx2.recv().unwrap();
533         drop(tx1);
534         tx3.send(()).unwrap();
535     });
536
537     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
538     tx2.send(()).unwrap();
539     rx3.recv().unwrap();
540     assert_eq!(rx1.try_recv(), Ok(1));
541     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
542     tx2.send(()).unwrap();
543     rx3.recv().unwrap();
544     assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
545 }
546
547 // This bug used to end up in a livelock inside of the Receiver destructor
548 // because the internal state of the Shared packet was corrupted
549 #[test]
550 fn destroy_upgraded_shared_port_when_sender_still_active() {
551     let (tx, rx) = sync_channel::<()>(0);
552     let (tx2, rx2) = sync_channel::<()>(0);
553     let _t = thread::spawn(move || {
554         rx.recv().unwrap(); // wait on a oneshot
555         drop(rx); // destroy a shared
556         tx2.send(()).unwrap();
557     });
558     // make sure the other thread has gone to sleep
559     for _ in 0..5000 {
560         thread::yield_now();
561     }
562
563     // upgrade to a shared chan and send a message
564     let t = tx.clone();
565     drop(tx);
566     t.send(()).unwrap();
567
568     // wait for the child thread to exit before we exit
569     rx2.recv().unwrap();
570 }
571
572 #[test]
573 fn send1() {
574     let (tx, rx) = sync_channel::<i32>(0);
575     let _t = thread::spawn(move || {
576         rx.recv().unwrap();
577     });
578     assert_eq!(tx.send(1), Ok(()));
579 }
580
581 #[test]
582 fn send2() {
583     let (tx, rx) = sync_channel::<i32>(0);
584     let _t = thread::spawn(move || {
585         drop(rx);
586     });
587     assert!(tx.send(1).is_err());
588 }
589
590 #[test]
591 fn send3() {
592     let (tx, rx) = sync_channel::<i32>(1);
593     assert_eq!(tx.send(1), Ok(()));
594     let _t = thread::spawn(move || {
595         drop(rx);
596     });
597     assert!(tx.send(1).is_err());
598 }
599
600 #[test]
601 fn send4() {
602     let (tx, rx) = sync_channel::<i32>(0);
603     let tx2 = tx.clone();
604     let (done, donerx) = channel();
605     let done2 = done.clone();
606     let _t = thread::spawn(move || {
607         assert!(tx.send(1).is_err());
608         done.send(()).unwrap();
609     });
610     let _t = thread::spawn(move || {
611         assert!(tx2.send(2).is_err());
612         done2.send(()).unwrap();
613     });
614     drop(rx);
615     donerx.recv().unwrap();
616     donerx.recv().unwrap();
617 }
618
619 #[test]
620 fn try_send1() {
621     let (tx, _rx) = sync_channel::<i32>(0);
622     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
623 }
624
625 #[test]
626 fn try_send2() {
627     let (tx, _rx) = sync_channel::<i32>(1);
628     assert_eq!(tx.try_send(1), Ok(()));
629     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
630 }
631
632 #[test]
633 fn try_send3() {
634     let (tx, rx) = sync_channel::<i32>(1);
635     assert_eq!(tx.try_send(1), Ok(()));
636     drop(rx);
637     assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
638 }
639
640 #[test]
641 fn issue_15761() {
642     fn repro() {
643         let (tx1, rx1) = sync_channel::<()>(3);
644         let (tx2, rx2) = sync_channel::<()>(3);
645
646         let _t = thread::spawn(move || {
647             rx1.recv().unwrap();
648             tx2.try_send(()).unwrap();
649         });
650
651         tx1.try_send(()).unwrap();
652         rx2.recv().unwrap();
653     }
654
655     for _ in 0..100 {
656         repro()
657     }
658 }