]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Rollup merge of #66870 - tmiasko:simplify-ty, r=oli-obk
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // ignore-tidy-filelength
2
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
4 //!
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
7 //!
8 //! * [`Sender`]
9 //! * [`SyncSender`]
10 //! * [`Receiver`]
11 //!
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
15 //!
16 //! These channels come in two flavors:
17 //!
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //!    will return a `(Sender, Receiver)` tuple where all sends will be
20 //!    **asynchronous** (they never block). The channel conceptually has an
21 //!    infinite buffer.
22 //!
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
26 //!    **synchronous** by blocking until there is buffer space available. Note
27 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //!    channel where each sender atomically hands off a message to a receiver.
29 //!
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
36 //!
37 //! ## Disconnection
38 //!
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
43 //!
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
48 //!
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
52 //!
53 //! # Examples
54 //!
55 //! Simple usage:
56 //!
57 //! ```
58 //! use std::thread;
59 //! use std::sync::mpsc::channel;
60 //!
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //!     tx.send(10).unwrap();
65 //! });
66 //! assert_eq!(rx.recv().unwrap(), 10);
67 //! ```
68 //!
69 //! Shared usage:
70 //!
71 //! ```
72 //! use std::thread;
73 //! use std::sync::mpsc::channel;
74 //!
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
79 //! for i in 0..10 {
80 //!     let tx = tx.clone();
81 //!     thread::spawn(move|| {
82 //!         tx.send(i).unwrap();
83 //!     });
84 //! }
85 //!
86 //! for _ in 0..10 {
87 //!     let j = rx.recv().unwrap();
88 //!     assert!(0 <= j && j < 10);
89 //! }
90 //! ```
91 //!
92 //! Propagating panics:
93 //!
94 //! ```
95 //! use std::sync::mpsc::channel;
96 //!
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
100 //! drop(tx);
101 //! assert!(rx.recv().is_err());
102 //! ```
103 //!
104 //! Synchronous channels:
105 //!
106 //! ```
107 //! use std::thread;
108 //! use std::sync::mpsc::sync_channel;
109 //!
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //!     // This will wait for the parent thread to start receiving
113 //!     tx.send(53).unwrap();
114 //! });
115 //! rx.recv().unwrap();
116 //! ```
117
118 #![stable(feature = "rust1", since = "1.0.0")]
119
120 // A description of how Rust's channel implementation works
121 //
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
125 //
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
131 //
132 // ## Flavors of channels
133 //
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
137 // channels in play.
138 //
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 //                      case. They contain as few atomics as possible and
141 //                      involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 //             use a different concurrent queue that is more tailored for this
144 //             use case. The initial allocation of this flavor of channel is not
145 //             optimized.
146 // * Shared - this is the most general form of channel that this module offers,
147 //            a channel with multiple senders. This type is as optimized as it
148 //            can be, but the previous two types mentioned are much faster for
149 //            their use-cases.
150 //
151 // ## Concurrent queues
152 //
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
156 //
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // shared channels.
162 //
163 // ### SPSC optimizations
164 //
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
169 // pop().
170 //
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
176 //
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
180 // coexist at once.
181 //
182 // ### MPSC optimizations
183 //
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
187 //
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
191 //
192 // ## Overview of the Implementation
193 //
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
197 //
198 //
199 //      send(t)                             recv()
200 //        queue.push(t)                       return if queue.pop()
201 //        if increment() == -1                deschedule {
202 //          wakeup()                            if decrement() > 0
203 //                                                cancel_deschedule()
204 //                                            }
205 //                                            queue.pop()
206 //
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
209 //
210 // ### The internal atomic counter
211 //
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
215 //
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
220 //
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
227 //
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
231 //
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
234 //
235 // ## Native Implementation
236 //
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
240 //
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
246 //
247 // ## Select
248 //
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
252 //
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
259 //
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
264 //
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
266 //
267 // # Conclusion
268 //
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
271
272 use crate::cell::UnsafeCell;
273 use crate::error;
274 use crate::fmt;
275 use crate::mem;
276 use crate::sync::Arc;
277 use crate::time::{Duration, Instant};
278
279 mod blocking;
280 mod mpsc_queue;
281 mod oneshot;
282 mod shared;
283 mod spsc_queue;
284 mod stream;
285 mod sync;
286
287 mod cache_aligned;
288
289 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
291 ///
292 /// Messages sent to the channel can be retrieved using [`recv`].
293 ///
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
297 ///
298 /// # Examples
299 ///
300 /// ```rust
301 /// use std::sync::mpsc::channel;
302 /// use std::thread;
303 /// use std::time::Duration;
304 ///
305 /// let (send, recv) = channel();
306 ///
307 /// thread::spawn(move || {
308 ///     send.send("Hello world!").unwrap();
309 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
310 ///     send.send("Delayed for 2 seconds").unwrap();
311 /// });
312 ///
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
316 /// ```
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319     inner: UnsafeCell<Flavor<T>>,
320 }
321
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> {}
326
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> {}
329
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
331 ///
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
335 ///
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use std::sync::mpsc::channel;
345 /// use std::thread;
346 ///
347 /// let (send, recv) = channel();
348 ///
349 /// thread::spawn(move || {
350 ///     send.send(1u8).unwrap();
351 ///     send.send(2u8).unwrap();
352 ///     send.send(3u8).unwrap();
353 /// });
354 ///
355 /// for x in recv.iter() {
356 ///     println!("Got: {}", x);
357 /// }
358 /// ```
359 #[stable(feature = "rust1", since = "1.0.0")]
360 #[derive(Debug)]
361 pub struct Iter<'a, T: 'a> {
362     rx: &'a Receiver<T>,
363 }
364
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
367 ///
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
370 ///
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
373 ///
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
377 ///
378 /// # Examples
379 ///
380 /// ```rust
381 /// use std::sync::mpsc::channel;
382 /// use std::thread;
383 /// use std::time::Duration;
384 ///
385 /// let (sender, receiver) = channel();
386 ///
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
390 ///
391 /// thread::spawn(move || {
392 ///     sender.send(1).unwrap();
393 ///     sender.send(2).unwrap();
394 ///     sender.send(3).unwrap();
395 /// });
396 ///
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
399 ///
400 /// for x in receiver.try_iter() {
401 ///     println!("Got: {}", x);
402 /// }
403 /// ```
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
405 #[derive(Debug)]
406 pub struct TryIter<'a, T: 'a> {
407     rx: &'a Receiver<T>,
408 }
409
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
412 ///
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
416 ///
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
420 ///
421 /// # Examples
422 ///
423 /// ```rust
424 /// use std::sync::mpsc::channel;
425 /// use std::thread;
426 ///
427 /// let (send, recv) = channel();
428 ///
429 /// thread::spawn(move || {
430 ///     send.send(1u8).unwrap();
431 ///     send.send(2u8).unwrap();
432 ///     send.send(3u8).unwrap();
433 /// });
434 ///
435 /// for x in recv.into_iter() {
436 ///     println!("Got: {}", x);
437 /// }
438 /// ```
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
440 #[derive(Debug)]
441 pub struct IntoIter<T> {
442     rx: Receiver<T>,
443 }
444
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
447 ///
448 /// Messages can be sent through this channel with [`send`].
449 ///
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
452 ///
453 /// # Examples
454 ///
455 /// ```rust
456 /// use std::sync::mpsc::channel;
457 /// use std::thread;
458 ///
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
461 ///
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 ///     sender.send(1).unwrap();
465 /// });
466 ///
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 ///     sender2.send(2).unwrap();
470 /// });
471 ///
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
474 ///
475 /// assert_eq!(3, msg + msg2);
476 /// ```
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479     inner: UnsafeCell<Flavor<T>>,
480 }
481
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> {}
486
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> {}
489
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
491 ///
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
493 ///
494 /// [`send`] will block if there is no space in the internal buffer.
495 ///
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
499 ///
500 /// # Examples
501 ///
502 /// ```rust
503 /// use std::sync::mpsc::sync_channel;
504 /// use std::thread;
505 ///
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
509 ///
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 ///     sync_sender.send(1).unwrap();
513 ///     sync_sender.send(2).unwrap();
514 /// });
515 ///
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 ///     sync_sender2.send(3).unwrap();
519 ///     // thread will now block since the buffer is full
520 ///     println!("Thread unblocked!");
521 /// });
522 ///
523 /// let mut msg;
524 ///
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
527 ///
528 /// // "Thread unblocked!" will be printed now
529 ///
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
532 ///
533 /// msg = receiver.recv().unwrap();
534 ///
535 /// println!("message {} received", msg);
536 /// ```
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539     inner: Arc<sync::Packet<T>>,
540 }
541
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
544
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
547 ///
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
551 ///
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
557
558 /// An error returned from the [`recv`] function on a [`Receiver`].
559 ///
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
563 ///
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
571
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
575 ///
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582     /// This **channel** is currently empty, but the **Sender**(s) have not yet
583     /// disconnected, so data may yet become available.
584     #[stable(feature = "rust1", since = "1.0.0")]
585     Empty,
586
587     /// The **channel**'s sending half has become disconnected, and there will
588     /// never be any more data received on it.
589     #[stable(feature = "rust1", since = "1.0.0")]
590     Disconnected,
591 }
592
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
596 ///
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603     /// This **channel** is currently empty, but the **Sender**(s) have not yet
604     /// disconnected, so data may yet become available.
605     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
606     Timeout,
607     /// The **channel**'s sending half has become disconnected, and there will
608     /// never be any more data received on it.
609     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
610     Disconnected,
611 }
612
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
615 ///
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620     /// The data could not be sent on the [`sync_channel`] because it would require that
621     /// the callee block to send the data.
622     ///
623     /// If this is a buffered channel, then the buffer is full at this time. If
624     /// this is not a buffered channel, then there is no [`Receiver`] available to
625     /// acquire the data.
626     ///
627     /// [`sync_channel`]: fn.sync_channel.html
628     /// [`Receiver`]: struct.Receiver.html
629     #[stable(feature = "rust1", since = "1.0.0")]
630     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
631
632     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633     /// sent. The data is returned back to the callee in this case.
634     ///
635     /// [`sync_channel`]: fn.sync_channel.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
638 }
639
640 enum Flavor<T> {
641     Oneshot(Arc<oneshot::Packet<T>>),
642     Stream(Arc<stream::Packet<T>>),
643     Shared(Arc<shared::Packet<T>>),
644     Sync(Arc<sync::Packet<T>>),
645 }
646
647 #[doc(hidden)]
648 trait UnsafeFlavor<T> {
649     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651         &mut *self.inner_unsafe().get()
652     }
653     unsafe fn inner(&self) -> &Flavor<T> {
654         &*self.inner_unsafe().get()
655     }
656 }
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
659         &self.inner
660     }
661 }
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
664         &self.inner
665     }
666 }
667
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
673 /// is available.
674 ///
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
677 ///
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
682 ///
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
690 ///
691 /// # Examples
692 ///
693 /// ```
694 /// use std::sync::mpsc::channel;
695 /// use std::thread;
696 ///
697 /// let (sender, receiver) = channel();
698 ///
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// #   fn expensive_computation() {}
702 ///     sender.send(expensive_computation()).unwrap();
703 /// });
704 ///
705 /// // Do some useful work for awhile
706 ///
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
709 /// ```
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712     let a = Arc::new(oneshot::Packet::new());
713     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
714 }
715
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
721 ///
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
727 ///
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
730 ///
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
735 ///
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
743 ///
744 /// # Examples
745 ///
746 /// ```
747 /// use std::sync::mpsc::sync_channel;
748 /// use std::thread;
749 ///
750 /// let (sender, receiver) = sync_channel(1);
751 ///
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
754 ///
755 /// thread::spawn(move|| {
756 ///     // this will block until the previous message has been received
757 ///     sender.send(2).unwrap();
758 /// });
759 ///
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
762 /// ```
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765     let a = Arc::new(sync::Packet::new(bound));
766     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
767 }
768
769 ////////////////////////////////////////////////////////////////////////////////
770 // Sender
771 ////////////////////////////////////////////////////////////////////////////////
772
773 impl<T> Sender<T> {
774     fn new(inner: Flavor<T>) -> Sender<T> {
775         Sender { inner: UnsafeCell::new(inner) }
776     }
777
778     /// Attempts to send a value on this channel, returning it back if it could
779     /// not be sent.
780     ///
781     /// A successful send occurs when it is determined that the other end of
782     /// the channel has not hung up already. An unsuccessful send would be one
783     /// where the corresponding receiver has already been deallocated. Note
784     /// that a return value of [`Err`] means that the data will never be
785     /// received, but a return value of [`Ok`] does *not* mean that the data
786     /// will be received. It is possible for the corresponding receiver to
787     /// hang up immediately after this function returns [`Ok`].
788     ///
789     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
790     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
791     ///
792     /// This method will never block the current thread.
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// use std::sync::mpsc::channel;
798     ///
799     /// let (tx, rx) = channel();
800     ///
801     /// // This send is always successful
802     /// tx.send(1).unwrap();
803     ///
804     /// // This send will fail because the receiver is gone
805     /// drop(rx);
806     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
807     /// ```
808     #[stable(feature = "rust1", since = "1.0.0")]
809     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
810         let (new_inner, ret) = match *unsafe { self.inner() } {
811             Flavor::Oneshot(ref p) => {
812                 if !p.sent() {
813                     return p.send(t).map_err(SendError);
814                 } else {
815                     let a = Arc::new(stream::Packet::new());
816                     let rx = Receiver::new(Flavor::Stream(a.clone()));
817                     match p.upgrade(rx) {
818                         oneshot::UpSuccess => {
819                             let ret = a.send(t);
820                             (a, ret)
821                         }
822                         oneshot::UpDisconnected => (a, Err(t)),
823                         oneshot::UpWoke(token) => {
824                             // This send cannot panic because the thread is
825                             // asleep (we're looking at it), so the receiver
826                             // can't go away.
827                             a.send(t).ok().unwrap();
828                             token.signal();
829                             (a, Ok(()))
830                         }
831                     }
832                 }
833             }
834             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
835             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
836             Flavor::Sync(..) => unreachable!(),
837         };
838
839         unsafe {
840             let tmp = Sender::new(Flavor::Stream(new_inner));
841             mem::swap(self.inner_mut(), tmp.inner_mut());
842         }
843         ret.map_err(SendError)
844     }
845 }
846
847 #[stable(feature = "rust1", since = "1.0.0")]
848 impl<T> Clone for Sender<T> {
849     fn clone(&self) -> Sender<T> {
850         let packet = match *unsafe { self.inner() } {
851             Flavor::Oneshot(ref p) => {
852                 let a = Arc::new(shared::Packet::new());
853                 {
854                     let guard = a.postinit_lock();
855                     let rx = Receiver::new(Flavor::Shared(a.clone()));
856                     let sleeper = match p.upgrade(rx) {
857                         oneshot::UpSuccess | oneshot::UpDisconnected => None,
858                         oneshot::UpWoke(task) => Some(task),
859                     };
860                     a.inherit_blocker(sleeper, guard);
861                 }
862                 a
863             }
864             Flavor::Stream(ref p) => {
865                 let a = Arc::new(shared::Packet::new());
866                 {
867                     let guard = a.postinit_lock();
868                     let rx = Receiver::new(Flavor::Shared(a.clone()));
869                     let sleeper = match p.upgrade(rx) {
870                         stream::UpSuccess | stream::UpDisconnected => None,
871                         stream::UpWoke(task) => Some(task),
872                     };
873                     a.inherit_blocker(sleeper, guard);
874                 }
875                 a
876             }
877             Flavor::Shared(ref p) => {
878                 p.clone_chan();
879                 return Sender::new(Flavor::Shared(p.clone()));
880             }
881             Flavor::Sync(..) => unreachable!(),
882         };
883
884         unsafe {
885             let tmp = Sender::new(Flavor::Shared(packet.clone()));
886             mem::swap(self.inner_mut(), tmp.inner_mut());
887         }
888         Sender::new(Flavor::Shared(packet))
889     }
890 }
891
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
894     fn drop(&mut self) {
895         match *unsafe { self.inner() } {
896             Flavor::Oneshot(ref p) => p.drop_chan(),
897             Flavor::Stream(ref p) => p.drop_chan(),
898             Flavor::Shared(ref p) => p.drop_chan(),
899             Flavor::Sync(..) => unreachable!(),
900         }
901     }
902 }
903
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907         f.debug_struct("Sender").finish()
908     }
909 }
910
911 ////////////////////////////////////////////////////////////////////////////////
912 // SyncSender
913 ////////////////////////////////////////////////////////////////////////////////
914
915 impl<T> SyncSender<T> {
916     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
917         SyncSender { inner }
918     }
919
920     /// Sends a value on this synchronous channel.
921     ///
922     /// This function will *block* until space in the internal buffer becomes
923     /// available or a receiver is available to hand off the message to.
924     ///
925     /// Note that a successful send does *not* guarantee that the receiver will
926     /// ever see the data if there is a buffer on this channel. Items may be
927     /// enqueued in the internal buffer for the receiver to receive at a later
928     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929     /// channel and it guarantees that the receiver has indeed received
930     /// the data if this function returns success.
931     ///
932     /// This function will never panic, but it may return [`Err`] if the
933     /// [`Receiver`] has disconnected and is no longer able to receive
934     /// information.
935     ///
936     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
937     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
938     ///
939     /// # Examples
940     ///
941     /// ```rust
942     /// use std::sync::mpsc::sync_channel;
943     /// use std::thread;
944     ///
945     /// // Create a rendezvous sync_channel with buffer size 0
946     /// let (sync_sender, receiver) = sync_channel(0);
947     ///
948     /// thread::spawn(move || {
949     ///    println!("sending message...");
950     ///    sync_sender.send(1).unwrap();
951     ///    // Thread is now blocked until the message is received
952     ///
953     ///    println!("...message received!");
954     /// });
955     ///
956     /// let msg = receiver.recv().unwrap();
957     /// assert_eq!(1, msg);
958     /// ```
959     #[stable(feature = "rust1", since = "1.0.0")]
960     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
961         self.inner.send(t).map_err(SendError)
962     }
963
964     /// Attempts to send a value on this channel without blocking.
965     ///
966     /// This method differs from [`send`] by returning immediately if the
967     /// channel's buffer is full or no receiver is waiting to acquire some
968     /// data. Compared with [`send`], this function has two failure cases
969     /// instead of one (one for disconnection, one for a full buffer).
970     ///
971     /// See [`send`] for notes about guarantees of whether the
972     /// receiver has received the data or not if this function is successful.
973     ///
974     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
975     ///
976     /// # Examples
977     ///
978     /// ```rust
979     /// use std::sync::mpsc::sync_channel;
980     /// use std::thread;
981     ///
982     /// // Create a sync_channel with buffer size 1
983     /// let (sync_sender, receiver) = sync_channel(1);
984     /// let sync_sender2 = sync_sender.clone();
985     ///
986     /// // First thread owns sync_sender
987     /// thread::spawn(move || {
988     ///     sync_sender.send(1).unwrap();
989     ///     sync_sender.send(2).unwrap();
990     ///     // Thread blocked
991     /// });
992     ///
993     /// // Second thread owns sync_sender2
994     /// thread::spawn(move || {
995     ///     // This will return an error and send
996     ///     // no message if the buffer is full
997     ///     let _ = sync_sender2.try_send(3);
998     /// });
999     ///
1000     /// let mut msg;
1001     /// msg = receiver.recv().unwrap();
1002     /// println!("message {} received", msg);
1003     ///
1004     /// msg = receiver.recv().unwrap();
1005     /// println!("message {} received", msg);
1006     ///
1007     /// // Third message may have never been sent
1008     /// match receiver.try_recv() {
1009     ///     Ok(msg) => println!("message {} received", msg),
1010     ///     Err(_) => println!("the third message was never sent"),
1011     /// }
1012     /// ```
1013     #[stable(feature = "rust1", since = "1.0.0")]
1014     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1015         self.inner.try_send(t)
1016     }
1017 }
1018
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> Clone for SyncSender<T> {
1021     fn clone(&self) -> SyncSender<T> {
1022         self.inner.clone_chan();
1023         SyncSender::new(self.inner.clone())
1024     }
1025 }
1026
1027 #[stable(feature = "rust1", since = "1.0.0")]
1028 impl<T> Drop for SyncSender<T> {
1029     fn drop(&mut self) {
1030         self.inner.drop_chan();
1031     }
1032 }
1033
1034 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1035 impl<T> fmt::Debug for SyncSender<T> {
1036     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037         f.debug_struct("SyncSender").finish()
1038     }
1039 }
1040
1041 ////////////////////////////////////////////////////////////////////////////////
1042 // Receiver
1043 ////////////////////////////////////////////////////////////////////////////////
1044
1045 impl<T> Receiver<T> {
1046     fn new(inner: Flavor<T>) -> Receiver<T> {
1047         Receiver { inner: UnsafeCell::new(inner) }
1048     }
1049
1050     /// Attempts to return a pending value on this receiver without blocking.
1051     ///
1052     /// This method will never block the caller in order to wait for data to
1053     /// become available. Instead, this will always return immediately with a
1054     /// possible option of pending data on the channel.
1055     ///
1056     /// This is useful for a flavor of "optimistic check" before deciding to
1057     /// block on a receiver.
1058     ///
1059     /// Compared with [`recv`], this function has two failure cases instead of one
1060     /// (one for disconnection, one for an empty buffer).
1061     ///
1062     /// [`recv`]: struct.Receiver.html#method.recv
1063     ///
1064     /// # Examples
1065     ///
1066     /// ```rust
1067     /// use std::sync::mpsc::{Receiver, channel};
1068     ///
1069     /// let (_, receiver): (_, Receiver<i32>) = channel();
1070     ///
1071     /// assert!(receiver.try_recv().is_err());
1072     /// ```
1073     #[stable(feature = "rust1", since = "1.0.0")]
1074     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1075         loop {
1076             let new_port = match *unsafe { self.inner() } {
1077                 Flavor::Oneshot(ref p) => match p.try_recv() {
1078                     Ok(t) => return Ok(t),
1079                     Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1080                     Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1081                     Err(oneshot::Upgraded(rx)) => rx,
1082                 },
1083                 Flavor::Stream(ref p) => match p.try_recv() {
1084                     Ok(t) => return Ok(t),
1085                     Err(stream::Empty) => return Err(TryRecvError::Empty),
1086                     Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1087                     Err(stream::Upgraded(rx)) => rx,
1088                 },
1089                 Flavor::Shared(ref p) => match p.try_recv() {
1090                     Ok(t) => return Ok(t),
1091                     Err(shared::Empty) => return Err(TryRecvError::Empty),
1092                     Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1093                 },
1094                 Flavor::Sync(ref p) => match p.try_recv() {
1095                     Ok(t) => return Ok(t),
1096                     Err(sync::Empty) => return Err(TryRecvError::Empty),
1097                     Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1098                 },
1099             };
1100             unsafe {
1101                 mem::swap(self.inner_mut(), new_port.inner_mut());
1102             }
1103         }
1104     }
1105
1106     /// Attempts to wait for a value on this receiver, returning an error if the
1107     /// corresponding channel has hung up.
1108     ///
1109     /// This function will always block the current thread if there is no data
1110     /// available and it's possible for more data to be sent. Once a message is
1111     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1112     /// receiver will wake up and return that message.
1113     ///
1114     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1115     /// this call is blocking, this call will wake up and return [`Err`] to
1116     /// indicate that no more messages can ever be received on this channel.
1117     /// However, since channels are buffered, messages sent before the disconnect
1118     /// will still be properly received.
1119     ///
1120     /// [`Sender`]: struct.Sender.html
1121     /// [`SyncSender`]: struct.SyncSender.html
1122     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1123     ///
1124     /// # Examples
1125     ///
1126     /// ```
1127     /// use std::sync::mpsc;
1128     /// use std::thread;
1129     ///
1130     /// let (send, recv) = mpsc::channel();
1131     /// let handle = thread::spawn(move || {
1132     ///     send.send(1u8).unwrap();
1133     /// });
1134     ///
1135     /// handle.join().unwrap();
1136     ///
1137     /// assert_eq!(Ok(1), recv.recv());
1138     /// ```
1139     ///
1140     /// Buffering behavior:
1141     ///
1142     /// ```
1143     /// use std::sync::mpsc;
1144     /// use std::thread;
1145     /// use std::sync::mpsc::RecvError;
1146     ///
1147     /// let (send, recv) = mpsc::channel();
1148     /// let handle = thread::spawn(move || {
1149     ///     send.send(1u8).unwrap();
1150     ///     send.send(2).unwrap();
1151     ///     send.send(3).unwrap();
1152     ///     drop(send);
1153     /// });
1154     ///
1155     /// // wait for the thread to join so we ensure the sender is dropped
1156     /// handle.join().unwrap();
1157     ///
1158     /// assert_eq!(Ok(1), recv.recv());
1159     /// assert_eq!(Ok(2), recv.recv());
1160     /// assert_eq!(Ok(3), recv.recv());
1161     /// assert_eq!(Err(RecvError), recv.recv());
1162     /// ```
1163     #[stable(feature = "rust1", since = "1.0.0")]
1164     pub fn recv(&self) -> Result<T, RecvError> {
1165         loop {
1166             let new_port = match *unsafe { self.inner() } {
1167                 Flavor::Oneshot(ref p) => match p.recv(None) {
1168                     Ok(t) => return Ok(t),
1169                     Err(oneshot::Disconnected) => return Err(RecvError),
1170                     Err(oneshot::Upgraded(rx)) => rx,
1171                     Err(oneshot::Empty) => unreachable!(),
1172                 },
1173                 Flavor::Stream(ref p) => match p.recv(None) {
1174                     Ok(t) => return Ok(t),
1175                     Err(stream::Disconnected) => return Err(RecvError),
1176                     Err(stream::Upgraded(rx)) => rx,
1177                     Err(stream::Empty) => unreachable!(),
1178                 },
1179                 Flavor::Shared(ref p) => match p.recv(None) {
1180                     Ok(t) => return Ok(t),
1181                     Err(shared::Disconnected) => return Err(RecvError),
1182                     Err(shared::Empty) => unreachable!(),
1183                 },
1184                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1185             };
1186             unsafe {
1187                 mem::swap(self.inner_mut(), new_port.inner_mut());
1188             }
1189         }
1190     }
1191
1192     /// Attempts to wait for a value on this receiver, returning an error if the
1193     /// corresponding channel has hung up, or if it waits more than `timeout`.
1194     ///
1195     /// This function will always block the current thread if there is no data
1196     /// available and it's possible for more data to be sent. Once a message is
1197     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1198     /// receiver will wake up and return that message.
1199     ///
1200     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1201     /// this call is blocking, this call will wake up and return [`Err`] to
1202     /// indicate that no more messages can ever be received on this channel.
1203     /// However, since channels are buffered, messages sent before the disconnect
1204     /// will still be properly received.
1205     ///
1206     /// [`Sender`]: struct.Sender.html
1207     /// [`SyncSender`]: struct.SyncSender.html
1208     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1209     ///
1210     /// # Known Issues
1211     ///
1212     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1213     /// to panic unexpectedly with the following example:
1214     ///
1215     /// ```no_run
1216     /// use std::sync::mpsc::channel;
1217     /// use std::thread;
1218     /// use std::time::Duration;
1219     ///
1220     /// let (tx, rx) = channel::<String>();
1221     ///
1222     /// thread::spawn(move || {
1223     ///     let d = Duration::from_millis(10);
1224     ///     loop {
1225     ///         println!("recv");
1226     ///         let _r = rx.recv_timeout(d);
1227     ///     }
1228     /// });
1229     ///
1230     /// thread::sleep(Duration::from_millis(100));
1231     /// let _c1 = tx.clone();
1232     ///
1233     /// thread::sleep(Duration::from_secs(1));
1234     /// ```
1235     ///
1236     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1237     ///
1238     /// # Examples
1239     ///
1240     /// Successfully receiving value before encountering timeout:
1241     ///
1242     /// ```no_run
1243     /// use std::thread;
1244     /// use std::time::Duration;
1245     /// use std::sync::mpsc;
1246     ///
1247     /// let (send, recv) = mpsc::channel();
1248     ///
1249     /// thread::spawn(move || {
1250     ///     send.send('a').unwrap();
1251     /// });
1252     ///
1253     /// assert_eq!(
1254     ///     recv.recv_timeout(Duration::from_millis(400)),
1255     ///     Ok('a')
1256     /// );
1257     /// ```
1258     ///
1259     /// Receiving an error upon reaching timeout:
1260     ///
1261     /// ```no_run
1262     /// use std::thread;
1263     /// use std::time::Duration;
1264     /// use std::sync::mpsc;
1265     ///
1266     /// let (send, recv) = mpsc::channel();
1267     ///
1268     /// thread::spawn(move || {
1269     ///     thread::sleep(Duration::from_millis(800));
1270     ///     send.send('a').unwrap();
1271     /// });
1272     ///
1273     /// assert_eq!(
1274     ///     recv.recv_timeout(Duration::from_millis(400)),
1275     ///     Err(mpsc::RecvTimeoutError::Timeout)
1276     /// );
1277     /// ```
1278     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1279     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1280         // Do an optimistic try_recv to avoid the performance impact of
1281         // Instant::now() in the full-channel case.
1282         match self.try_recv() {
1283             Ok(result) => Ok(result),
1284             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1285             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1286                 Some(deadline) => self.recv_deadline(deadline),
1287                 // So far in the future that it's practically the same as waiting indefinitely.
1288                 None => self.recv().map_err(RecvTimeoutError::from),
1289             },
1290         }
1291     }
1292
1293     /// Attempts to wait for a value on this receiver, returning an error if the
1294     /// corresponding channel has hung up, or if `deadline` is reached.
1295     ///
1296     /// This function will always block the current thread if there is no data
1297     /// available and it's possible for more data to be sent. Once a message is
1298     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1299     /// receiver will wake up and return that message.
1300     ///
1301     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1302     /// this call is blocking, this call will wake up and return [`Err`] to
1303     /// indicate that no more messages can ever be received on this channel.
1304     /// However, since channels are buffered, messages sent before the disconnect
1305     /// will still be properly received.
1306     ///
1307     /// [`Sender`]: struct.Sender.html
1308     /// [`SyncSender`]: struct.SyncSender.html
1309     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1310     ///
1311     /// # Examples
1312     ///
1313     /// Successfully receiving value before reaching deadline:
1314     ///
1315     /// ```no_run
1316     /// #![feature(deadline_api)]
1317     /// use std::thread;
1318     /// use std::time::{Duration, Instant};
1319     /// use std::sync::mpsc;
1320     ///
1321     /// let (send, recv) = mpsc::channel();
1322     ///
1323     /// thread::spawn(move || {
1324     ///     send.send('a').unwrap();
1325     /// });
1326     ///
1327     /// assert_eq!(
1328     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1329     ///     Ok('a')
1330     /// );
1331     /// ```
1332     ///
1333     /// Receiving an error upon reaching deadline:
1334     ///
1335     /// ```no_run
1336     /// #![feature(deadline_api)]
1337     /// use std::thread;
1338     /// use std::time::{Duration, Instant};
1339     /// use std::sync::mpsc;
1340     ///
1341     /// let (send, recv) = mpsc::channel();
1342     ///
1343     /// thread::spawn(move || {
1344     ///     thread::sleep(Duration::from_millis(800));
1345     ///     send.send('a').unwrap();
1346     /// });
1347     ///
1348     /// assert_eq!(
1349     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1350     ///     Err(mpsc::RecvTimeoutError::Timeout)
1351     /// );
1352     /// ```
1353     #[unstable(feature = "deadline_api", issue = "46316")]
1354     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1355         use self::RecvTimeoutError::*;
1356
1357         loop {
1358             let port_or_empty = match *unsafe { self.inner() } {
1359                 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1360                     Ok(t) => return Ok(t),
1361                     Err(oneshot::Disconnected) => return Err(Disconnected),
1362                     Err(oneshot::Upgraded(rx)) => Some(rx),
1363                     Err(oneshot::Empty) => None,
1364                 },
1365                 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1366                     Ok(t) => return Ok(t),
1367                     Err(stream::Disconnected) => return Err(Disconnected),
1368                     Err(stream::Upgraded(rx)) => Some(rx),
1369                     Err(stream::Empty) => None,
1370                 },
1371                 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1372                     Ok(t) => return Ok(t),
1373                     Err(shared::Disconnected) => return Err(Disconnected),
1374                     Err(shared::Empty) => None,
1375                 },
1376                 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1377                     Ok(t) => return Ok(t),
1378                     Err(sync::Disconnected) => return Err(Disconnected),
1379                     Err(sync::Empty) => None,
1380                 },
1381             };
1382
1383             if let Some(new_port) = port_or_empty {
1384                 unsafe {
1385                     mem::swap(self.inner_mut(), new_port.inner_mut());
1386                 }
1387             }
1388
1389             // If we're already passed the deadline, and we're here without
1390             // data, return a timeout, else try again.
1391             if Instant::now() >= deadline {
1392                 return Err(Timeout);
1393             }
1394         }
1395     }
1396
1397     /// Returns an iterator that will block waiting for messages, but never
1398     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1399     ///
1400     /// [`panic!`]: ../../../std/macro.panic.html
1401     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1402     ///
1403     /// # Examples
1404     ///
1405     /// ```rust
1406     /// use std::sync::mpsc::channel;
1407     /// use std::thread;
1408     ///
1409     /// let (send, recv) = channel();
1410     ///
1411     /// thread::spawn(move || {
1412     ///     send.send(1).unwrap();
1413     ///     send.send(2).unwrap();
1414     ///     send.send(3).unwrap();
1415     /// });
1416     ///
1417     /// let mut iter = recv.iter();
1418     /// assert_eq!(iter.next(), Some(1));
1419     /// assert_eq!(iter.next(), Some(2));
1420     /// assert_eq!(iter.next(), Some(3));
1421     /// assert_eq!(iter.next(), None);
1422     /// ```
1423     #[stable(feature = "rust1", since = "1.0.0")]
1424     pub fn iter(&self) -> Iter<'_, T> {
1425         Iter { rx: self }
1426     }
1427
1428     /// Returns an iterator that will attempt to yield all pending values.
1429     /// It will return `None` if there are no more pending values or if the
1430     /// channel has hung up. The iterator will never [`panic!`] or block the
1431     /// user by waiting for values.
1432     ///
1433     /// [`panic!`]: ../../../std/macro.panic.html
1434     ///
1435     /// # Examples
1436     ///
1437     /// ```no_run
1438     /// use std::sync::mpsc::channel;
1439     /// use std::thread;
1440     /// use std::time::Duration;
1441     ///
1442     /// let (sender, receiver) = channel();
1443     ///
1444     /// // nothing is in the buffer yet
1445     /// assert!(receiver.try_iter().next().is_none());
1446     ///
1447     /// thread::spawn(move || {
1448     ///     thread::sleep(Duration::from_secs(1));
1449     ///     sender.send(1).unwrap();
1450     ///     sender.send(2).unwrap();
1451     ///     sender.send(3).unwrap();
1452     /// });
1453     ///
1454     /// // nothing is in the buffer yet
1455     /// assert!(receiver.try_iter().next().is_none());
1456     ///
1457     /// // block for two seconds
1458     /// thread::sleep(Duration::from_secs(2));
1459     ///
1460     /// let mut iter = receiver.try_iter();
1461     /// assert_eq!(iter.next(), Some(1));
1462     /// assert_eq!(iter.next(), Some(2));
1463     /// assert_eq!(iter.next(), Some(3));
1464     /// assert_eq!(iter.next(), None);
1465     /// ```
1466     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1467     pub fn try_iter(&self) -> TryIter<'_, T> {
1468         TryIter { rx: self }
1469     }
1470 }
1471
1472 #[stable(feature = "rust1", since = "1.0.0")]
1473 impl<'a, T> Iterator for Iter<'a, T> {
1474     type Item = T;
1475
1476     fn next(&mut self) -> Option<T> {
1477         self.rx.recv().ok()
1478     }
1479 }
1480
1481 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1482 impl<'a, T> Iterator for TryIter<'a, T> {
1483     type Item = T;
1484
1485     fn next(&mut self) -> Option<T> {
1486         self.rx.try_recv().ok()
1487     }
1488 }
1489
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<'a, T> IntoIterator for &'a Receiver<T> {
1492     type Item = T;
1493     type IntoIter = Iter<'a, T>;
1494
1495     fn into_iter(self) -> Iter<'a, T> {
1496         self.iter()
1497     }
1498 }
1499
1500 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1501 impl<T> Iterator for IntoIter<T> {
1502     type Item = T;
1503     fn next(&mut self) -> Option<T> {
1504         self.rx.recv().ok()
1505     }
1506 }
1507
1508 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1509 impl<T> IntoIterator for Receiver<T> {
1510     type Item = T;
1511     type IntoIter = IntoIter<T>;
1512
1513     fn into_iter(self) -> IntoIter<T> {
1514         IntoIter { rx: self }
1515     }
1516 }
1517
1518 #[stable(feature = "rust1", since = "1.0.0")]
1519 impl<T> Drop for Receiver<T> {
1520     fn drop(&mut self) {
1521         match *unsafe { self.inner() } {
1522             Flavor::Oneshot(ref p) => p.drop_port(),
1523             Flavor::Stream(ref p) => p.drop_port(),
1524             Flavor::Shared(ref p) => p.drop_port(),
1525             Flavor::Sync(ref p) => p.drop_port(),
1526         }
1527     }
1528 }
1529
1530 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1531 impl<T> fmt::Debug for Receiver<T> {
1532     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1533         f.debug_struct("Receiver").finish()
1534     }
1535 }
1536
1537 #[stable(feature = "rust1", since = "1.0.0")]
1538 impl<T> fmt::Debug for SendError<T> {
1539     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540         "SendError(..)".fmt(f)
1541     }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> fmt::Display for SendError<T> {
1546     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547         "sending on a closed channel".fmt(f)
1548     }
1549 }
1550
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T: Send> error::Error for SendError<T> {
1553     fn description(&self) -> &str {
1554         "sending on a closed channel"
1555     }
1556 }
1557
1558 #[stable(feature = "rust1", since = "1.0.0")]
1559 impl<T> fmt::Debug for TrySendError<T> {
1560     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561         match *self {
1562             TrySendError::Full(..) => "Full(..)".fmt(f),
1563             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1564         }
1565     }
1566 }
1567
1568 #[stable(feature = "rust1", since = "1.0.0")]
1569 impl<T> fmt::Display for TrySendError<T> {
1570     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1571         match *self {
1572             TrySendError::Full(..) => "sending on a full channel".fmt(f),
1573             TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1574         }
1575     }
1576 }
1577
1578 #[stable(feature = "rust1", since = "1.0.0")]
1579 impl<T: Send> error::Error for TrySendError<T> {
1580     fn description(&self) -> &str {
1581         match *self {
1582             TrySendError::Full(..) => "sending on a full channel",
1583             TrySendError::Disconnected(..) => "sending on a closed channel",
1584         }
1585     }
1586 }
1587
1588 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1589 impl<T> From<SendError<T>> for TrySendError<T> {
1590     fn from(err: SendError<T>) -> TrySendError<T> {
1591         match err {
1592             SendError(t) => TrySendError::Disconnected(t),
1593         }
1594     }
1595 }
1596
1597 #[stable(feature = "rust1", since = "1.0.0")]
1598 impl fmt::Display for RecvError {
1599     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1600         "receiving on a closed channel".fmt(f)
1601     }
1602 }
1603
1604 #[stable(feature = "rust1", since = "1.0.0")]
1605 impl error::Error for RecvError {
1606     fn description(&self) -> &str {
1607         "receiving on a closed channel"
1608     }
1609 }
1610
1611 #[stable(feature = "rust1", since = "1.0.0")]
1612 impl fmt::Display for TryRecvError {
1613     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1614         match *self {
1615             TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1616             TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1617         }
1618     }
1619 }
1620
1621 #[stable(feature = "rust1", since = "1.0.0")]
1622 impl error::Error for TryRecvError {
1623     fn description(&self) -> &str {
1624         match *self {
1625             TryRecvError::Empty => "receiving on an empty channel",
1626             TryRecvError::Disconnected => "receiving on a closed channel",
1627         }
1628     }
1629 }
1630
1631 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1632 impl From<RecvError> for TryRecvError {
1633     fn from(err: RecvError) -> TryRecvError {
1634         match err {
1635             RecvError => TryRecvError::Disconnected,
1636         }
1637     }
1638 }
1639
1640 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1641 impl fmt::Display for RecvTimeoutError {
1642     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1643         match *self {
1644             RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1645             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1646         }
1647     }
1648 }
1649
1650 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1651 impl error::Error for RecvTimeoutError {
1652     fn description(&self) -> &str {
1653         match *self {
1654             RecvTimeoutError::Timeout => "timed out waiting on channel",
1655             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1656         }
1657     }
1658 }
1659
1660 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1661 impl From<RecvError> for RecvTimeoutError {
1662     fn from(err: RecvError) -> RecvTimeoutError {
1663         match err {
1664             RecvError => RecvTimeoutError::Disconnected,
1665         }
1666     }
1667 }
1668
1669 #[cfg(all(test, not(target_os = "emscripten")))]
1670 mod tests {
1671     use super::*;
1672     use crate::env;
1673     use crate::thread;
1674     use crate::time::{Duration, Instant};
1675
1676     pub fn stress_factor() -> usize {
1677         match env::var("RUST_TEST_STRESS") {
1678             Ok(val) => val.parse().unwrap(),
1679             Err(..) => 1,
1680         }
1681     }
1682
1683     #[test]
1684     fn smoke() {
1685         let (tx, rx) = channel::<i32>();
1686         tx.send(1).unwrap();
1687         assert_eq!(rx.recv().unwrap(), 1);
1688     }
1689
1690     #[test]
1691     fn drop_full() {
1692         let (tx, _rx) = channel::<Box<isize>>();
1693         tx.send(box 1).unwrap();
1694     }
1695
1696     #[test]
1697     fn drop_full_shared() {
1698         let (tx, _rx) = channel::<Box<isize>>();
1699         drop(tx.clone());
1700         drop(tx.clone());
1701         tx.send(box 1).unwrap();
1702     }
1703
1704     #[test]
1705     fn smoke_shared() {
1706         let (tx, rx) = channel::<i32>();
1707         tx.send(1).unwrap();
1708         assert_eq!(rx.recv().unwrap(), 1);
1709         let tx = tx.clone();
1710         tx.send(1).unwrap();
1711         assert_eq!(rx.recv().unwrap(), 1);
1712     }
1713
1714     #[test]
1715     fn smoke_threads() {
1716         let (tx, rx) = channel::<i32>();
1717         let _t = thread::spawn(move || {
1718             tx.send(1).unwrap();
1719         });
1720         assert_eq!(rx.recv().unwrap(), 1);
1721     }
1722
1723     #[test]
1724     fn smoke_port_gone() {
1725         let (tx, rx) = channel::<i32>();
1726         drop(rx);
1727         assert!(tx.send(1).is_err());
1728     }
1729
1730     #[test]
1731     fn smoke_shared_port_gone() {
1732         let (tx, rx) = channel::<i32>();
1733         drop(rx);
1734         assert!(tx.send(1).is_err())
1735     }
1736
1737     #[test]
1738     fn smoke_shared_port_gone2() {
1739         let (tx, rx) = channel::<i32>();
1740         drop(rx);
1741         let tx2 = tx.clone();
1742         drop(tx);
1743         assert!(tx2.send(1).is_err());
1744     }
1745
1746     #[test]
1747     fn port_gone_concurrent() {
1748         let (tx, rx) = channel::<i32>();
1749         let _t = thread::spawn(move || {
1750             rx.recv().unwrap();
1751         });
1752         while tx.send(1).is_ok() {}
1753     }
1754
1755     #[test]
1756     fn port_gone_concurrent_shared() {
1757         let (tx, rx) = channel::<i32>();
1758         let tx2 = tx.clone();
1759         let _t = thread::spawn(move || {
1760             rx.recv().unwrap();
1761         });
1762         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1763     }
1764
1765     #[test]
1766     fn smoke_chan_gone() {
1767         let (tx, rx) = channel::<i32>();
1768         drop(tx);
1769         assert!(rx.recv().is_err());
1770     }
1771
1772     #[test]
1773     fn smoke_chan_gone_shared() {
1774         let (tx, rx) = channel::<()>();
1775         let tx2 = tx.clone();
1776         drop(tx);
1777         drop(tx2);
1778         assert!(rx.recv().is_err());
1779     }
1780
1781     #[test]
1782     fn chan_gone_concurrent() {
1783         let (tx, rx) = channel::<i32>();
1784         let _t = thread::spawn(move || {
1785             tx.send(1).unwrap();
1786             tx.send(1).unwrap();
1787         });
1788         while rx.recv().is_ok() {}
1789     }
1790
1791     #[test]
1792     fn stress() {
1793         let (tx, rx) = channel::<i32>();
1794         let t = thread::spawn(move || {
1795             for _ in 0..10000 {
1796                 tx.send(1).unwrap();
1797             }
1798         });
1799         for _ in 0..10000 {
1800             assert_eq!(rx.recv().unwrap(), 1);
1801         }
1802         t.join().ok().expect("thread panicked");
1803     }
1804
1805     #[test]
1806     fn stress_shared() {
1807         const AMT: u32 = 10000;
1808         const NTHREADS: u32 = 8;
1809         let (tx, rx) = channel::<i32>();
1810
1811         let t = thread::spawn(move || {
1812             for _ in 0..AMT * NTHREADS {
1813                 assert_eq!(rx.recv().unwrap(), 1);
1814             }
1815             match rx.try_recv() {
1816                 Ok(..) => panic!(),
1817                 _ => {}
1818             }
1819         });
1820
1821         for _ in 0..NTHREADS {
1822             let tx = tx.clone();
1823             thread::spawn(move || {
1824                 for _ in 0..AMT {
1825                     tx.send(1).unwrap();
1826                 }
1827             });
1828         }
1829         drop(tx);
1830         t.join().ok().expect("thread panicked");
1831     }
1832
1833     #[test]
1834     fn send_from_outside_runtime() {
1835         let (tx1, rx1) = channel::<()>();
1836         let (tx2, rx2) = channel::<i32>();
1837         let t1 = thread::spawn(move || {
1838             tx1.send(()).unwrap();
1839             for _ in 0..40 {
1840                 assert_eq!(rx2.recv().unwrap(), 1);
1841             }
1842         });
1843         rx1.recv().unwrap();
1844         let t2 = thread::spawn(move || {
1845             for _ in 0..40 {
1846                 tx2.send(1).unwrap();
1847             }
1848         });
1849         t1.join().ok().expect("thread panicked");
1850         t2.join().ok().expect("thread panicked");
1851     }
1852
1853     #[test]
1854     fn recv_from_outside_runtime() {
1855         let (tx, rx) = channel::<i32>();
1856         let t = thread::spawn(move || {
1857             for _ in 0..40 {
1858                 assert_eq!(rx.recv().unwrap(), 1);
1859             }
1860         });
1861         for _ in 0..40 {
1862             tx.send(1).unwrap();
1863         }
1864         t.join().ok().expect("thread panicked");
1865     }
1866
1867     #[test]
1868     fn no_runtime() {
1869         let (tx1, rx1) = channel::<i32>();
1870         let (tx2, rx2) = channel::<i32>();
1871         let t1 = thread::spawn(move || {
1872             assert_eq!(rx1.recv().unwrap(), 1);
1873             tx2.send(2).unwrap();
1874         });
1875         let t2 = thread::spawn(move || {
1876             tx1.send(1).unwrap();
1877             assert_eq!(rx2.recv().unwrap(), 2);
1878         });
1879         t1.join().ok().expect("thread panicked");
1880         t2.join().ok().expect("thread panicked");
1881     }
1882
1883     #[test]
1884     fn oneshot_single_thread_close_port_first() {
1885         // Simple test of closing without sending
1886         let (_tx, rx) = channel::<i32>();
1887         drop(rx);
1888     }
1889
1890     #[test]
1891     fn oneshot_single_thread_close_chan_first() {
1892         // Simple test of closing without sending
1893         let (tx, _rx) = channel::<i32>();
1894         drop(tx);
1895     }
1896
1897     #[test]
1898     fn oneshot_single_thread_send_port_close() {
1899         // Testing that the sender cleans up the payload if receiver is closed
1900         let (tx, rx) = channel::<Box<i32>>();
1901         drop(rx);
1902         assert!(tx.send(box 0).is_err());
1903     }
1904
1905     #[test]
1906     fn oneshot_single_thread_recv_chan_close() {
1907         // Receiving on a closed chan will panic
1908         let res = thread::spawn(move || {
1909             let (tx, rx) = channel::<i32>();
1910             drop(tx);
1911             rx.recv().unwrap();
1912         })
1913         .join();
1914         // What is our res?
1915         assert!(res.is_err());
1916     }
1917
1918     #[test]
1919     fn oneshot_single_thread_send_then_recv() {
1920         let (tx, rx) = channel::<Box<i32>>();
1921         tx.send(box 10).unwrap();
1922         assert!(*rx.recv().unwrap() == 10);
1923     }
1924
1925     #[test]
1926     fn oneshot_single_thread_try_send_open() {
1927         let (tx, rx) = channel::<i32>();
1928         assert!(tx.send(10).is_ok());
1929         assert!(rx.recv().unwrap() == 10);
1930     }
1931
1932     #[test]
1933     fn oneshot_single_thread_try_send_closed() {
1934         let (tx, rx) = channel::<i32>();
1935         drop(rx);
1936         assert!(tx.send(10).is_err());
1937     }
1938
1939     #[test]
1940     fn oneshot_single_thread_try_recv_open() {
1941         let (tx, rx) = channel::<i32>();
1942         tx.send(10).unwrap();
1943         assert!(rx.recv() == Ok(10));
1944     }
1945
1946     #[test]
1947     fn oneshot_single_thread_try_recv_closed() {
1948         let (tx, rx) = channel::<i32>();
1949         drop(tx);
1950         assert!(rx.recv().is_err());
1951     }
1952
1953     #[test]
1954     fn oneshot_single_thread_peek_data() {
1955         let (tx, rx) = channel::<i32>();
1956         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1957         tx.send(10).unwrap();
1958         assert_eq!(rx.try_recv(), Ok(10));
1959     }
1960
1961     #[test]
1962     fn oneshot_single_thread_peek_close() {
1963         let (tx, rx) = channel::<i32>();
1964         drop(tx);
1965         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1966         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1967     }
1968
1969     #[test]
1970     fn oneshot_single_thread_peek_open() {
1971         let (_tx, rx) = channel::<i32>();
1972         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1973     }
1974
1975     #[test]
1976     fn oneshot_multi_task_recv_then_send() {
1977         let (tx, rx) = channel::<Box<i32>>();
1978         let _t = thread::spawn(move || {
1979             assert!(*rx.recv().unwrap() == 10);
1980         });
1981
1982         tx.send(box 10).unwrap();
1983     }
1984
1985     #[test]
1986     fn oneshot_multi_task_recv_then_close() {
1987         let (tx, rx) = channel::<Box<i32>>();
1988         let _t = thread::spawn(move || {
1989             drop(tx);
1990         });
1991         let res = thread::spawn(move || {
1992             assert!(*rx.recv().unwrap() == 10);
1993         })
1994         .join();
1995         assert!(res.is_err());
1996     }
1997
1998     #[test]
1999     fn oneshot_multi_thread_close_stress() {
2000         for _ in 0..stress_factor() {
2001             let (tx, rx) = channel::<i32>();
2002             let _t = thread::spawn(move || {
2003                 drop(rx);
2004             });
2005             drop(tx);
2006         }
2007     }
2008
2009     #[test]
2010     fn oneshot_multi_thread_send_close_stress() {
2011         for _ in 0..stress_factor() {
2012             let (tx, rx) = channel::<i32>();
2013             let _t = thread::spawn(move || {
2014                 drop(rx);
2015             });
2016             let _ = thread::spawn(move || {
2017                 tx.send(1).unwrap();
2018             })
2019             .join();
2020         }
2021     }
2022
2023     #[test]
2024     fn oneshot_multi_thread_recv_close_stress() {
2025         for _ in 0..stress_factor() {
2026             let (tx, rx) = channel::<i32>();
2027             thread::spawn(move || {
2028                 let res = thread::spawn(move || {
2029                     rx.recv().unwrap();
2030                 })
2031                 .join();
2032                 assert!(res.is_err());
2033             });
2034             let _t = thread::spawn(move || {
2035                 thread::spawn(move || {
2036                     drop(tx);
2037                 });
2038             });
2039         }
2040     }
2041
2042     #[test]
2043     fn oneshot_multi_thread_send_recv_stress() {
2044         for _ in 0..stress_factor() {
2045             let (tx, rx) = channel::<Box<isize>>();
2046             let _t = thread::spawn(move || {
2047                 tx.send(box 10).unwrap();
2048             });
2049             assert!(*rx.recv().unwrap() == 10);
2050         }
2051     }
2052
2053     #[test]
2054     fn stream_send_recv_stress() {
2055         for _ in 0..stress_factor() {
2056             let (tx, rx) = channel();
2057
2058             send(tx, 0);
2059             recv(rx, 0);
2060
2061             fn send(tx: Sender<Box<i32>>, i: i32) {
2062                 if i == 10 {
2063                     return;
2064                 }
2065
2066                 thread::spawn(move || {
2067                     tx.send(box i).unwrap();
2068                     send(tx, i + 1);
2069                 });
2070             }
2071
2072             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2073                 if i == 10 {
2074                     return;
2075                 }
2076
2077                 thread::spawn(move || {
2078                     assert!(*rx.recv().unwrap() == i);
2079                     recv(rx, i + 1);
2080                 });
2081             }
2082         }
2083     }
2084
2085     #[test]
2086     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2087     fn oneshot_single_thread_recv_timeout() {
2088         let (tx, rx) = channel();
2089         tx.send(()).unwrap();
2090         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2091         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2092         tx.send(()).unwrap();
2093         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2094     }
2095
2096     #[test]
2097     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2098     fn stress_recv_timeout_two_threads() {
2099         let (tx, rx) = channel();
2100         let stress = stress_factor() + 100;
2101         let timeout = Duration::from_millis(100);
2102
2103         thread::spawn(move || {
2104             for i in 0..stress {
2105                 if i % 2 == 0 {
2106                     thread::sleep(timeout * 2);
2107                 }
2108                 tx.send(1usize).unwrap();
2109             }
2110         });
2111
2112         let mut recv_count = 0;
2113         loop {
2114             match rx.recv_timeout(timeout) {
2115                 Ok(n) => {
2116                     assert_eq!(n, 1usize);
2117                     recv_count += 1;
2118                 }
2119                 Err(RecvTimeoutError::Timeout) => continue,
2120                 Err(RecvTimeoutError::Disconnected) => break,
2121             }
2122         }
2123
2124         assert_eq!(recv_count, stress);
2125     }
2126
2127     #[test]
2128     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2129     fn recv_timeout_upgrade() {
2130         let (tx, rx) = channel::<()>();
2131         let timeout = Duration::from_millis(1);
2132         let _tx_clone = tx.clone();
2133
2134         let start = Instant::now();
2135         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2136         assert!(Instant::now() >= start + timeout);
2137     }
2138
2139     #[test]
2140     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2141     fn stress_recv_timeout_shared() {
2142         let (tx, rx) = channel();
2143         let stress = stress_factor() + 100;
2144
2145         for i in 0..stress {
2146             let tx = tx.clone();
2147             thread::spawn(move || {
2148                 thread::sleep(Duration::from_millis(i as u64 * 10));
2149                 tx.send(1usize).unwrap();
2150             });
2151         }
2152
2153         drop(tx);
2154
2155         let mut recv_count = 0;
2156         loop {
2157             match rx.recv_timeout(Duration::from_millis(10)) {
2158                 Ok(n) => {
2159                     assert_eq!(n, 1usize);
2160                     recv_count += 1;
2161                 }
2162                 Err(RecvTimeoutError::Timeout) => continue,
2163                 Err(RecvTimeoutError::Disconnected) => break,
2164             }
2165         }
2166
2167         assert_eq!(recv_count, stress);
2168     }
2169
2170     #[test]
2171     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2172     fn very_long_recv_timeout_wont_panic() {
2173         let (tx, rx) = channel::<()>();
2174         let join_handle =
2175             thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value())));
2176         thread::sleep(Duration::from_secs(1));
2177         assert!(tx.send(()).is_ok());
2178         assert_eq!(join_handle.join().unwrap(), Ok(()));
2179     }
2180
2181     #[test]
2182     fn recv_a_lot() {
2183         // Regression test that we don't run out of stack in scheduler context
2184         let (tx, rx) = channel();
2185         for _ in 0..10000 {
2186             tx.send(()).unwrap();
2187         }
2188         for _ in 0..10000 {
2189             rx.recv().unwrap();
2190         }
2191     }
2192
2193     #[test]
2194     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2195     fn shared_recv_timeout() {
2196         let (tx, rx) = channel();
2197         let total = 5;
2198         for _ in 0..total {
2199             let tx = tx.clone();
2200             thread::spawn(move || {
2201                 tx.send(()).unwrap();
2202             });
2203         }
2204
2205         for _ in 0..total {
2206             rx.recv().unwrap();
2207         }
2208
2209         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2210         tx.send(()).unwrap();
2211         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2212     }
2213
2214     #[test]
2215     fn shared_chan_stress() {
2216         let (tx, rx) = channel();
2217         let total = stress_factor() + 100;
2218         for _ in 0..total {
2219             let tx = tx.clone();
2220             thread::spawn(move || {
2221                 tx.send(()).unwrap();
2222             });
2223         }
2224
2225         for _ in 0..total {
2226             rx.recv().unwrap();
2227         }
2228     }
2229
2230     #[test]
2231     fn test_nested_recv_iter() {
2232         let (tx, rx) = channel::<i32>();
2233         let (total_tx, total_rx) = channel::<i32>();
2234
2235         let _t = thread::spawn(move || {
2236             let mut acc = 0;
2237             for x in rx.iter() {
2238                 acc += x;
2239             }
2240             total_tx.send(acc).unwrap();
2241         });
2242
2243         tx.send(3).unwrap();
2244         tx.send(1).unwrap();
2245         tx.send(2).unwrap();
2246         drop(tx);
2247         assert_eq!(total_rx.recv().unwrap(), 6);
2248     }
2249
2250     #[test]
2251     fn test_recv_iter_break() {
2252         let (tx, rx) = channel::<i32>();
2253         let (count_tx, count_rx) = channel();
2254
2255         let _t = thread::spawn(move || {
2256             let mut count = 0;
2257             for x in rx.iter() {
2258                 if count >= 3 {
2259                     break;
2260                 } else {
2261                     count += x;
2262                 }
2263             }
2264             count_tx.send(count).unwrap();
2265         });
2266
2267         tx.send(2).unwrap();
2268         tx.send(2).unwrap();
2269         tx.send(2).unwrap();
2270         let _ = tx.send(2);
2271         drop(tx);
2272         assert_eq!(count_rx.recv().unwrap(), 4);
2273     }
2274
2275     #[test]
2276     fn test_recv_try_iter() {
2277         let (request_tx, request_rx) = channel();
2278         let (response_tx, response_rx) = channel();
2279
2280         // Request `x`s until we have `6`.
2281         let t = thread::spawn(move || {
2282             let mut count = 0;
2283             loop {
2284                 for x in response_rx.try_iter() {
2285                     count += x;
2286                     if count == 6 {
2287                         return count;
2288                     }
2289                 }
2290                 request_tx.send(()).unwrap();
2291             }
2292         });
2293
2294         for _ in request_rx.iter() {
2295             if response_tx.send(2).is_err() {
2296                 break;
2297             }
2298         }
2299
2300         assert_eq!(t.join().unwrap(), 6);
2301     }
2302
2303     #[test]
2304     fn test_recv_into_iter_owned() {
2305         let mut iter = {
2306             let (tx, rx) = channel::<i32>();
2307             tx.send(1).unwrap();
2308             tx.send(2).unwrap();
2309
2310             rx.into_iter()
2311         };
2312         assert_eq!(iter.next().unwrap(), 1);
2313         assert_eq!(iter.next().unwrap(), 2);
2314         assert_eq!(iter.next().is_none(), true);
2315     }
2316
2317     #[test]
2318     fn test_recv_into_iter_borrowed() {
2319         let (tx, rx) = channel::<i32>();
2320         tx.send(1).unwrap();
2321         tx.send(2).unwrap();
2322         drop(tx);
2323         let mut iter = (&rx).into_iter();
2324         assert_eq!(iter.next().unwrap(), 1);
2325         assert_eq!(iter.next().unwrap(), 2);
2326         assert_eq!(iter.next().is_none(), true);
2327     }
2328
2329     #[test]
2330     fn try_recv_states() {
2331         let (tx1, rx1) = channel::<i32>();
2332         let (tx2, rx2) = channel::<()>();
2333         let (tx3, rx3) = channel::<()>();
2334         let _t = thread::spawn(move || {
2335             rx2.recv().unwrap();
2336             tx1.send(1).unwrap();
2337             tx3.send(()).unwrap();
2338             rx2.recv().unwrap();
2339             drop(tx1);
2340             tx3.send(()).unwrap();
2341         });
2342
2343         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2344         tx2.send(()).unwrap();
2345         rx3.recv().unwrap();
2346         assert_eq!(rx1.try_recv(), Ok(1));
2347         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2348         tx2.send(()).unwrap();
2349         rx3.recv().unwrap();
2350         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2351     }
2352
2353     // This bug used to end up in a livelock inside of the Receiver destructor
2354     // because the internal state of the Shared packet was corrupted
2355     #[test]
2356     fn destroy_upgraded_shared_port_when_sender_still_active() {
2357         let (tx, rx) = channel();
2358         let (tx2, rx2) = channel();
2359         let _t = thread::spawn(move || {
2360             rx.recv().unwrap(); // wait on a oneshot
2361             drop(rx); // destroy a shared
2362             tx2.send(()).unwrap();
2363         });
2364         // make sure the other thread has gone to sleep
2365         for _ in 0..5000 {
2366             thread::yield_now();
2367         }
2368
2369         // upgrade to a shared chan and send a message
2370         let t = tx.clone();
2371         drop(tx);
2372         t.send(()).unwrap();
2373
2374         // wait for the child thread to exit before we exit
2375         rx2.recv().unwrap();
2376     }
2377
2378     #[test]
2379     fn issue_32114() {
2380         let (tx, _) = channel();
2381         let _ = tx.send(123);
2382         assert_eq!(tx.send(123), Err(SendError(123)));
2383     }
2384 }
2385
2386 #[cfg(all(test, not(target_os = "emscripten")))]
2387 mod sync_tests {
2388     use super::*;
2389     use crate::env;
2390     use crate::thread;
2391     use crate::time::Duration;
2392
2393     pub fn stress_factor() -> usize {
2394         match env::var("RUST_TEST_STRESS") {
2395             Ok(val) => val.parse().unwrap(),
2396             Err(..) => 1,
2397         }
2398     }
2399
2400     #[test]
2401     fn smoke() {
2402         let (tx, rx) = sync_channel::<i32>(1);
2403         tx.send(1).unwrap();
2404         assert_eq!(rx.recv().unwrap(), 1);
2405     }
2406
2407     #[test]
2408     fn drop_full() {
2409         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2410         tx.send(box 1).unwrap();
2411     }
2412
2413     #[test]
2414     fn smoke_shared() {
2415         let (tx, rx) = sync_channel::<i32>(1);
2416         tx.send(1).unwrap();
2417         assert_eq!(rx.recv().unwrap(), 1);
2418         let tx = tx.clone();
2419         tx.send(1).unwrap();
2420         assert_eq!(rx.recv().unwrap(), 1);
2421     }
2422
2423     #[test]
2424     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2425     fn recv_timeout() {
2426         let (tx, rx) = sync_channel::<i32>(1);
2427         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2428         tx.send(1).unwrap();
2429         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2430     }
2431
2432     #[test]
2433     fn smoke_threads() {
2434         let (tx, rx) = sync_channel::<i32>(0);
2435         let _t = thread::spawn(move || {
2436             tx.send(1).unwrap();
2437         });
2438         assert_eq!(rx.recv().unwrap(), 1);
2439     }
2440
2441     #[test]
2442     fn smoke_port_gone() {
2443         let (tx, rx) = sync_channel::<i32>(0);
2444         drop(rx);
2445         assert!(tx.send(1).is_err());
2446     }
2447
2448     #[test]
2449     fn smoke_shared_port_gone2() {
2450         let (tx, rx) = sync_channel::<i32>(0);
2451         drop(rx);
2452         let tx2 = tx.clone();
2453         drop(tx);
2454         assert!(tx2.send(1).is_err());
2455     }
2456
2457     #[test]
2458     fn port_gone_concurrent() {
2459         let (tx, rx) = sync_channel::<i32>(0);
2460         let _t = thread::spawn(move || {
2461             rx.recv().unwrap();
2462         });
2463         while tx.send(1).is_ok() {}
2464     }
2465
2466     #[test]
2467     fn port_gone_concurrent_shared() {
2468         let (tx, rx) = sync_channel::<i32>(0);
2469         let tx2 = tx.clone();
2470         let _t = thread::spawn(move || {
2471             rx.recv().unwrap();
2472         });
2473         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2474     }
2475
2476     #[test]
2477     fn smoke_chan_gone() {
2478         let (tx, rx) = sync_channel::<i32>(0);
2479         drop(tx);
2480         assert!(rx.recv().is_err());
2481     }
2482
2483     #[test]
2484     fn smoke_chan_gone_shared() {
2485         let (tx, rx) = sync_channel::<()>(0);
2486         let tx2 = tx.clone();
2487         drop(tx);
2488         drop(tx2);
2489         assert!(rx.recv().is_err());
2490     }
2491
2492     #[test]
2493     fn chan_gone_concurrent() {
2494         let (tx, rx) = sync_channel::<i32>(0);
2495         thread::spawn(move || {
2496             tx.send(1).unwrap();
2497             tx.send(1).unwrap();
2498         });
2499         while rx.recv().is_ok() {}
2500     }
2501
2502     #[test]
2503     fn stress() {
2504         let (tx, rx) = sync_channel::<i32>(0);
2505         thread::spawn(move || {
2506             for _ in 0..10000 {
2507                 tx.send(1).unwrap();
2508             }
2509         });
2510         for _ in 0..10000 {
2511             assert_eq!(rx.recv().unwrap(), 1);
2512         }
2513     }
2514
2515     #[test]
2516     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2517     fn stress_recv_timeout_two_threads() {
2518         let (tx, rx) = sync_channel::<i32>(0);
2519
2520         thread::spawn(move || {
2521             for _ in 0..10000 {
2522                 tx.send(1).unwrap();
2523             }
2524         });
2525
2526         let mut recv_count = 0;
2527         loop {
2528             match rx.recv_timeout(Duration::from_millis(1)) {
2529                 Ok(v) => {
2530                     assert_eq!(v, 1);
2531                     recv_count += 1;
2532                 }
2533                 Err(RecvTimeoutError::Timeout) => continue,
2534                 Err(RecvTimeoutError::Disconnected) => break,
2535             }
2536         }
2537
2538         assert_eq!(recv_count, 10000);
2539     }
2540
2541     #[test]
2542     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2543     fn stress_recv_timeout_shared() {
2544         const AMT: u32 = 1000;
2545         const NTHREADS: u32 = 8;
2546         let (tx, rx) = sync_channel::<i32>(0);
2547         let (dtx, drx) = sync_channel::<()>(0);
2548
2549         thread::spawn(move || {
2550             let mut recv_count = 0;
2551             loop {
2552                 match rx.recv_timeout(Duration::from_millis(10)) {
2553                     Ok(v) => {
2554                         assert_eq!(v, 1);
2555                         recv_count += 1;
2556                     }
2557                     Err(RecvTimeoutError::Timeout) => continue,
2558                     Err(RecvTimeoutError::Disconnected) => break,
2559                 }
2560             }
2561
2562             assert_eq!(recv_count, AMT * NTHREADS);
2563             assert!(rx.try_recv().is_err());
2564
2565             dtx.send(()).unwrap();
2566         });
2567
2568         for _ in 0..NTHREADS {
2569             let tx = tx.clone();
2570             thread::spawn(move || {
2571                 for _ in 0..AMT {
2572                     tx.send(1).unwrap();
2573                 }
2574             });
2575         }
2576
2577         drop(tx);
2578
2579         drx.recv().unwrap();
2580     }
2581
2582     #[test]
2583     fn stress_shared() {
2584         const AMT: u32 = 1000;
2585         const NTHREADS: u32 = 8;
2586         let (tx, rx) = sync_channel::<i32>(0);
2587         let (dtx, drx) = sync_channel::<()>(0);
2588
2589         thread::spawn(move || {
2590             for _ in 0..AMT * NTHREADS {
2591                 assert_eq!(rx.recv().unwrap(), 1);
2592             }
2593             match rx.try_recv() {
2594                 Ok(..) => panic!(),
2595                 _ => {}
2596             }
2597             dtx.send(()).unwrap();
2598         });
2599
2600         for _ in 0..NTHREADS {
2601             let tx = tx.clone();
2602             thread::spawn(move || {
2603                 for _ in 0..AMT {
2604                     tx.send(1).unwrap();
2605                 }
2606             });
2607         }
2608         drop(tx);
2609         drx.recv().unwrap();
2610     }
2611
2612     #[test]
2613     fn oneshot_single_thread_close_port_first() {
2614         // Simple test of closing without sending
2615         let (_tx, rx) = sync_channel::<i32>(0);
2616         drop(rx);
2617     }
2618
2619     #[test]
2620     fn oneshot_single_thread_close_chan_first() {
2621         // Simple test of closing without sending
2622         let (tx, _rx) = sync_channel::<i32>(0);
2623         drop(tx);
2624     }
2625
2626     #[test]
2627     fn oneshot_single_thread_send_port_close() {
2628         // Testing that the sender cleans up the payload if receiver is closed
2629         let (tx, rx) = sync_channel::<Box<i32>>(0);
2630         drop(rx);
2631         assert!(tx.send(box 0).is_err());
2632     }
2633
2634     #[test]
2635     fn oneshot_single_thread_recv_chan_close() {
2636         // Receiving on a closed chan will panic
2637         let res = thread::spawn(move || {
2638             let (tx, rx) = sync_channel::<i32>(0);
2639             drop(tx);
2640             rx.recv().unwrap();
2641         })
2642         .join();
2643         // What is our res?
2644         assert!(res.is_err());
2645     }
2646
2647     #[test]
2648     fn oneshot_single_thread_send_then_recv() {
2649         let (tx, rx) = sync_channel::<Box<i32>>(1);
2650         tx.send(box 10).unwrap();
2651         assert!(*rx.recv().unwrap() == 10);
2652     }
2653
2654     #[test]
2655     fn oneshot_single_thread_try_send_open() {
2656         let (tx, rx) = sync_channel::<i32>(1);
2657         assert_eq!(tx.try_send(10), Ok(()));
2658         assert!(rx.recv().unwrap() == 10);
2659     }
2660
2661     #[test]
2662     fn oneshot_single_thread_try_send_closed() {
2663         let (tx, rx) = sync_channel::<i32>(0);
2664         drop(rx);
2665         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2666     }
2667
2668     #[test]
2669     fn oneshot_single_thread_try_send_closed2() {
2670         let (tx, _rx) = sync_channel::<i32>(0);
2671         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2672     }
2673
2674     #[test]
2675     fn oneshot_single_thread_try_recv_open() {
2676         let (tx, rx) = sync_channel::<i32>(1);
2677         tx.send(10).unwrap();
2678         assert!(rx.recv() == Ok(10));
2679     }
2680
2681     #[test]
2682     fn oneshot_single_thread_try_recv_closed() {
2683         let (tx, rx) = sync_channel::<i32>(0);
2684         drop(tx);
2685         assert!(rx.recv().is_err());
2686     }
2687
2688     #[test]
2689     fn oneshot_single_thread_try_recv_closed_with_data() {
2690         let (tx, rx) = sync_channel::<i32>(1);
2691         tx.send(10).unwrap();
2692         drop(tx);
2693         assert_eq!(rx.try_recv(), Ok(10));
2694         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2695     }
2696
2697     #[test]
2698     fn oneshot_single_thread_peek_data() {
2699         let (tx, rx) = sync_channel::<i32>(1);
2700         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2701         tx.send(10).unwrap();
2702         assert_eq!(rx.try_recv(), Ok(10));
2703     }
2704
2705     #[test]
2706     fn oneshot_single_thread_peek_close() {
2707         let (tx, rx) = sync_channel::<i32>(0);
2708         drop(tx);
2709         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2710         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2711     }
2712
2713     #[test]
2714     fn oneshot_single_thread_peek_open() {
2715         let (_tx, rx) = sync_channel::<i32>(0);
2716         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2717     }
2718
2719     #[test]
2720     fn oneshot_multi_task_recv_then_send() {
2721         let (tx, rx) = sync_channel::<Box<i32>>(0);
2722         let _t = thread::spawn(move || {
2723             assert!(*rx.recv().unwrap() == 10);
2724         });
2725
2726         tx.send(box 10).unwrap();
2727     }
2728
2729     #[test]
2730     fn oneshot_multi_task_recv_then_close() {
2731         let (tx, rx) = sync_channel::<Box<i32>>(0);
2732         let _t = thread::spawn(move || {
2733             drop(tx);
2734         });
2735         let res = thread::spawn(move || {
2736             assert!(*rx.recv().unwrap() == 10);
2737         })
2738         .join();
2739         assert!(res.is_err());
2740     }
2741
2742     #[test]
2743     fn oneshot_multi_thread_close_stress() {
2744         for _ in 0..stress_factor() {
2745             let (tx, rx) = sync_channel::<i32>(0);
2746             let _t = thread::spawn(move || {
2747                 drop(rx);
2748             });
2749             drop(tx);
2750         }
2751     }
2752
2753     #[test]
2754     fn oneshot_multi_thread_send_close_stress() {
2755         for _ in 0..stress_factor() {
2756             let (tx, rx) = sync_channel::<i32>(0);
2757             let _t = thread::spawn(move || {
2758                 drop(rx);
2759             });
2760             let _ = thread::spawn(move || {
2761                 tx.send(1).unwrap();
2762             })
2763             .join();
2764         }
2765     }
2766
2767     #[test]
2768     fn oneshot_multi_thread_recv_close_stress() {
2769         for _ in 0..stress_factor() {
2770             let (tx, rx) = sync_channel::<i32>(0);
2771             let _t = thread::spawn(move || {
2772                 let res = thread::spawn(move || {
2773                     rx.recv().unwrap();
2774                 })
2775                 .join();
2776                 assert!(res.is_err());
2777             });
2778             let _t = thread::spawn(move || {
2779                 thread::spawn(move || {
2780                     drop(tx);
2781                 });
2782             });
2783         }
2784     }
2785
2786     #[test]
2787     fn oneshot_multi_thread_send_recv_stress() {
2788         for _ in 0..stress_factor() {
2789             let (tx, rx) = sync_channel::<Box<i32>>(0);
2790             let _t = thread::spawn(move || {
2791                 tx.send(box 10).unwrap();
2792             });
2793             assert!(*rx.recv().unwrap() == 10);
2794         }
2795     }
2796
2797     #[test]
2798     fn stream_send_recv_stress() {
2799         for _ in 0..stress_factor() {
2800             let (tx, rx) = sync_channel::<Box<i32>>(0);
2801
2802             send(tx, 0);
2803             recv(rx, 0);
2804
2805             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2806                 if i == 10 {
2807                     return;
2808                 }
2809
2810                 thread::spawn(move || {
2811                     tx.send(box i).unwrap();
2812                     send(tx, i + 1);
2813                 });
2814             }
2815
2816             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2817                 if i == 10 {
2818                     return;
2819                 }
2820
2821                 thread::spawn(move || {
2822                     assert!(*rx.recv().unwrap() == i);
2823                     recv(rx, i + 1);
2824                 });
2825             }
2826         }
2827     }
2828
2829     #[test]
2830     fn recv_a_lot() {
2831         // Regression test that we don't run out of stack in scheduler context
2832         let (tx, rx) = sync_channel(10000);
2833         for _ in 0..10000 {
2834             tx.send(()).unwrap();
2835         }
2836         for _ in 0..10000 {
2837             rx.recv().unwrap();
2838         }
2839     }
2840
2841     #[test]
2842     fn shared_chan_stress() {
2843         let (tx, rx) = sync_channel(0);
2844         let total = stress_factor() + 100;
2845         for _ in 0..total {
2846             let tx = tx.clone();
2847             thread::spawn(move || {
2848                 tx.send(()).unwrap();
2849             });
2850         }
2851
2852         for _ in 0..total {
2853             rx.recv().unwrap();
2854         }
2855     }
2856
2857     #[test]
2858     fn test_nested_recv_iter() {
2859         let (tx, rx) = sync_channel::<i32>(0);
2860         let (total_tx, total_rx) = sync_channel::<i32>(0);
2861
2862         let _t = thread::spawn(move || {
2863             let mut acc = 0;
2864             for x in rx.iter() {
2865                 acc += x;
2866             }
2867             total_tx.send(acc).unwrap();
2868         });
2869
2870         tx.send(3).unwrap();
2871         tx.send(1).unwrap();
2872         tx.send(2).unwrap();
2873         drop(tx);
2874         assert_eq!(total_rx.recv().unwrap(), 6);
2875     }
2876
2877     #[test]
2878     fn test_recv_iter_break() {
2879         let (tx, rx) = sync_channel::<i32>(0);
2880         let (count_tx, count_rx) = sync_channel(0);
2881
2882         let _t = thread::spawn(move || {
2883             let mut count = 0;
2884             for x in rx.iter() {
2885                 if count >= 3 {
2886                     break;
2887                 } else {
2888                     count += x;
2889                 }
2890             }
2891             count_tx.send(count).unwrap();
2892         });
2893
2894         tx.send(2).unwrap();
2895         tx.send(2).unwrap();
2896         tx.send(2).unwrap();
2897         let _ = tx.try_send(2);
2898         drop(tx);
2899         assert_eq!(count_rx.recv().unwrap(), 4);
2900     }
2901
2902     #[test]
2903     fn try_recv_states() {
2904         let (tx1, rx1) = sync_channel::<i32>(1);
2905         let (tx2, rx2) = sync_channel::<()>(1);
2906         let (tx3, rx3) = sync_channel::<()>(1);
2907         let _t = thread::spawn(move || {
2908             rx2.recv().unwrap();
2909             tx1.send(1).unwrap();
2910             tx3.send(()).unwrap();
2911             rx2.recv().unwrap();
2912             drop(tx1);
2913             tx3.send(()).unwrap();
2914         });
2915
2916         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2917         tx2.send(()).unwrap();
2918         rx3.recv().unwrap();
2919         assert_eq!(rx1.try_recv(), Ok(1));
2920         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2921         tx2.send(()).unwrap();
2922         rx3.recv().unwrap();
2923         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2924     }
2925
2926     // This bug used to end up in a livelock inside of the Receiver destructor
2927     // because the internal state of the Shared packet was corrupted
2928     #[test]
2929     fn destroy_upgraded_shared_port_when_sender_still_active() {
2930         let (tx, rx) = sync_channel::<()>(0);
2931         let (tx2, rx2) = sync_channel::<()>(0);
2932         let _t = thread::spawn(move || {
2933             rx.recv().unwrap(); // wait on a oneshot
2934             drop(rx); // destroy a shared
2935             tx2.send(()).unwrap();
2936         });
2937         // make sure the other thread has gone to sleep
2938         for _ in 0..5000 {
2939             thread::yield_now();
2940         }
2941
2942         // upgrade to a shared chan and send a message
2943         let t = tx.clone();
2944         drop(tx);
2945         t.send(()).unwrap();
2946
2947         // wait for the child thread to exit before we exit
2948         rx2.recv().unwrap();
2949     }
2950
2951     #[test]
2952     fn send1() {
2953         let (tx, rx) = sync_channel::<i32>(0);
2954         let _t = thread::spawn(move || {
2955             rx.recv().unwrap();
2956         });
2957         assert_eq!(tx.send(1), Ok(()));
2958     }
2959
2960     #[test]
2961     fn send2() {
2962         let (tx, rx) = sync_channel::<i32>(0);
2963         let _t = thread::spawn(move || {
2964             drop(rx);
2965         });
2966         assert!(tx.send(1).is_err());
2967     }
2968
2969     #[test]
2970     fn send3() {
2971         let (tx, rx) = sync_channel::<i32>(1);
2972         assert_eq!(tx.send(1), Ok(()));
2973         let _t = thread::spawn(move || {
2974             drop(rx);
2975         });
2976         assert!(tx.send(1).is_err());
2977     }
2978
2979     #[test]
2980     fn send4() {
2981         let (tx, rx) = sync_channel::<i32>(0);
2982         let tx2 = tx.clone();
2983         let (done, donerx) = channel();
2984         let done2 = done.clone();
2985         let _t = thread::spawn(move || {
2986             assert!(tx.send(1).is_err());
2987             done.send(()).unwrap();
2988         });
2989         let _t = thread::spawn(move || {
2990             assert!(tx2.send(2).is_err());
2991             done2.send(()).unwrap();
2992         });
2993         drop(rx);
2994         donerx.recv().unwrap();
2995         donerx.recv().unwrap();
2996     }
2997
2998     #[test]
2999     fn try_send1() {
3000         let (tx, _rx) = sync_channel::<i32>(0);
3001         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3002     }
3003
3004     #[test]
3005     fn try_send2() {
3006         let (tx, _rx) = sync_channel::<i32>(1);
3007         assert_eq!(tx.try_send(1), Ok(()));
3008         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3009     }
3010
3011     #[test]
3012     fn try_send3() {
3013         let (tx, rx) = sync_channel::<i32>(1);
3014         assert_eq!(tx.try_send(1), Ok(()));
3015         drop(rx);
3016         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3017     }
3018
3019     #[test]
3020     fn issue_15761() {
3021         fn repro() {
3022             let (tx1, rx1) = sync_channel::<()>(3);
3023             let (tx2, rx2) = sync_channel::<()>(3);
3024
3025             let _t = thread::spawn(move || {
3026                 rx1.recv().unwrap();
3027                 tx2.try_send(()).unwrap();
3028             });
3029
3030             tx1.try_send(()).unwrap();
3031             rx2.recv().unwrap();
3032         }
3033
3034         for _ in 0..100 {
3035             repro()
3036         }
3037     }
3038 }