]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/mod.rs
Fix font color for help button in ayu and dark themes
[rust.git] / library / std / src / sync / mpsc / mod.rs
1 // ignore-tidy-filelength
2
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
4 //!
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
7 //!
8 //! * [`Sender`]
9 //! * [`SyncSender`]
10 //! * [`Receiver`]
11 //!
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
15 //!
16 //! These channels come in two flavors:
17 //!
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //!    will return a `(Sender, Receiver)` tuple where all sends will be
20 //!    **asynchronous** (they never block). The channel conceptually has an
21 //!    infinite buffer.
22 //!
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
26 //!    **synchronous** by blocking until there is buffer space available. Note
27 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //!    channel where each sender atomically hands off a message to a receiver.
29 //!
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
36 //!
37 //! ## Disconnection
38 //!
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
43 //!
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
48 //!
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
52 //!
53 //! # Examples
54 //!
55 //! Simple usage:
56 //!
57 //! ```
58 //! use std::thread;
59 //! use std::sync::mpsc::channel;
60 //!
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //!     tx.send(10).unwrap();
65 //! });
66 //! assert_eq!(rx.recv().unwrap(), 10);
67 //! ```
68 //!
69 //! Shared usage:
70 //!
71 //! ```
72 //! use std::thread;
73 //! use std::sync::mpsc::channel;
74 //!
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
79 //! for i in 0..10 {
80 //!     let tx = tx.clone();
81 //!     thread::spawn(move|| {
82 //!         tx.send(i).unwrap();
83 //!     });
84 //! }
85 //!
86 //! for _ in 0..10 {
87 //!     let j = rx.recv().unwrap();
88 //!     assert!(0 <= j && j < 10);
89 //! }
90 //! ```
91 //!
92 //! Propagating panics:
93 //!
94 //! ```
95 //! use std::sync::mpsc::channel;
96 //!
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
100 //! drop(tx);
101 //! assert!(rx.recv().is_err());
102 //! ```
103 //!
104 //! Synchronous channels:
105 //!
106 //! ```
107 //! use std::thread;
108 //! use std::sync::mpsc::sync_channel;
109 //!
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //!     // This will wait for the parent thread to start receiving
113 //!     tx.send(53).unwrap();
114 //! });
115 //! rx.recv().unwrap();
116 //! ```
117
118 #![stable(feature = "rust1", since = "1.0.0")]
119
120 // A description of how Rust's channel implementation works
121 //
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
125 //
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
131 //
132 // ## Flavors of channels
133 //
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
137 // channels in play.
138 //
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 //                      case. They contain as few atomics as possible and
141 //                      involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 //             use a different concurrent queue that is more tailored for this
144 //             use case. The initial allocation of this flavor of channel is not
145 //             optimized.
146 // * Shared - this is the most general form of channel that this module offers,
147 //            a channel with multiple senders. This type is as optimized as it
148 //            can be, but the previous two types mentioned are much faster for
149 //            their use-cases.
150 //
151 // ## Concurrent queues
152 //
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
156 //
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // shared channels.
162 //
163 // ### SPSC optimizations
164 //
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
169 // pop().
170 //
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
176 //
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
180 // coexist at once.
181 //
182 // ### MPSC optimizations
183 //
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
187 //
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
191 //
192 // ## Overview of the Implementation
193 //
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
197 //
198 //
199 //      send(t)                             recv()
200 //        queue.push(t)                       return if queue.pop()
201 //        if increment() == -1                deschedule {
202 //          wakeup()                            if decrement() > 0
203 //                                                cancel_deschedule()
204 //                                            }
205 //                                            queue.pop()
206 //
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
209 //
210 // ### The internal atomic counter
211 //
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
215 //
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
220 //
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
227 //
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
231 //
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
234 //
235 // ## Native Implementation
236 //
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
240 //
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
246 //
247 // ## Select
248 //
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
252 //
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
259 //
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
264 //
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
266 //
267 // # Conclusion
268 //
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
271
272 use crate::cell::UnsafeCell;
273 use crate::error;
274 use crate::fmt;
275 use crate::mem;
276 use crate::sync::Arc;
277 use crate::time::{Duration, Instant};
278
279 mod blocking;
280 mod mpsc_queue;
281 mod oneshot;
282 mod shared;
283 mod spsc_queue;
284 mod stream;
285 mod sync;
286
287 mod cache_aligned;
288
289 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
291 ///
292 /// Messages sent to the channel can be retrieved using [`recv`].
293 ///
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
297 ///
298 /// # Examples
299 ///
300 /// ```rust
301 /// use std::sync::mpsc::channel;
302 /// use std::thread;
303 /// use std::time::Duration;
304 ///
305 /// let (send, recv) = channel();
306 ///
307 /// thread::spawn(move || {
308 ///     send.send("Hello world!").unwrap();
309 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
310 ///     send.send("Delayed for 2 seconds").unwrap();
311 /// });
312 ///
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
316 /// ```
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319     inner: UnsafeCell<Flavor<T>>,
320 }
321
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> {}
326
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> {}
329
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
331 ///
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
335 ///
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use std::sync::mpsc::channel;
345 /// use std::thread;
346 ///
347 /// let (send, recv) = channel();
348 ///
349 /// thread::spawn(move || {
350 ///     send.send(1u8).unwrap();
351 ///     send.send(2u8).unwrap();
352 ///     send.send(3u8).unwrap();
353 /// });
354 ///
355 /// for x in recv.iter() {
356 ///     println!("Got: {}", x);
357 /// }
358 /// ```
359 #[stable(feature = "rust1", since = "1.0.0")]
360 #[derive(Debug)]
361 pub struct Iter<'a, T: 'a> {
362     rx: &'a Receiver<T>,
363 }
364
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
367 ///
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
370 ///
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
373 ///
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
377 ///
378 /// # Examples
379 ///
380 /// ```rust
381 /// use std::sync::mpsc::channel;
382 /// use std::thread;
383 /// use std::time::Duration;
384 ///
385 /// let (sender, receiver) = channel();
386 ///
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
390 ///
391 /// thread::spawn(move || {
392 ///     sender.send(1).unwrap();
393 ///     sender.send(2).unwrap();
394 ///     sender.send(3).unwrap();
395 /// });
396 ///
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
399 ///
400 /// for x in receiver.try_iter() {
401 ///     println!("Got: {}", x);
402 /// }
403 /// ```
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
405 #[derive(Debug)]
406 pub struct TryIter<'a, T: 'a> {
407     rx: &'a Receiver<T>,
408 }
409
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
412 ///
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
416 ///
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
420 ///
421 /// # Examples
422 ///
423 /// ```rust
424 /// use std::sync::mpsc::channel;
425 /// use std::thread;
426 ///
427 /// let (send, recv) = channel();
428 ///
429 /// thread::spawn(move || {
430 ///     send.send(1u8).unwrap();
431 ///     send.send(2u8).unwrap();
432 ///     send.send(3u8).unwrap();
433 /// });
434 ///
435 /// for x in recv.into_iter() {
436 ///     println!("Got: {}", x);
437 /// }
438 /// ```
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
440 #[derive(Debug)]
441 pub struct IntoIter<T> {
442     rx: Receiver<T>,
443 }
444
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
447 ///
448 /// Messages can be sent through this channel with [`send`].
449 ///
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
452 ///
453 /// # Examples
454 ///
455 /// ```rust
456 /// use std::sync::mpsc::channel;
457 /// use std::thread;
458 ///
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
461 ///
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 ///     sender.send(1).unwrap();
465 /// });
466 ///
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 ///     sender2.send(2).unwrap();
470 /// });
471 ///
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
474 ///
475 /// assert_eq!(3, msg + msg2);
476 /// ```
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479     inner: UnsafeCell<Flavor<T>>,
480 }
481
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> {}
486
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> {}
489
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
491 ///
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
493 ///
494 /// [`send`] will block if there is no space in the internal buffer.
495 ///
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
499 ///
500 /// # Examples
501 ///
502 /// ```rust
503 /// use std::sync::mpsc::sync_channel;
504 /// use std::thread;
505 ///
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
509 ///
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 ///     sync_sender.send(1).unwrap();
513 ///     sync_sender.send(2).unwrap();
514 /// });
515 ///
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 ///     sync_sender2.send(3).unwrap();
519 ///     // thread will now block since the buffer is full
520 ///     println!("Thread unblocked!");
521 /// });
522 ///
523 /// let mut msg;
524 ///
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
527 ///
528 /// // "Thread unblocked!" will be printed now
529 ///
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
532 ///
533 /// msg = receiver.recv().unwrap();
534 ///
535 /// println!("message {} received", msg);
536 /// ```
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539     inner: Arc<sync::Packet<T>>,
540 }
541
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
544
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
547 ///
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
551 ///
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
557
558 /// An error returned from the [`recv`] function on a [`Receiver`].
559 ///
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
563 ///
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
571
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
575 ///
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582     /// This **channel** is currently empty, but the **Sender**(s) have not yet
583     /// disconnected, so data may yet become available.
584     #[stable(feature = "rust1", since = "1.0.0")]
585     Empty,
586
587     /// The **channel**'s sending half has become disconnected, and there will
588     /// never be any more data received on it.
589     #[stable(feature = "rust1", since = "1.0.0")]
590     Disconnected,
591 }
592
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
596 ///
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603     /// This **channel** is currently empty, but the **Sender**(s) have not yet
604     /// disconnected, so data may yet become available.
605     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
606     Timeout,
607     /// The **channel**'s sending half has become disconnected, and there will
608     /// never be any more data received on it.
609     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
610     Disconnected,
611 }
612
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
615 ///
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620     /// The data could not be sent on the [`sync_channel`] because it would require that
621     /// the callee block to send the data.
622     ///
623     /// If this is a buffered channel, then the buffer is full at this time. If
624     /// this is not a buffered channel, then there is no [`Receiver`] available to
625     /// acquire the data.
626     ///
627     /// [`sync_channel`]: fn.sync_channel.html
628     /// [`Receiver`]: struct.Receiver.html
629     #[stable(feature = "rust1", since = "1.0.0")]
630     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
631
632     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633     /// sent. The data is returned back to the callee in this case.
634     ///
635     /// [`sync_channel`]: fn.sync_channel.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
638 }
639
640 enum Flavor<T> {
641     Oneshot(Arc<oneshot::Packet<T>>),
642     Stream(Arc<stream::Packet<T>>),
643     Shared(Arc<shared::Packet<T>>),
644     Sync(Arc<sync::Packet<T>>),
645 }
646
647 #[doc(hidden)]
648 trait UnsafeFlavor<T> {
649     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651         &mut *self.inner_unsafe().get()
652     }
653     unsafe fn inner(&self) -> &Flavor<T> {
654         &*self.inner_unsafe().get()
655     }
656 }
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
659         &self.inner
660     }
661 }
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
664         &self.inner
665     }
666 }
667
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
673 /// is available.
674 ///
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
677 ///
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
682 ///
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
690 ///
691 /// # Examples
692 ///
693 /// ```
694 /// use std::sync::mpsc::channel;
695 /// use std::thread;
696 ///
697 /// let (sender, receiver) = channel();
698 ///
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// #   fn expensive_computation() {}
702 ///     sender.send(expensive_computation()).unwrap();
703 /// });
704 ///
705 /// // Do some useful work for awhile
706 ///
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
709 /// ```
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712     let a = Arc::new(oneshot::Packet::new());
713     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
714 }
715
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
721 ///
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
727 ///
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
730 ///
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
735 ///
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
743 ///
744 /// # Examples
745 ///
746 /// ```
747 /// use std::sync::mpsc::sync_channel;
748 /// use std::thread;
749 ///
750 /// let (sender, receiver) = sync_channel(1);
751 ///
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
754 ///
755 /// thread::spawn(move|| {
756 ///     // this will block until the previous message has been received
757 ///     sender.send(2).unwrap();
758 /// });
759 ///
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
762 /// ```
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765     let a = Arc::new(sync::Packet::new(bound));
766     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
767 }
768
769 ////////////////////////////////////////////////////////////////////////////////
770 // Sender
771 ////////////////////////////////////////////////////////////////////////////////
772
773 impl<T> Sender<T> {
774     fn new(inner: Flavor<T>) -> Sender<T> {
775         Sender { inner: UnsafeCell::new(inner) }
776     }
777
778     /// Attempts to send a value on this channel, returning it back if it could
779     /// not be sent.
780     ///
781     /// A successful send occurs when it is determined that the other end of
782     /// the channel has not hung up already. An unsuccessful send would be one
783     /// where the corresponding receiver has already been deallocated. Note
784     /// that a return value of [`Err`] means that the data will never be
785     /// received, but a return value of [`Ok`] does *not* mean that the data
786     /// will be received. It is possible for the corresponding receiver to
787     /// hang up immediately after this function returns [`Ok`].
788     ///
789     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
790     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
791     ///
792     /// This method will never block the current thread.
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// use std::sync::mpsc::channel;
798     ///
799     /// let (tx, rx) = channel();
800     ///
801     /// // This send is always successful
802     /// tx.send(1).unwrap();
803     ///
804     /// // This send will fail because the receiver is gone
805     /// drop(rx);
806     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
807     /// ```
808     #[stable(feature = "rust1", since = "1.0.0")]
809     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
810         let (new_inner, ret) = match *unsafe { self.inner() } {
811             Flavor::Oneshot(ref p) => {
812                 if !p.sent() {
813                     return p.send(t).map_err(SendError);
814                 } else {
815                     let a = Arc::new(stream::Packet::new());
816                     let rx = Receiver::new(Flavor::Stream(a.clone()));
817                     match p.upgrade(rx) {
818                         oneshot::UpSuccess => {
819                             let ret = a.send(t);
820                             (a, ret)
821                         }
822                         oneshot::UpDisconnected => (a, Err(t)),
823                         oneshot::UpWoke(token) => {
824                             // This send cannot panic because the thread is
825                             // asleep (we're looking at it), so the receiver
826                             // can't go away.
827                             a.send(t).ok().unwrap();
828                             token.signal();
829                             (a, Ok(()))
830                         }
831                     }
832                 }
833             }
834             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
835             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
836             Flavor::Sync(..) => unreachable!(),
837         };
838
839         unsafe {
840             let tmp = Sender::new(Flavor::Stream(new_inner));
841             mem::swap(self.inner_mut(), tmp.inner_mut());
842         }
843         ret.map_err(SendError)
844     }
845 }
846
847 #[stable(feature = "rust1", since = "1.0.0")]
848 impl<T> Clone for Sender<T> {
849     fn clone(&self) -> Sender<T> {
850         let packet = match *unsafe { self.inner() } {
851             Flavor::Oneshot(ref p) => {
852                 let a = Arc::new(shared::Packet::new());
853                 {
854                     let guard = a.postinit_lock();
855                     let rx = Receiver::new(Flavor::Shared(a.clone()));
856                     let sleeper = match p.upgrade(rx) {
857                         oneshot::UpSuccess | oneshot::UpDisconnected => None,
858                         oneshot::UpWoke(task) => Some(task),
859                     };
860                     a.inherit_blocker(sleeper, guard);
861                 }
862                 a
863             }
864             Flavor::Stream(ref p) => {
865                 let a = Arc::new(shared::Packet::new());
866                 {
867                     let guard = a.postinit_lock();
868                     let rx = Receiver::new(Flavor::Shared(a.clone()));
869                     let sleeper = match p.upgrade(rx) {
870                         stream::UpSuccess | stream::UpDisconnected => None,
871                         stream::UpWoke(task) => Some(task),
872                     };
873                     a.inherit_blocker(sleeper, guard);
874                 }
875                 a
876             }
877             Flavor::Shared(ref p) => {
878                 p.clone_chan();
879                 return Sender::new(Flavor::Shared(p.clone()));
880             }
881             Flavor::Sync(..) => unreachable!(),
882         };
883
884         unsafe {
885             let tmp = Sender::new(Flavor::Shared(packet.clone()));
886             mem::swap(self.inner_mut(), tmp.inner_mut());
887         }
888         Sender::new(Flavor::Shared(packet))
889     }
890 }
891
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
894     fn drop(&mut self) {
895         match *unsafe { self.inner() } {
896             Flavor::Oneshot(ref p) => p.drop_chan(),
897             Flavor::Stream(ref p) => p.drop_chan(),
898             Flavor::Shared(ref p) => p.drop_chan(),
899             Flavor::Sync(..) => unreachable!(),
900         }
901     }
902 }
903
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907         f.debug_struct("Sender").finish()
908     }
909 }
910
911 ////////////////////////////////////////////////////////////////////////////////
912 // SyncSender
913 ////////////////////////////////////////////////////////////////////////////////
914
915 impl<T> SyncSender<T> {
916     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
917         SyncSender { inner }
918     }
919
920     /// Sends a value on this synchronous channel.
921     ///
922     /// This function will *block* until space in the internal buffer becomes
923     /// available or a receiver is available to hand off the message to.
924     ///
925     /// Note that a successful send does *not* guarantee that the receiver will
926     /// ever see the data if there is a buffer on this channel. Items may be
927     /// enqueued in the internal buffer for the receiver to receive at a later
928     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929     /// channel and it guarantees that the receiver has indeed received
930     /// the data if this function returns success.
931     ///
932     /// This function will never panic, but it may return [`Err`] if the
933     /// [`Receiver`] has disconnected and is no longer able to receive
934     /// information.
935     ///
936     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
937     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
938     ///
939     /// # Examples
940     ///
941     /// ```rust
942     /// use std::sync::mpsc::sync_channel;
943     /// use std::thread;
944     ///
945     /// // Create a rendezvous sync_channel with buffer size 0
946     /// let (sync_sender, receiver) = sync_channel(0);
947     ///
948     /// thread::spawn(move || {
949     ///    println!("sending message...");
950     ///    sync_sender.send(1).unwrap();
951     ///    // Thread is now blocked until the message is received
952     ///
953     ///    println!("...message received!");
954     /// });
955     ///
956     /// let msg = receiver.recv().unwrap();
957     /// assert_eq!(1, msg);
958     /// ```
959     #[stable(feature = "rust1", since = "1.0.0")]
960     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
961         self.inner.send(t).map_err(SendError)
962     }
963
964     /// Attempts to send a value on this channel without blocking.
965     ///
966     /// This method differs from [`send`] by returning immediately if the
967     /// channel's buffer is full or no receiver is waiting to acquire some
968     /// data. Compared with [`send`], this function has two failure cases
969     /// instead of one (one for disconnection, one for a full buffer).
970     ///
971     /// See [`send`] for notes about guarantees of whether the
972     /// receiver has received the data or not if this function is successful.
973     ///
974     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
975     ///
976     /// # Examples
977     ///
978     /// ```rust
979     /// use std::sync::mpsc::sync_channel;
980     /// use std::thread;
981     ///
982     /// // Create a sync_channel with buffer size 1
983     /// let (sync_sender, receiver) = sync_channel(1);
984     /// let sync_sender2 = sync_sender.clone();
985     ///
986     /// // First thread owns sync_sender
987     /// thread::spawn(move || {
988     ///     sync_sender.send(1).unwrap();
989     ///     sync_sender.send(2).unwrap();
990     ///     // Thread blocked
991     /// });
992     ///
993     /// // Second thread owns sync_sender2
994     /// thread::spawn(move || {
995     ///     // This will return an error and send
996     ///     // no message if the buffer is full
997     ///     let _ = sync_sender2.try_send(3);
998     /// });
999     ///
1000     /// let mut msg;
1001     /// msg = receiver.recv().unwrap();
1002     /// println!("message {} received", msg);
1003     ///
1004     /// msg = receiver.recv().unwrap();
1005     /// println!("message {} received", msg);
1006     ///
1007     /// // Third message may have never been sent
1008     /// match receiver.try_recv() {
1009     ///     Ok(msg) => println!("message {} received", msg),
1010     ///     Err(_) => println!("the third message was never sent"),
1011     /// }
1012     /// ```
1013     #[stable(feature = "rust1", since = "1.0.0")]
1014     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1015         self.inner.try_send(t)
1016     }
1017 }
1018
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> Clone for SyncSender<T> {
1021     fn clone(&self) -> SyncSender<T> {
1022         self.inner.clone_chan();
1023         SyncSender::new(self.inner.clone())
1024     }
1025 }
1026
1027 #[stable(feature = "rust1", since = "1.0.0")]
1028 impl<T> Drop for SyncSender<T> {
1029     fn drop(&mut self) {
1030         self.inner.drop_chan();
1031     }
1032 }
1033
1034 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1035 impl<T> fmt::Debug for SyncSender<T> {
1036     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037         f.debug_struct("SyncSender").finish()
1038     }
1039 }
1040
1041 ////////////////////////////////////////////////////////////////////////////////
1042 // Receiver
1043 ////////////////////////////////////////////////////////////////////////////////
1044
1045 impl<T> Receiver<T> {
1046     fn new(inner: Flavor<T>) -> Receiver<T> {
1047         Receiver { inner: UnsafeCell::new(inner) }
1048     }
1049
1050     /// Attempts to return a pending value on this receiver without blocking.
1051     ///
1052     /// This method will never block the caller in order to wait for data to
1053     /// become available. Instead, this will always return immediately with a
1054     /// possible option of pending data on the channel.
1055     ///
1056     /// This is useful for a flavor of "optimistic check" before deciding to
1057     /// block on a receiver.
1058     ///
1059     /// Compared with [`recv`], this function has two failure cases instead of one
1060     /// (one for disconnection, one for an empty buffer).
1061     ///
1062     /// [`recv`]: struct.Receiver.html#method.recv
1063     ///
1064     /// # Examples
1065     ///
1066     /// ```rust
1067     /// use std::sync::mpsc::{Receiver, channel};
1068     ///
1069     /// let (_, receiver): (_, Receiver<i32>) = channel();
1070     ///
1071     /// assert!(receiver.try_recv().is_err());
1072     /// ```
1073     #[stable(feature = "rust1", since = "1.0.0")]
1074     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1075         loop {
1076             let new_port = match *unsafe { self.inner() } {
1077                 Flavor::Oneshot(ref p) => match p.try_recv() {
1078                     Ok(t) => return Ok(t),
1079                     Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1080                     Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1081                     Err(oneshot::Upgraded(rx)) => rx,
1082                 },
1083                 Flavor::Stream(ref p) => match p.try_recv() {
1084                     Ok(t) => return Ok(t),
1085                     Err(stream::Empty) => return Err(TryRecvError::Empty),
1086                     Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1087                     Err(stream::Upgraded(rx)) => rx,
1088                 },
1089                 Flavor::Shared(ref p) => match p.try_recv() {
1090                     Ok(t) => return Ok(t),
1091                     Err(shared::Empty) => return Err(TryRecvError::Empty),
1092                     Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1093                 },
1094                 Flavor::Sync(ref p) => match p.try_recv() {
1095                     Ok(t) => return Ok(t),
1096                     Err(sync::Empty) => return Err(TryRecvError::Empty),
1097                     Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1098                 },
1099             };
1100             unsafe {
1101                 mem::swap(self.inner_mut(), new_port.inner_mut());
1102             }
1103         }
1104     }
1105
1106     /// Attempts to wait for a value on this receiver, returning an error if the
1107     /// corresponding channel has hung up.
1108     ///
1109     /// This function will always block the current thread if there is no data
1110     /// available and it's possible for more data to be sent. Once a message is
1111     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1112     /// receiver will wake up and return that message.
1113     ///
1114     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1115     /// this call is blocking, this call will wake up and return [`Err`] to
1116     /// indicate that no more messages can ever be received on this channel.
1117     /// However, since channels are buffered, messages sent before the disconnect
1118     /// will still be properly received.
1119     ///
1120     /// [`Sender`]: struct.Sender.html
1121     /// [`SyncSender`]: struct.SyncSender.html
1122     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1123     ///
1124     /// # Examples
1125     ///
1126     /// ```
1127     /// use std::sync::mpsc;
1128     /// use std::thread;
1129     ///
1130     /// let (send, recv) = mpsc::channel();
1131     /// let handle = thread::spawn(move || {
1132     ///     send.send(1u8).unwrap();
1133     /// });
1134     ///
1135     /// handle.join().unwrap();
1136     ///
1137     /// assert_eq!(Ok(1), recv.recv());
1138     /// ```
1139     ///
1140     /// Buffering behavior:
1141     ///
1142     /// ```
1143     /// use std::sync::mpsc;
1144     /// use std::thread;
1145     /// use std::sync::mpsc::RecvError;
1146     ///
1147     /// let (send, recv) = mpsc::channel();
1148     /// let handle = thread::spawn(move || {
1149     ///     send.send(1u8).unwrap();
1150     ///     send.send(2).unwrap();
1151     ///     send.send(3).unwrap();
1152     ///     drop(send);
1153     /// });
1154     ///
1155     /// // wait for the thread to join so we ensure the sender is dropped
1156     /// handle.join().unwrap();
1157     ///
1158     /// assert_eq!(Ok(1), recv.recv());
1159     /// assert_eq!(Ok(2), recv.recv());
1160     /// assert_eq!(Ok(3), recv.recv());
1161     /// assert_eq!(Err(RecvError), recv.recv());
1162     /// ```
1163     #[stable(feature = "rust1", since = "1.0.0")]
1164     pub fn recv(&self) -> Result<T, RecvError> {
1165         loop {
1166             let new_port = match *unsafe { self.inner() } {
1167                 Flavor::Oneshot(ref p) => match p.recv(None) {
1168                     Ok(t) => return Ok(t),
1169                     Err(oneshot::Disconnected) => return Err(RecvError),
1170                     Err(oneshot::Upgraded(rx)) => rx,
1171                     Err(oneshot::Empty) => unreachable!(),
1172                 },
1173                 Flavor::Stream(ref p) => match p.recv(None) {
1174                     Ok(t) => return Ok(t),
1175                     Err(stream::Disconnected) => return Err(RecvError),
1176                     Err(stream::Upgraded(rx)) => rx,
1177                     Err(stream::Empty) => unreachable!(),
1178                 },
1179                 Flavor::Shared(ref p) => match p.recv(None) {
1180                     Ok(t) => return Ok(t),
1181                     Err(shared::Disconnected) => return Err(RecvError),
1182                     Err(shared::Empty) => unreachable!(),
1183                 },
1184                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1185             };
1186             unsafe {
1187                 mem::swap(self.inner_mut(), new_port.inner_mut());
1188             }
1189         }
1190     }
1191
1192     /// Attempts to wait for a value on this receiver, returning an error if the
1193     /// corresponding channel has hung up, or if it waits more than `timeout`.
1194     ///
1195     /// This function will always block the current thread if there is no data
1196     /// available and it's possible for more data to be sent. Once a message is
1197     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1198     /// receiver will wake up and return that message.
1199     ///
1200     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1201     /// this call is blocking, this call will wake up and return [`Err`] to
1202     /// indicate that no more messages can ever be received on this channel.
1203     /// However, since channels are buffered, messages sent before the disconnect
1204     /// will still be properly received.
1205     ///
1206     /// [`Sender`]: struct.Sender.html
1207     /// [`SyncSender`]: struct.SyncSender.html
1208     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1209     ///
1210     /// # Known Issues
1211     ///
1212     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1213     /// to panic unexpectedly with the following example:
1214     ///
1215     /// ```no_run
1216     /// use std::sync::mpsc::channel;
1217     /// use std::thread;
1218     /// use std::time::Duration;
1219     ///
1220     /// let (tx, rx) = channel::<String>();
1221     ///
1222     /// thread::spawn(move || {
1223     ///     let d = Duration::from_millis(10);
1224     ///     loop {
1225     ///         println!("recv");
1226     ///         let _r = rx.recv_timeout(d);
1227     ///     }
1228     /// });
1229     ///
1230     /// thread::sleep(Duration::from_millis(100));
1231     /// let _c1 = tx.clone();
1232     ///
1233     /// thread::sleep(Duration::from_secs(1));
1234     /// ```
1235     ///
1236     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1237     ///
1238     /// # Examples
1239     ///
1240     /// Successfully receiving value before encountering timeout:
1241     ///
1242     /// ```no_run
1243     /// use std::thread;
1244     /// use std::time::Duration;
1245     /// use std::sync::mpsc;
1246     ///
1247     /// let (send, recv) = mpsc::channel();
1248     ///
1249     /// thread::spawn(move || {
1250     ///     send.send('a').unwrap();
1251     /// });
1252     ///
1253     /// assert_eq!(
1254     ///     recv.recv_timeout(Duration::from_millis(400)),
1255     ///     Ok('a')
1256     /// );
1257     /// ```
1258     ///
1259     /// Receiving an error upon reaching timeout:
1260     ///
1261     /// ```no_run
1262     /// use std::thread;
1263     /// use std::time::Duration;
1264     /// use std::sync::mpsc;
1265     ///
1266     /// let (send, recv) = mpsc::channel();
1267     ///
1268     /// thread::spawn(move || {
1269     ///     thread::sleep(Duration::from_millis(800));
1270     ///     send.send('a').unwrap();
1271     /// });
1272     ///
1273     /// assert_eq!(
1274     ///     recv.recv_timeout(Duration::from_millis(400)),
1275     ///     Err(mpsc::RecvTimeoutError::Timeout)
1276     /// );
1277     /// ```
1278     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1279     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1280         // Do an optimistic try_recv to avoid the performance impact of
1281         // Instant::now() in the full-channel case.
1282         match self.try_recv() {
1283             Ok(result) => Ok(result),
1284             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1285             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1286                 Some(deadline) => self.recv_deadline(deadline),
1287                 // So far in the future that it's practically the same as waiting indefinitely.
1288                 None => self.recv().map_err(RecvTimeoutError::from),
1289             },
1290         }
1291     }
1292
1293     /// Attempts to wait for a value on this receiver, returning an error if the
1294     /// corresponding channel has hung up, or if `deadline` is reached.
1295     ///
1296     /// This function will always block the current thread if there is no data
1297     /// available and it's possible for more data to be sent. Once a message is
1298     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1299     /// receiver will wake up and return that message.
1300     ///
1301     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1302     /// this call is blocking, this call will wake up and return [`Err`] to
1303     /// indicate that no more messages can ever be received on this channel.
1304     /// However, since channels are buffered, messages sent before the disconnect
1305     /// will still be properly received.
1306     ///
1307     /// [`Sender`]: struct.Sender.html
1308     /// [`SyncSender`]: struct.SyncSender.html
1309     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1310     ///
1311     /// # Examples
1312     ///
1313     /// Successfully receiving value before reaching deadline:
1314     ///
1315     /// ```no_run
1316     /// #![feature(deadline_api)]
1317     /// use std::thread;
1318     /// use std::time::{Duration, Instant};
1319     /// use std::sync::mpsc;
1320     ///
1321     /// let (send, recv) = mpsc::channel();
1322     ///
1323     /// thread::spawn(move || {
1324     ///     send.send('a').unwrap();
1325     /// });
1326     ///
1327     /// assert_eq!(
1328     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1329     ///     Ok('a')
1330     /// );
1331     /// ```
1332     ///
1333     /// Receiving an error upon reaching deadline:
1334     ///
1335     /// ```no_run
1336     /// #![feature(deadline_api)]
1337     /// use std::thread;
1338     /// use std::time::{Duration, Instant};
1339     /// use std::sync::mpsc;
1340     ///
1341     /// let (send, recv) = mpsc::channel();
1342     ///
1343     /// thread::spawn(move || {
1344     ///     thread::sleep(Duration::from_millis(800));
1345     ///     send.send('a').unwrap();
1346     /// });
1347     ///
1348     /// assert_eq!(
1349     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1350     ///     Err(mpsc::RecvTimeoutError::Timeout)
1351     /// );
1352     /// ```
1353     #[unstable(feature = "deadline_api", issue = "46316")]
1354     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1355         use self::RecvTimeoutError::*;
1356
1357         loop {
1358             let port_or_empty = match *unsafe { self.inner() } {
1359                 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1360                     Ok(t) => return Ok(t),
1361                     Err(oneshot::Disconnected) => return Err(Disconnected),
1362                     Err(oneshot::Upgraded(rx)) => Some(rx),
1363                     Err(oneshot::Empty) => None,
1364                 },
1365                 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1366                     Ok(t) => return Ok(t),
1367                     Err(stream::Disconnected) => return Err(Disconnected),
1368                     Err(stream::Upgraded(rx)) => Some(rx),
1369                     Err(stream::Empty) => None,
1370                 },
1371                 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1372                     Ok(t) => return Ok(t),
1373                     Err(shared::Disconnected) => return Err(Disconnected),
1374                     Err(shared::Empty) => None,
1375                 },
1376                 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1377                     Ok(t) => return Ok(t),
1378                     Err(sync::Disconnected) => return Err(Disconnected),
1379                     Err(sync::Empty) => None,
1380                 },
1381             };
1382
1383             if let Some(new_port) = port_or_empty {
1384                 unsafe {
1385                     mem::swap(self.inner_mut(), new_port.inner_mut());
1386                 }
1387             }
1388
1389             // If we're already passed the deadline, and we're here without
1390             // data, return a timeout, else try again.
1391             if Instant::now() >= deadline {
1392                 return Err(Timeout);
1393             }
1394         }
1395     }
1396
1397     /// Returns an iterator that will block waiting for messages, but never
1398     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1399     ///
1400     /// [`panic!`]: ../../../std/macro.panic.html
1401     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1402     ///
1403     /// # Examples
1404     ///
1405     /// ```rust
1406     /// use std::sync::mpsc::channel;
1407     /// use std::thread;
1408     ///
1409     /// let (send, recv) = channel();
1410     ///
1411     /// thread::spawn(move || {
1412     ///     send.send(1).unwrap();
1413     ///     send.send(2).unwrap();
1414     ///     send.send(3).unwrap();
1415     /// });
1416     ///
1417     /// let mut iter = recv.iter();
1418     /// assert_eq!(iter.next(), Some(1));
1419     /// assert_eq!(iter.next(), Some(2));
1420     /// assert_eq!(iter.next(), Some(3));
1421     /// assert_eq!(iter.next(), None);
1422     /// ```
1423     #[stable(feature = "rust1", since = "1.0.0")]
1424     pub fn iter(&self) -> Iter<'_, T> {
1425         Iter { rx: self }
1426     }
1427
1428     /// Returns an iterator that will attempt to yield all pending values.
1429     /// It will return `None` if there are no more pending values or if the
1430     /// channel has hung up. The iterator will never [`panic!`] or block the
1431     /// user by waiting for values.
1432     ///
1433     /// [`panic!`]: ../../../std/macro.panic.html
1434     ///
1435     /// # Examples
1436     ///
1437     /// ```no_run
1438     /// use std::sync::mpsc::channel;
1439     /// use std::thread;
1440     /// use std::time::Duration;
1441     ///
1442     /// let (sender, receiver) = channel();
1443     ///
1444     /// // nothing is in the buffer yet
1445     /// assert!(receiver.try_iter().next().is_none());
1446     ///
1447     /// thread::spawn(move || {
1448     ///     thread::sleep(Duration::from_secs(1));
1449     ///     sender.send(1).unwrap();
1450     ///     sender.send(2).unwrap();
1451     ///     sender.send(3).unwrap();
1452     /// });
1453     ///
1454     /// // nothing is in the buffer yet
1455     /// assert!(receiver.try_iter().next().is_none());
1456     ///
1457     /// // block for two seconds
1458     /// thread::sleep(Duration::from_secs(2));
1459     ///
1460     /// let mut iter = receiver.try_iter();
1461     /// assert_eq!(iter.next(), Some(1));
1462     /// assert_eq!(iter.next(), Some(2));
1463     /// assert_eq!(iter.next(), Some(3));
1464     /// assert_eq!(iter.next(), None);
1465     /// ```
1466     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1467     pub fn try_iter(&self) -> TryIter<'_, T> {
1468         TryIter { rx: self }
1469     }
1470 }
1471
1472 #[stable(feature = "rust1", since = "1.0.0")]
1473 impl<'a, T> Iterator for Iter<'a, T> {
1474     type Item = T;
1475
1476     fn next(&mut self) -> Option<T> {
1477         self.rx.recv().ok()
1478     }
1479 }
1480
1481 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1482 impl<'a, T> Iterator for TryIter<'a, T> {
1483     type Item = T;
1484
1485     fn next(&mut self) -> Option<T> {
1486         self.rx.try_recv().ok()
1487     }
1488 }
1489
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<'a, T> IntoIterator for &'a Receiver<T> {
1492     type Item = T;
1493     type IntoIter = Iter<'a, T>;
1494
1495     fn into_iter(self) -> Iter<'a, T> {
1496         self.iter()
1497     }
1498 }
1499
1500 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1501 impl<T> Iterator for IntoIter<T> {
1502     type Item = T;
1503     fn next(&mut self) -> Option<T> {
1504         self.rx.recv().ok()
1505     }
1506 }
1507
1508 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1509 impl<T> IntoIterator for Receiver<T> {
1510     type Item = T;
1511     type IntoIter = IntoIter<T>;
1512
1513     fn into_iter(self) -> IntoIter<T> {
1514         IntoIter { rx: self }
1515     }
1516 }
1517
1518 #[stable(feature = "rust1", since = "1.0.0")]
1519 impl<T> Drop for Receiver<T> {
1520     fn drop(&mut self) {
1521         match *unsafe { self.inner() } {
1522             Flavor::Oneshot(ref p) => p.drop_port(),
1523             Flavor::Stream(ref p) => p.drop_port(),
1524             Flavor::Shared(ref p) => p.drop_port(),
1525             Flavor::Sync(ref p) => p.drop_port(),
1526         }
1527     }
1528 }
1529
1530 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1531 impl<T> fmt::Debug for Receiver<T> {
1532     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1533         f.debug_struct("Receiver").finish()
1534     }
1535 }
1536
1537 #[stable(feature = "rust1", since = "1.0.0")]
1538 impl<T> fmt::Debug for SendError<T> {
1539     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540         "SendError(..)".fmt(f)
1541     }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> fmt::Display for SendError<T> {
1546     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547         "sending on a closed channel".fmt(f)
1548     }
1549 }
1550
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T: Send> error::Error for SendError<T> {
1553     #[allow(deprecated)]
1554     fn description(&self) -> &str {
1555         "sending on a closed channel"
1556     }
1557 }
1558
1559 #[stable(feature = "rust1", since = "1.0.0")]
1560 impl<T> fmt::Debug for TrySendError<T> {
1561     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562         match *self {
1563             TrySendError::Full(..) => "Full(..)".fmt(f),
1564             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1565         }
1566     }
1567 }
1568
1569 #[stable(feature = "rust1", since = "1.0.0")]
1570 impl<T> fmt::Display for TrySendError<T> {
1571     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1572         match *self {
1573             TrySendError::Full(..) => "sending on a full channel".fmt(f),
1574             TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1575         }
1576     }
1577 }
1578
1579 #[stable(feature = "rust1", since = "1.0.0")]
1580 impl<T: Send> error::Error for TrySendError<T> {
1581     #[allow(deprecated)]
1582     fn description(&self) -> &str {
1583         match *self {
1584             TrySendError::Full(..) => "sending on a full channel",
1585             TrySendError::Disconnected(..) => "sending on a closed channel",
1586         }
1587     }
1588 }
1589
1590 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1591 impl<T> From<SendError<T>> for TrySendError<T> {
1592     fn from(err: SendError<T>) -> TrySendError<T> {
1593         match err {
1594             SendError(t) => TrySendError::Disconnected(t),
1595         }
1596     }
1597 }
1598
1599 #[stable(feature = "rust1", since = "1.0.0")]
1600 impl fmt::Display for RecvError {
1601     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1602         "receiving on a closed channel".fmt(f)
1603     }
1604 }
1605
1606 #[stable(feature = "rust1", since = "1.0.0")]
1607 impl error::Error for RecvError {
1608     #[allow(deprecated)]
1609     fn description(&self) -> &str {
1610         "receiving on a closed channel"
1611     }
1612 }
1613
1614 #[stable(feature = "rust1", since = "1.0.0")]
1615 impl fmt::Display for TryRecvError {
1616     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1617         match *self {
1618             TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1619             TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1620         }
1621     }
1622 }
1623
1624 #[stable(feature = "rust1", since = "1.0.0")]
1625 impl error::Error for TryRecvError {
1626     #[allow(deprecated)]
1627     fn description(&self) -> &str {
1628         match *self {
1629             TryRecvError::Empty => "receiving on an empty channel",
1630             TryRecvError::Disconnected => "receiving on a closed channel",
1631         }
1632     }
1633 }
1634
1635 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1636 impl From<RecvError> for TryRecvError {
1637     fn from(err: RecvError) -> TryRecvError {
1638         match err {
1639             RecvError => TryRecvError::Disconnected,
1640         }
1641     }
1642 }
1643
1644 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1645 impl fmt::Display for RecvTimeoutError {
1646     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1647         match *self {
1648             RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1649             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1650         }
1651     }
1652 }
1653
1654 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1655 impl error::Error for RecvTimeoutError {
1656     #[allow(deprecated)]
1657     fn description(&self) -> &str {
1658         match *self {
1659             RecvTimeoutError::Timeout => "timed out waiting on channel",
1660             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1661         }
1662     }
1663 }
1664
1665 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1666 impl From<RecvError> for RecvTimeoutError {
1667     fn from(err: RecvError) -> RecvTimeoutError {
1668         match err {
1669             RecvError => RecvTimeoutError::Disconnected,
1670         }
1671     }
1672 }
1673
1674 #[cfg(all(test, not(target_os = "emscripten")))]
1675 mod tests {
1676     use super::*;
1677     use crate::env;
1678     use crate::thread;
1679     use crate::time::{Duration, Instant};
1680
1681     pub fn stress_factor() -> usize {
1682         match env::var("RUST_TEST_STRESS") {
1683             Ok(val) => val.parse().unwrap(),
1684             Err(..) => 1,
1685         }
1686     }
1687
1688     #[test]
1689     fn smoke() {
1690         let (tx, rx) = channel::<i32>();
1691         tx.send(1).unwrap();
1692         assert_eq!(rx.recv().unwrap(), 1);
1693     }
1694
1695     #[test]
1696     fn drop_full() {
1697         let (tx, _rx) = channel::<Box<isize>>();
1698         tx.send(box 1).unwrap();
1699     }
1700
1701     #[test]
1702     fn drop_full_shared() {
1703         let (tx, _rx) = channel::<Box<isize>>();
1704         drop(tx.clone());
1705         drop(tx.clone());
1706         tx.send(box 1).unwrap();
1707     }
1708
1709     #[test]
1710     fn smoke_shared() {
1711         let (tx, rx) = channel::<i32>();
1712         tx.send(1).unwrap();
1713         assert_eq!(rx.recv().unwrap(), 1);
1714         let tx = tx.clone();
1715         tx.send(1).unwrap();
1716         assert_eq!(rx.recv().unwrap(), 1);
1717     }
1718
1719     #[test]
1720     fn smoke_threads() {
1721         let (tx, rx) = channel::<i32>();
1722         let _t = thread::spawn(move || {
1723             tx.send(1).unwrap();
1724         });
1725         assert_eq!(rx.recv().unwrap(), 1);
1726     }
1727
1728     #[test]
1729     fn smoke_port_gone() {
1730         let (tx, rx) = channel::<i32>();
1731         drop(rx);
1732         assert!(tx.send(1).is_err());
1733     }
1734
1735     #[test]
1736     fn smoke_shared_port_gone() {
1737         let (tx, rx) = channel::<i32>();
1738         drop(rx);
1739         assert!(tx.send(1).is_err())
1740     }
1741
1742     #[test]
1743     fn smoke_shared_port_gone2() {
1744         let (tx, rx) = channel::<i32>();
1745         drop(rx);
1746         let tx2 = tx.clone();
1747         drop(tx);
1748         assert!(tx2.send(1).is_err());
1749     }
1750
1751     #[test]
1752     fn port_gone_concurrent() {
1753         let (tx, rx) = channel::<i32>();
1754         let _t = thread::spawn(move || {
1755             rx.recv().unwrap();
1756         });
1757         while tx.send(1).is_ok() {}
1758     }
1759
1760     #[test]
1761     fn port_gone_concurrent_shared() {
1762         let (tx, rx) = channel::<i32>();
1763         let tx2 = tx.clone();
1764         let _t = thread::spawn(move || {
1765             rx.recv().unwrap();
1766         });
1767         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1768     }
1769
1770     #[test]
1771     fn smoke_chan_gone() {
1772         let (tx, rx) = channel::<i32>();
1773         drop(tx);
1774         assert!(rx.recv().is_err());
1775     }
1776
1777     #[test]
1778     fn smoke_chan_gone_shared() {
1779         let (tx, rx) = channel::<()>();
1780         let tx2 = tx.clone();
1781         drop(tx);
1782         drop(tx2);
1783         assert!(rx.recv().is_err());
1784     }
1785
1786     #[test]
1787     fn chan_gone_concurrent() {
1788         let (tx, rx) = channel::<i32>();
1789         let _t = thread::spawn(move || {
1790             tx.send(1).unwrap();
1791             tx.send(1).unwrap();
1792         });
1793         while rx.recv().is_ok() {}
1794     }
1795
1796     #[test]
1797     fn stress() {
1798         let (tx, rx) = channel::<i32>();
1799         let t = thread::spawn(move || {
1800             for _ in 0..10000 {
1801                 tx.send(1).unwrap();
1802             }
1803         });
1804         for _ in 0..10000 {
1805             assert_eq!(rx.recv().unwrap(), 1);
1806         }
1807         t.join().ok().expect("thread panicked");
1808     }
1809
1810     #[test]
1811     fn stress_shared() {
1812         const AMT: u32 = 10000;
1813         const NTHREADS: u32 = 8;
1814         let (tx, rx) = channel::<i32>();
1815
1816         let t = thread::spawn(move || {
1817             for _ in 0..AMT * NTHREADS {
1818                 assert_eq!(rx.recv().unwrap(), 1);
1819             }
1820             match rx.try_recv() {
1821                 Ok(..) => panic!(),
1822                 _ => {}
1823             }
1824         });
1825
1826         for _ in 0..NTHREADS {
1827             let tx = tx.clone();
1828             thread::spawn(move || {
1829                 for _ in 0..AMT {
1830                     tx.send(1).unwrap();
1831                 }
1832             });
1833         }
1834         drop(tx);
1835         t.join().ok().expect("thread panicked");
1836     }
1837
1838     #[test]
1839     fn send_from_outside_runtime() {
1840         let (tx1, rx1) = channel::<()>();
1841         let (tx2, rx2) = channel::<i32>();
1842         let t1 = thread::spawn(move || {
1843             tx1.send(()).unwrap();
1844             for _ in 0..40 {
1845                 assert_eq!(rx2.recv().unwrap(), 1);
1846             }
1847         });
1848         rx1.recv().unwrap();
1849         let t2 = thread::spawn(move || {
1850             for _ in 0..40 {
1851                 tx2.send(1).unwrap();
1852             }
1853         });
1854         t1.join().ok().expect("thread panicked");
1855         t2.join().ok().expect("thread panicked");
1856     }
1857
1858     #[test]
1859     fn recv_from_outside_runtime() {
1860         let (tx, rx) = channel::<i32>();
1861         let t = thread::spawn(move || {
1862             for _ in 0..40 {
1863                 assert_eq!(rx.recv().unwrap(), 1);
1864             }
1865         });
1866         for _ in 0..40 {
1867             tx.send(1).unwrap();
1868         }
1869         t.join().ok().expect("thread panicked");
1870     }
1871
1872     #[test]
1873     fn no_runtime() {
1874         let (tx1, rx1) = channel::<i32>();
1875         let (tx2, rx2) = channel::<i32>();
1876         let t1 = thread::spawn(move || {
1877             assert_eq!(rx1.recv().unwrap(), 1);
1878             tx2.send(2).unwrap();
1879         });
1880         let t2 = thread::spawn(move || {
1881             tx1.send(1).unwrap();
1882             assert_eq!(rx2.recv().unwrap(), 2);
1883         });
1884         t1.join().ok().expect("thread panicked");
1885         t2.join().ok().expect("thread panicked");
1886     }
1887
1888     #[test]
1889     fn oneshot_single_thread_close_port_first() {
1890         // Simple test of closing without sending
1891         let (_tx, rx) = channel::<i32>();
1892         drop(rx);
1893     }
1894
1895     #[test]
1896     fn oneshot_single_thread_close_chan_first() {
1897         // Simple test of closing without sending
1898         let (tx, _rx) = channel::<i32>();
1899         drop(tx);
1900     }
1901
1902     #[test]
1903     fn oneshot_single_thread_send_port_close() {
1904         // Testing that the sender cleans up the payload if receiver is closed
1905         let (tx, rx) = channel::<Box<i32>>();
1906         drop(rx);
1907         assert!(tx.send(box 0).is_err());
1908     }
1909
1910     #[test]
1911     fn oneshot_single_thread_recv_chan_close() {
1912         // Receiving on a closed chan will panic
1913         let res = thread::spawn(move || {
1914             let (tx, rx) = channel::<i32>();
1915             drop(tx);
1916             rx.recv().unwrap();
1917         })
1918         .join();
1919         // What is our res?
1920         assert!(res.is_err());
1921     }
1922
1923     #[test]
1924     fn oneshot_single_thread_send_then_recv() {
1925         let (tx, rx) = channel::<Box<i32>>();
1926         tx.send(box 10).unwrap();
1927         assert!(*rx.recv().unwrap() == 10);
1928     }
1929
1930     #[test]
1931     fn oneshot_single_thread_try_send_open() {
1932         let (tx, rx) = channel::<i32>();
1933         assert!(tx.send(10).is_ok());
1934         assert!(rx.recv().unwrap() == 10);
1935     }
1936
1937     #[test]
1938     fn oneshot_single_thread_try_send_closed() {
1939         let (tx, rx) = channel::<i32>();
1940         drop(rx);
1941         assert!(tx.send(10).is_err());
1942     }
1943
1944     #[test]
1945     fn oneshot_single_thread_try_recv_open() {
1946         let (tx, rx) = channel::<i32>();
1947         tx.send(10).unwrap();
1948         assert!(rx.recv() == Ok(10));
1949     }
1950
1951     #[test]
1952     fn oneshot_single_thread_try_recv_closed() {
1953         let (tx, rx) = channel::<i32>();
1954         drop(tx);
1955         assert!(rx.recv().is_err());
1956     }
1957
1958     #[test]
1959     fn oneshot_single_thread_peek_data() {
1960         let (tx, rx) = channel::<i32>();
1961         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1962         tx.send(10).unwrap();
1963         assert_eq!(rx.try_recv(), Ok(10));
1964     }
1965
1966     #[test]
1967     fn oneshot_single_thread_peek_close() {
1968         let (tx, rx) = channel::<i32>();
1969         drop(tx);
1970         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1971         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1972     }
1973
1974     #[test]
1975     fn oneshot_single_thread_peek_open() {
1976         let (_tx, rx) = channel::<i32>();
1977         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1978     }
1979
1980     #[test]
1981     fn oneshot_multi_task_recv_then_send() {
1982         let (tx, rx) = channel::<Box<i32>>();
1983         let _t = thread::spawn(move || {
1984             assert!(*rx.recv().unwrap() == 10);
1985         });
1986
1987         tx.send(box 10).unwrap();
1988     }
1989
1990     #[test]
1991     fn oneshot_multi_task_recv_then_close() {
1992         let (tx, rx) = channel::<Box<i32>>();
1993         let _t = thread::spawn(move || {
1994             drop(tx);
1995         });
1996         let res = thread::spawn(move || {
1997             assert!(*rx.recv().unwrap() == 10);
1998         })
1999         .join();
2000         assert!(res.is_err());
2001     }
2002
2003     #[test]
2004     fn oneshot_multi_thread_close_stress() {
2005         for _ in 0..stress_factor() {
2006             let (tx, rx) = channel::<i32>();
2007             let _t = thread::spawn(move || {
2008                 drop(rx);
2009             });
2010             drop(tx);
2011         }
2012     }
2013
2014     #[test]
2015     fn oneshot_multi_thread_send_close_stress() {
2016         for _ in 0..stress_factor() {
2017             let (tx, rx) = channel::<i32>();
2018             let _t = thread::spawn(move || {
2019                 drop(rx);
2020             });
2021             let _ = thread::spawn(move || {
2022                 tx.send(1).unwrap();
2023             })
2024             .join();
2025         }
2026     }
2027
2028     #[test]
2029     fn oneshot_multi_thread_recv_close_stress() {
2030         for _ in 0..stress_factor() {
2031             let (tx, rx) = channel::<i32>();
2032             thread::spawn(move || {
2033                 let res = thread::spawn(move || {
2034                     rx.recv().unwrap();
2035                 })
2036                 .join();
2037                 assert!(res.is_err());
2038             });
2039             let _t = thread::spawn(move || {
2040                 thread::spawn(move || {
2041                     drop(tx);
2042                 });
2043             });
2044         }
2045     }
2046
2047     #[test]
2048     fn oneshot_multi_thread_send_recv_stress() {
2049         for _ in 0..stress_factor() {
2050             let (tx, rx) = channel::<Box<isize>>();
2051             let _t = thread::spawn(move || {
2052                 tx.send(box 10).unwrap();
2053             });
2054             assert!(*rx.recv().unwrap() == 10);
2055         }
2056     }
2057
2058     #[test]
2059     fn stream_send_recv_stress() {
2060         for _ in 0..stress_factor() {
2061             let (tx, rx) = channel();
2062
2063             send(tx, 0);
2064             recv(rx, 0);
2065
2066             fn send(tx: Sender<Box<i32>>, i: i32) {
2067                 if i == 10 {
2068                     return;
2069                 }
2070
2071                 thread::spawn(move || {
2072                     tx.send(box i).unwrap();
2073                     send(tx, i + 1);
2074                 });
2075             }
2076
2077             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2078                 if i == 10 {
2079                     return;
2080                 }
2081
2082                 thread::spawn(move || {
2083                     assert!(*rx.recv().unwrap() == i);
2084                     recv(rx, i + 1);
2085                 });
2086             }
2087         }
2088     }
2089
2090     #[test]
2091     fn oneshot_single_thread_recv_timeout() {
2092         let (tx, rx) = channel();
2093         tx.send(()).unwrap();
2094         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2095         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2096         tx.send(()).unwrap();
2097         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2098     }
2099
2100     #[test]
2101     fn stress_recv_timeout_two_threads() {
2102         let (tx, rx) = channel();
2103         let stress = stress_factor() + 100;
2104         let timeout = Duration::from_millis(100);
2105
2106         thread::spawn(move || {
2107             for i in 0..stress {
2108                 if i % 2 == 0 {
2109                     thread::sleep(timeout * 2);
2110                 }
2111                 tx.send(1usize).unwrap();
2112             }
2113         });
2114
2115         let mut recv_count = 0;
2116         loop {
2117             match rx.recv_timeout(timeout) {
2118                 Ok(n) => {
2119                     assert_eq!(n, 1usize);
2120                     recv_count += 1;
2121                 }
2122                 Err(RecvTimeoutError::Timeout) => continue,
2123                 Err(RecvTimeoutError::Disconnected) => break,
2124             }
2125         }
2126
2127         assert_eq!(recv_count, stress);
2128     }
2129
2130     #[test]
2131     fn recv_timeout_upgrade() {
2132         let (tx, rx) = channel::<()>();
2133         let timeout = Duration::from_millis(1);
2134         let _tx_clone = tx.clone();
2135
2136         let start = Instant::now();
2137         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2138         assert!(Instant::now() >= start + timeout);
2139     }
2140
2141     #[test]
2142     fn stress_recv_timeout_shared() {
2143         let (tx, rx) = channel();
2144         let stress = stress_factor() + 100;
2145
2146         for i in 0..stress {
2147             let tx = tx.clone();
2148             thread::spawn(move || {
2149                 thread::sleep(Duration::from_millis(i as u64 * 10));
2150                 tx.send(1usize).unwrap();
2151             });
2152         }
2153
2154         drop(tx);
2155
2156         let mut recv_count = 0;
2157         loop {
2158             match rx.recv_timeout(Duration::from_millis(10)) {
2159                 Ok(n) => {
2160                     assert_eq!(n, 1usize);
2161                     recv_count += 1;
2162                 }
2163                 Err(RecvTimeoutError::Timeout) => continue,
2164                 Err(RecvTimeoutError::Disconnected) => break,
2165             }
2166         }
2167
2168         assert_eq!(recv_count, stress);
2169     }
2170
2171     #[test]
2172     fn very_long_recv_timeout_wont_panic() {
2173         let (tx, rx) = channel::<()>();
2174         let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
2175         thread::sleep(Duration::from_secs(1));
2176         assert!(tx.send(()).is_ok());
2177         assert_eq!(join_handle.join().unwrap(), Ok(()));
2178     }
2179
2180     #[test]
2181     fn recv_a_lot() {
2182         // Regression test that we don't run out of stack in scheduler context
2183         let (tx, rx) = channel();
2184         for _ in 0..10000 {
2185             tx.send(()).unwrap();
2186         }
2187         for _ in 0..10000 {
2188             rx.recv().unwrap();
2189         }
2190     }
2191
2192     #[test]
2193     fn shared_recv_timeout() {
2194         let (tx, rx) = channel();
2195         let total = 5;
2196         for _ in 0..total {
2197             let tx = tx.clone();
2198             thread::spawn(move || {
2199                 tx.send(()).unwrap();
2200             });
2201         }
2202
2203         for _ in 0..total {
2204             rx.recv().unwrap();
2205         }
2206
2207         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2208         tx.send(()).unwrap();
2209         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2210     }
2211
2212     #[test]
2213     fn shared_chan_stress() {
2214         let (tx, rx) = channel();
2215         let total = stress_factor() + 100;
2216         for _ in 0..total {
2217             let tx = tx.clone();
2218             thread::spawn(move || {
2219                 tx.send(()).unwrap();
2220             });
2221         }
2222
2223         for _ in 0..total {
2224             rx.recv().unwrap();
2225         }
2226     }
2227
2228     #[test]
2229     fn test_nested_recv_iter() {
2230         let (tx, rx) = channel::<i32>();
2231         let (total_tx, total_rx) = channel::<i32>();
2232
2233         let _t = thread::spawn(move || {
2234             let mut acc = 0;
2235             for x in rx.iter() {
2236                 acc += x;
2237             }
2238             total_tx.send(acc).unwrap();
2239         });
2240
2241         tx.send(3).unwrap();
2242         tx.send(1).unwrap();
2243         tx.send(2).unwrap();
2244         drop(tx);
2245         assert_eq!(total_rx.recv().unwrap(), 6);
2246     }
2247
2248     #[test]
2249     fn test_recv_iter_break() {
2250         let (tx, rx) = channel::<i32>();
2251         let (count_tx, count_rx) = channel();
2252
2253         let _t = thread::spawn(move || {
2254             let mut count = 0;
2255             for x in rx.iter() {
2256                 if count >= 3 {
2257                     break;
2258                 } else {
2259                     count += x;
2260                 }
2261             }
2262             count_tx.send(count).unwrap();
2263         });
2264
2265         tx.send(2).unwrap();
2266         tx.send(2).unwrap();
2267         tx.send(2).unwrap();
2268         let _ = tx.send(2);
2269         drop(tx);
2270         assert_eq!(count_rx.recv().unwrap(), 4);
2271     }
2272
2273     #[test]
2274     fn test_recv_try_iter() {
2275         let (request_tx, request_rx) = channel();
2276         let (response_tx, response_rx) = channel();
2277
2278         // Request `x`s until we have `6`.
2279         let t = thread::spawn(move || {
2280             let mut count = 0;
2281             loop {
2282                 for x in response_rx.try_iter() {
2283                     count += x;
2284                     if count == 6 {
2285                         return count;
2286                     }
2287                 }
2288                 request_tx.send(()).unwrap();
2289             }
2290         });
2291
2292         for _ in request_rx.iter() {
2293             if response_tx.send(2).is_err() {
2294                 break;
2295             }
2296         }
2297
2298         assert_eq!(t.join().unwrap(), 6);
2299     }
2300
2301     #[test]
2302     fn test_recv_into_iter_owned() {
2303         let mut iter = {
2304             let (tx, rx) = channel::<i32>();
2305             tx.send(1).unwrap();
2306             tx.send(2).unwrap();
2307
2308             rx.into_iter()
2309         };
2310         assert_eq!(iter.next().unwrap(), 1);
2311         assert_eq!(iter.next().unwrap(), 2);
2312         assert_eq!(iter.next().is_none(), true);
2313     }
2314
2315     #[test]
2316     fn test_recv_into_iter_borrowed() {
2317         let (tx, rx) = channel::<i32>();
2318         tx.send(1).unwrap();
2319         tx.send(2).unwrap();
2320         drop(tx);
2321         let mut iter = (&rx).into_iter();
2322         assert_eq!(iter.next().unwrap(), 1);
2323         assert_eq!(iter.next().unwrap(), 2);
2324         assert_eq!(iter.next().is_none(), true);
2325     }
2326
2327     #[test]
2328     fn try_recv_states() {
2329         let (tx1, rx1) = channel::<i32>();
2330         let (tx2, rx2) = channel::<()>();
2331         let (tx3, rx3) = channel::<()>();
2332         let _t = thread::spawn(move || {
2333             rx2.recv().unwrap();
2334             tx1.send(1).unwrap();
2335             tx3.send(()).unwrap();
2336             rx2.recv().unwrap();
2337             drop(tx1);
2338             tx3.send(()).unwrap();
2339         });
2340
2341         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2342         tx2.send(()).unwrap();
2343         rx3.recv().unwrap();
2344         assert_eq!(rx1.try_recv(), Ok(1));
2345         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2346         tx2.send(()).unwrap();
2347         rx3.recv().unwrap();
2348         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2349     }
2350
2351     // This bug used to end up in a livelock inside of the Receiver destructor
2352     // because the internal state of the Shared packet was corrupted
2353     #[test]
2354     fn destroy_upgraded_shared_port_when_sender_still_active() {
2355         let (tx, rx) = channel();
2356         let (tx2, rx2) = channel();
2357         let _t = thread::spawn(move || {
2358             rx.recv().unwrap(); // wait on a oneshot
2359             drop(rx); // destroy a shared
2360             tx2.send(()).unwrap();
2361         });
2362         // make sure the other thread has gone to sleep
2363         for _ in 0..5000 {
2364             thread::yield_now();
2365         }
2366
2367         // upgrade to a shared chan and send a message
2368         let t = tx.clone();
2369         drop(tx);
2370         t.send(()).unwrap();
2371
2372         // wait for the child thread to exit before we exit
2373         rx2.recv().unwrap();
2374     }
2375
2376     #[test]
2377     fn issue_32114() {
2378         let (tx, _) = channel();
2379         let _ = tx.send(123);
2380         assert_eq!(tx.send(123), Err(SendError(123)));
2381     }
2382 }
2383
2384 #[cfg(all(test, not(target_os = "emscripten")))]
2385 mod sync_tests {
2386     use super::*;
2387     use crate::env;
2388     use crate::thread;
2389     use crate::time::Duration;
2390
2391     pub fn stress_factor() -> usize {
2392         match env::var("RUST_TEST_STRESS") {
2393             Ok(val) => val.parse().unwrap(),
2394             Err(..) => 1,
2395         }
2396     }
2397
2398     #[test]
2399     fn smoke() {
2400         let (tx, rx) = sync_channel::<i32>(1);
2401         tx.send(1).unwrap();
2402         assert_eq!(rx.recv().unwrap(), 1);
2403     }
2404
2405     #[test]
2406     fn drop_full() {
2407         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2408         tx.send(box 1).unwrap();
2409     }
2410
2411     #[test]
2412     fn smoke_shared() {
2413         let (tx, rx) = sync_channel::<i32>(1);
2414         tx.send(1).unwrap();
2415         assert_eq!(rx.recv().unwrap(), 1);
2416         let tx = tx.clone();
2417         tx.send(1).unwrap();
2418         assert_eq!(rx.recv().unwrap(), 1);
2419     }
2420
2421     #[test]
2422     fn recv_timeout() {
2423         let (tx, rx) = sync_channel::<i32>(1);
2424         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2425         tx.send(1).unwrap();
2426         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2427     }
2428
2429     #[test]
2430     fn smoke_threads() {
2431         let (tx, rx) = sync_channel::<i32>(0);
2432         let _t = thread::spawn(move || {
2433             tx.send(1).unwrap();
2434         });
2435         assert_eq!(rx.recv().unwrap(), 1);
2436     }
2437
2438     #[test]
2439     fn smoke_port_gone() {
2440         let (tx, rx) = sync_channel::<i32>(0);
2441         drop(rx);
2442         assert!(tx.send(1).is_err());
2443     }
2444
2445     #[test]
2446     fn smoke_shared_port_gone2() {
2447         let (tx, rx) = sync_channel::<i32>(0);
2448         drop(rx);
2449         let tx2 = tx.clone();
2450         drop(tx);
2451         assert!(tx2.send(1).is_err());
2452     }
2453
2454     #[test]
2455     fn port_gone_concurrent() {
2456         let (tx, rx) = sync_channel::<i32>(0);
2457         let _t = thread::spawn(move || {
2458             rx.recv().unwrap();
2459         });
2460         while tx.send(1).is_ok() {}
2461     }
2462
2463     #[test]
2464     fn port_gone_concurrent_shared() {
2465         let (tx, rx) = sync_channel::<i32>(0);
2466         let tx2 = tx.clone();
2467         let _t = thread::spawn(move || {
2468             rx.recv().unwrap();
2469         });
2470         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2471     }
2472
2473     #[test]
2474     fn smoke_chan_gone() {
2475         let (tx, rx) = sync_channel::<i32>(0);
2476         drop(tx);
2477         assert!(rx.recv().is_err());
2478     }
2479
2480     #[test]
2481     fn smoke_chan_gone_shared() {
2482         let (tx, rx) = sync_channel::<()>(0);
2483         let tx2 = tx.clone();
2484         drop(tx);
2485         drop(tx2);
2486         assert!(rx.recv().is_err());
2487     }
2488
2489     #[test]
2490     fn chan_gone_concurrent() {
2491         let (tx, rx) = sync_channel::<i32>(0);
2492         thread::spawn(move || {
2493             tx.send(1).unwrap();
2494             tx.send(1).unwrap();
2495         });
2496         while rx.recv().is_ok() {}
2497     }
2498
2499     #[test]
2500     fn stress() {
2501         let (tx, rx) = sync_channel::<i32>(0);
2502         thread::spawn(move || {
2503             for _ in 0..10000 {
2504                 tx.send(1).unwrap();
2505             }
2506         });
2507         for _ in 0..10000 {
2508             assert_eq!(rx.recv().unwrap(), 1);
2509         }
2510     }
2511
2512     #[test]
2513     fn stress_recv_timeout_two_threads() {
2514         let (tx, rx) = sync_channel::<i32>(0);
2515
2516         thread::spawn(move || {
2517             for _ in 0..10000 {
2518                 tx.send(1).unwrap();
2519             }
2520         });
2521
2522         let mut recv_count = 0;
2523         loop {
2524             match rx.recv_timeout(Duration::from_millis(1)) {
2525                 Ok(v) => {
2526                     assert_eq!(v, 1);
2527                     recv_count += 1;
2528                 }
2529                 Err(RecvTimeoutError::Timeout) => continue,
2530                 Err(RecvTimeoutError::Disconnected) => break,
2531             }
2532         }
2533
2534         assert_eq!(recv_count, 10000);
2535     }
2536
2537     #[test]
2538     fn stress_recv_timeout_shared() {
2539         const AMT: u32 = 1000;
2540         const NTHREADS: u32 = 8;
2541         let (tx, rx) = sync_channel::<i32>(0);
2542         let (dtx, drx) = sync_channel::<()>(0);
2543
2544         thread::spawn(move || {
2545             let mut recv_count = 0;
2546             loop {
2547                 match rx.recv_timeout(Duration::from_millis(10)) {
2548                     Ok(v) => {
2549                         assert_eq!(v, 1);
2550                         recv_count += 1;
2551                     }
2552                     Err(RecvTimeoutError::Timeout) => continue,
2553                     Err(RecvTimeoutError::Disconnected) => break,
2554                 }
2555             }
2556
2557             assert_eq!(recv_count, AMT * NTHREADS);
2558             assert!(rx.try_recv().is_err());
2559
2560             dtx.send(()).unwrap();
2561         });
2562
2563         for _ in 0..NTHREADS {
2564             let tx = tx.clone();
2565             thread::spawn(move || {
2566                 for _ in 0..AMT {
2567                     tx.send(1).unwrap();
2568                 }
2569             });
2570         }
2571
2572         drop(tx);
2573
2574         drx.recv().unwrap();
2575     }
2576
2577     #[test]
2578     fn stress_shared() {
2579         const AMT: u32 = 1000;
2580         const NTHREADS: u32 = 8;
2581         let (tx, rx) = sync_channel::<i32>(0);
2582         let (dtx, drx) = sync_channel::<()>(0);
2583
2584         thread::spawn(move || {
2585             for _ in 0..AMT * NTHREADS {
2586                 assert_eq!(rx.recv().unwrap(), 1);
2587             }
2588             match rx.try_recv() {
2589                 Ok(..) => panic!(),
2590                 _ => {}
2591             }
2592             dtx.send(()).unwrap();
2593         });
2594
2595         for _ in 0..NTHREADS {
2596             let tx = tx.clone();
2597             thread::spawn(move || {
2598                 for _ in 0..AMT {
2599                     tx.send(1).unwrap();
2600                 }
2601             });
2602         }
2603         drop(tx);
2604         drx.recv().unwrap();
2605     }
2606
2607     #[test]
2608     fn oneshot_single_thread_close_port_first() {
2609         // Simple test of closing without sending
2610         let (_tx, rx) = sync_channel::<i32>(0);
2611         drop(rx);
2612     }
2613
2614     #[test]
2615     fn oneshot_single_thread_close_chan_first() {
2616         // Simple test of closing without sending
2617         let (tx, _rx) = sync_channel::<i32>(0);
2618         drop(tx);
2619     }
2620
2621     #[test]
2622     fn oneshot_single_thread_send_port_close() {
2623         // Testing that the sender cleans up the payload if receiver is closed
2624         let (tx, rx) = sync_channel::<Box<i32>>(0);
2625         drop(rx);
2626         assert!(tx.send(box 0).is_err());
2627     }
2628
2629     #[test]
2630     fn oneshot_single_thread_recv_chan_close() {
2631         // Receiving on a closed chan will panic
2632         let res = thread::spawn(move || {
2633             let (tx, rx) = sync_channel::<i32>(0);
2634             drop(tx);
2635             rx.recv().unwrap();
2636         })
2637         .join();
2638         // What is our res?
2639         assert!(res.is_err());
2640     }
2641
2642     #[test]
2643     fn oneshot_single_thread_send_then_recv() {
2644         let (tx, rx) = sync_channel::<Box<i32>>(1);
2645         tx.send(box 10).unwrap();
2646         assert!(*rx.recv().unwrap() == 10);
2647     }
2648
2649     #[test]
2650     fn oneshot_single_thread_try_send_open() {
2651         let (tx, rx) = sync_channel::<i32>(1);
2652         assert_eq!(tx.try_send(10), Ok(()));
2653         assert!(rx.recv().unwrap() == 10);
2654     }
2655
2656     #[test]
2657     fn oneshot_single_thread_try_send_closed() {
2658         let (tx, rx) = sync_channel::<i32>(0);
2659         drop(rx);
2660         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2661     }
2662
2663     #[test]
2664     fn oneshot_single_thread_try_send_closed2() {
2665         let (tx, _rx) = sync_channel::<i32>(0);
2666         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2667     }
2668
2669     #[test]
2670     fn oneshot_single_thread_try_recv_open() {
2671         let (tx, rx) = sync_channel::<i32>(1);
2672         tx.send(10).unwrap();
2673         assert!(rx.recv() == Ok(10));
2674     }
2675
2676     #[test]
2677     fn oneshot_single_thread_try_recv_closed() {
2678         let (tx, rx) = sync_channel::<i32>(0);
2679         drop(tx);
2680         assert!(rx.recv().is_err());
2681     }
2682
2683     #[test]
2684     fn oneshot_single_thread_try_recv_closed_with_data() {
2685         let (tx, rx) = sync_channel::<i32>(1);
2686         tx.send(10).unwrap();
2687         drop(tx);
2688         assert_eq!(rx.try_recv(), Ok(10));
2689         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2690     }
2691
2692     #[test]
2693     fn oneshot_single_thread_peek_data() {
2694         let (tx, rx) = sync_channel::<i32>(1);
2695         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2696         tx.send(10).unwrap();
2697         assert_eq!(rx.try_recv(), Ok(10));
2698     }
2699
2700     #[test]
2701     fn oneshot_single_thread_peek_close() {
2702         let (tx, rx) = sync_channel::<i32>(0);
2703         drop(tx);
2704         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2705         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2706     }
2707
2708     #[test]
2709     fn oneshot_single_thread_peek_open() {
2710         let (_tx, rx) = sync_channel::<i32>(0);
2711         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2712     }
2713
2714     #[test]
2715     fn oneshot_multi_task_recv_then_send() {
2716         let (tx, rx) = sync_channel::<Box<i32>>(0);
2717         let _t = thread::spawn(move || {
2718             assert!(*rx.recv().unwrap() == 10);
2719         });
2720
2721         tx.send(box 10).unwrap();
2722     }
2723
2724     #[test]
2725     fn oneshot_multi_task_recv_then_close() {
2726         let (tx, rx) = sync_channel::<Box<i32>>(0);
2727         let _t = thread::spawn(move || {
2728             drop(tx);
2729         });
2730         let res = thread::spawn(move || {
2731             assert!(*rx.recv().unwrap() == 10);
2732         })
2733         .join();
2734         assert!(res.is_err());
2735     }
2736
2737     #[test]
2738     fn oneshot_multi_thread_close_stress() {
2739         for _ in 0..stress_factor() {
2740             let (tx, rx) = sync_channel::<i32>(0);
2741             let _t = thread::spawn(move || {
2742                 drop(rx);
2743             });
2744             drop(tx);
2745         }
2746     }
2747
2748     #[test]
2749     fn oneshot_multi_thread_send_close_stress() {
2750         for _ in 0..stress_factor() {
2751             let (tx, rx) = sync_channel::<i32>(0);
2752             let _t = thread::spawn(move || {
2753                 drop(rx);
2754             });
2755             let _ = thread::spawn(move || {
2756                 tx.send(1).unwrap();
2757             })
2758             .join();
2759         }
2760     }
2761
2762     #[test]
2763     fn oneshot_multi_thread_recv_close_stress() {
2764         for _ in 0..stress_factor() {
2765             let (tx, rx) = sync_channel::<i32>(0);
2766             let _t = thread::spawn(move || {
2767                 let res = thread::spawn(move || {
2768                     rx.recv().unwrap();
2769                 })
2770                 .join();
2771                 assert!(res.is_err());
2772             });
2773             let _t = thread::spawn(move || {
2774                 thread::spawn(move || {
2775                     drop(tx);
2776                 });
2777             });
2778         }
2779     }
2780
2781     #[test]
2782     fn oneshot_multi_thread_send_recv_stress() {
2783         for _ in 0..stress_factor() {
2784             let (tx, rx) = sync_channel::<Box<i32>>(0);
2785             let _t = thread::spawn(move || {
2786                 tx.send(box 10).unwrap();
2787             });
2788             assert!(*rx.recv().unwrap() == 10);
2789         }
2790     }
2791
2792     #[test]
2793     fn stream_send_recv_stress() {
2794         for _ in 0..stress_factor() {
2795             let (tx, rx) = sync_channel::<Box<i32>>(0);
2796
2797             send(tx, 0);
2798             recv(rx, 0);
2799
2800             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2801                 if i == 10 {
2802                     return;
2803                 }
2804
2805                 thread::spawn(move || {
2806                     tx.send(box i).unwrap();
2807                     send(tx, i + 1);
2808                 });
2809             }
2810
2811             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2812                 if i == 10 {
2813                     return;
2814                 }
2815
2816                 thread::spawn(move || {
2817                     assert!(*rx.recv().unwrap() == i);
2818                     recv(rx, i + 1);
2819                 });
2820             }
2821         }
2822     }
2823
2824     #[test]
2825     fn recv_a_lot() {
2826         // Regression test that we don't run out of stack in scheduler context
2827         let (tx, rx) = sync_channel(10000);
2828         for _ in 0..10000 {
2829             tx.send(()).unwrap();
2830         }
2831         for _ in 0..10000 {
2832             rx.recv().unwrap();
2833         }
2834     }
2835
2836     #[test]
2837     fn shared_chan_stress() {
2838         let (tx, rx) = sync_channel(0);
2839         let total = stress_factor() + 100;
2840         for _ in 0..total {
2841             let tx = tx.clone();
2842             thread::spawn(move || {
2843                 tx.send(()).unwrap();
2844             });
2845         }
2846
2847         for _ in 0..total {
2848             rx.recv().unwrap();
2849         }
2850     }
2851
2852     #[test]
2853     fn test_nested_recv_iter() {
2854         let (tx, rx) = sync_channel::<i32>(0);
2855         let (total_tx, total_rx) = sync_channel::<i32>(0);
2856
2857         let _t = thread::spawn(move || {
2858             let mut acc = 0;
2859             for x in rx.iter() {
2860                 acc += x;
2861             }
2862             total_tx.send(acc).unwrap();
2863         });
2864
2865         tx.send(3).unwrap();
2866         tx.send(1).unwrap();
2867         tx.send(2).unwrap();
2868         drop(tx);
2869         assert_eq!(total_rx.recv().unwrap(), 6);
2870     }
2871
2872     #[test]
2873     fn test_recv_iter_break() {
2874         let (tx, rx) = sync_channel::<i32>(0);
2875         let (count_tx, count_rx) = sync_channel(0);
2876
2877         let _t = thread::spawn(move || {
2878             let mut count = 0;
2879             for x in rx.iter() {
2880                 if count >= 3 {
2881                     break;
2882                 } else {
2883                     count += x;
2884                 }
2885             }
2886             count_tx.send(count).unwrap();
2887         });
2888
2889         tx.send(2).unwrap();
2890         tx.send(2).unwrap();
2891         tx.send(2).unwrap();
2892         let _ = tx.try_send(2);
2893         drop(tx);
2894         assert_eq!(count_rx.recv().unwrap(), 4);
2895     }
2896
2897     #[test]
2898     fn try_recv_states() {
2899         let (tx1, rx1) = sync_channel::<i32>(1);
2900         let (tx2, rx2) = sync_channel::<()>(1);
2901         let (tx3, rx3) = sync_channel::<()>(1);
2902         let _t = thread::spawn(move || {
2903             rx2.recv().unwrap();
2904             tx1.send(1).unwrap();
2905             tx3.send(()).unwrap();
2906             rx2.recv().unwrap();
2907             drop(tx1);
2908             tx3.send(()).unwrap();
2909         });
2910
2911         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2912         tx2.send(()).unwrap();
2913         rx3.recv().unwrap();
2914         assert_eq!(rx1.try_recv(), Ok(1));
2915         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2916         tx2.send(()).unwrap();
2917         rx3.recv().unwrap();
2918         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2919     }
2920
2921     // This bug used to end up in a livelock inside of the Receiver destructor
2922     // because the internal state of the Shared packet was corrupted
2923     #[test]
2924     fn destroy_upgraded_shared_port_when_sender_still_active() {
2925         let (tx, rx) = sync_channel::<()>(0);
2926         let (tx2, rx2) = sync_channel::<()>(0);
2927         let _t = thread::spawn(move || {
2928             rx.recv().unwrap(); // wait on a oneshot
2929             drop(rx); // destroy a shared
2930             tx2.send(()).unwrap();
2931         });
2932         // make sure the other thread has gone to sleep
2933         for _ in 0..5000 {
2934             thread::yield_now();
2935         }
2936
2937         // upgrade to a shared chan and send a message
2938         let t = tx.clone();
2939         drop(tx);
2940         t.send(()).unwrap();
2941
2942         // wait for the child thread to exit before we exit
2943         rx2.recv().unwrap();
2944     }
2945
2946     #[test]
2947     fn send1() {
2948         let (tx, rx) = sync_channel::<i32>(0);
2949         let _t = thread::spawn(move || {
2950             rx.recv().unwrap();
2951         });
2952         assert_eq!(tx.send(1), Ok(()));
2953     }
2954
2955     #[test]
2956     fn send2() {
2957         let (tx, rx) = sync_channel::<i32>(0);
2958         let _t = thread::spawn(move || {
2959             drop(rx);
2960         });
2961         assert!(tx.send(1).is_err());
2962     }
2963
2964     #[test]
2965     fn send3() {
2966         let (tx, rx) = sync_channel::<i32>(1);
2967         assert_eq!(tx.send(1), Ok(()));
2968         let _t = thread::spawn(move || {
2969             drop(rx);
2970         });
2971         assert!(tx.send(1).is_err());
2972     }
2973
2974     #[test]
2975     fn send4() {
2976         let (tx, rx) = sync_channel::<i32>(0);
2977         let tx2 = tx.clone();
2978         let (done, donerx) = channel();
2979         let done2 = done.clone();
2980         let _t = thread::spawn(move || {
2981             assert!(tx.send(1).is_err());
2982             done.send(()).unwrap();
2983         });
2984         let _t = thread::spawn(move || {
2985             assert!(tx2.send(2).is_err());
2986             done2.send(()).unwrap();
2987         });
2988         drop(rx);
2989         donerx.recv().unwrap();
2990         donerx.recv().unwrap();
2991     }
2992
2993     #[test]
2994     fn try_send1() {
2995         let (tx, _rx) = sync_channel::<i32>(0);
2996         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2997     }
2998
2999     #[test]
3000     fn try_send2() {
3001         let (tx, _rx) = sync_channel::<i32>(1);
3002         assert_eq!(tx.try_send(1), Ok(()));
3003         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3004     }
3005
3006     #[test]
3007     fn try_send3() {
3008         let (tx, rx) = sync_channel::<i32>(1);
3009         assert_eq!(tx.try_send(1), Ok(()));
3010         drop(rx);
3011         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3012     }
3013
3014     #[test]
3015     fn issue_15761() {
3016         fn repro() {
3017             let (tx1, rx1) = sync_channel::<()>(3);
3018             let (tx2, rx2) = sync_channel::<()>(3);
3019
3020             let _t = thread::spawn(move || {
3021                 rx1.recv().unwrap();
3022                 tx2.try_send(()).unwrap();
3023             });
3024
3025             tx1.try_send(()).unwrap();
3026             rx2.recv().unwrap();
3027         }
3028
3029         for _ in 0..100 {
3030             repro()
3031         }
3032     }
3033 }