]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Improve type size assertions
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // ignore-tidy-filelength
2
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
4 //!
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
7 //!
8 //! * [`Sender`]
9 //! * [`SyncSender`]
10 //! * [`Receiver`]
11 //!
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
15 //!
16 //! These channels come in two flavors:
17 //!
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //!    will return a `(Sender, Receiver)` tuple where all sends will be
20 //!    **asynchronous** (they never block). The channel conceptually has an
21 //!    infinite buffer.
22 //!
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
26 //!    **synchronous** by blocking until there is buffer space available. Note
27 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //!    channel where each sender atomically hands off a message to a receiver.
29 //!
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
36 //!
37 //! ## Disconnection
38 //!
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
43 //!
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
48 //!
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
52 //!
53 //! # Examples
54 //!
55 //! Simple usage:
56 //!
57 //! ```
58 //! use std::thread;
59 //! use std::sync::mpsc::channel;
60 //!
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //!     tx.send(10).unwrap();
65 //! });
66 //! assert_eq!(rx.recv().unwrap(), 10);
67 //! ```
68 //!
69 //! Shared usage:
70 //!
71 //! ```
72 //! use std::thread;
73 //! use std::sync::mpsc::channel;
74 //!
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
79 //! for i in 0..10 {
80 //!     let tx = tx.clone();
81 //!     thread::spawn(move|| {
82 //!         tx.send(i).unwrap();
83 //!     });
84 //! }
85 //!
86 //! for _ in 0..10 {
87 //!     let j = rx.recv().unwrap();
88 //!     assert!(0 <= j && j < 10);
89 //! }
90 //! ```
91 //!
92 //! Propagating panics:
93 //!
94 //! ```
95 //! use std::sync::mpsc::channel;
96 //!
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
100 //! drop(tx);
101 //! assert!(rx.recv().is_err());
102 //! ```
103 //!
104 //! Synchronous channels:
105 //!
106 //! ```
107 //! use std::thread;
108 //! use std::sync::mpsc::sync_channel;
109 //!
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //!     // This will wait for the parent thread to start receiving
113 //!     tx.send(53).unwrap();
114 //! });
115 //! rx.recv().unwrap();
116 //! ```
117
118 #![stable(feature = "rust1", since = "1.0.0")]
119 #![allow(deprecated)] // for mpsc_select
120
121 // A description of how Rust's channel implementation works
122 //
123 // Channels are supposed to be the basic building block for all other
124 // concurrent primitives that are used in Rust. As a result, the channel type
125 // needs to be highly optimized, flexible, and broad enough for use everywhere.
126 //
127 // The choice of implementation of all channels is to be built on lock-free data
128 // structures. The channels themselves are then consequently also lock-free data
129 // structures. As always with lock-free code, this is a very "here be dragons"
130 // territory, especially because I'm unaware of any academic papers that have
131 // gone into great length about channels of these flavors.
132 //
133 // ## Flavors of channels
134 //
135 // From the perspective of a consumer of this library, there is only one flavor
136 // of channel. This channel can be used as a stream and cloned to allow multiple
137 // senders. Under the hood, however, there are actually three flavors of
138 // channels in play.
139 //
140 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
141 //                      case. They contain as few atomics as possible and
142 //                      involve one and exactly one allocation.
143 // * Streams - these channels are optimized for the non-shared use case. They
144 //             use a different concurrent queue that is more tailored for this
145 //             use case. The initial allocation of this flavor of channel is not
146 //             optimized.
147 // * Shared - this is the most general form of channel that this module offers,
148 //            a channel with multiple senders. This type is as optimized as it
149 //            can be, but the previous two types mentioned are much faster for
150 //            their use-cases.
151 //
152 // ## Concurrent queues
153 //
154 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
155 // but recv() obviously blocks. This means that under the hood there must be
156 // some shared and concurrent queue holding all of the actual data.
157 //
158 // With two flavors of channels, two flavors of queues are also used. We have
159 // chosen to use queues from a well-known author that are abbreviated as SPSC
160 // and MPSC (single producer, single consumer and multiple producer, single
161 // consumer). SPSC queues are used for streams while MPSC queues are used for
162 // shared channels.
163 //
164 // ### SPSC optimizations
165 //
166 // The SPSC queue found online is essentially a linked list of nodes where one
167 // half of the nodes are the "queue of data" and the other half of nodes are a
168 // cache of unused nodes. The unused nodes are used such that an allocation is
169 // not required on every push() and a free doesn't need to happen on every
170 // pop().
171 //
172 // As found online, however, the cache of nodes is of an infinite size. This
173 // means that if a channel at one point in its life had 50k items in the queue,
174 // then the queue will always have the capacity for 50k items. I believed that
175 // this was an unnecessary limitation of the implementation, so I have altered
176 // the queue to optionally have a bound on the cache size.
177 //
178 // By default, streams will have an unbounded SPSC queue with a small-ish cache
179 // size. The hope is that the cache is still large enough to have very fast
180 // send() operations while not too large such that millions of channels can
181 // coexist at once.
182 //
183 // ### MPSC optimizations
184 //
185 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
186 // a linked list under the hood to earn its unboundedness, but I have not put
187 // forth much effort into having a cache of nodes similar to the SPSC queue.
188 //
189 // For now, I believe that this is "ok" because shared channels are not the most
190 // common type, but soon we may wish to revisit this queue choice and determine
191 // another candidate for backend storage of shared channels.
192 //
193 // ## Overview of the Implementation
194 //
195 // Now that there's a little background on the concurrent queues used, it's
196 // worth going into much more detail about the channels themselves. The basic
197 // pseudocode for a send/recv are:
198 //
199 //
200 //      send(t)                             recv()
201 //        queue.push(t)                       return if queue.pop()
202 //        if increment() == -1                deschedule {
203 //          wakeup()                            if decrement() > 0
204 //                                                cancel_deschedule()
205 //                                            }
206 //                                            queue.pop()
207 //
208 // As mentioned before, there are no locks in this implementation, only atomic
209 // instructions are used.
210 //
211 // ### The internal atomic counter
212 //
213 // Every channel has a shared counter with each half to keep track of the size
214 // of the queue. This counter is used to abort descheduling by the receiver and
215 // to know when to wake up on the sending side.
216 //
217 // As seen in the pseudocode, senders will increment this count and receivers
218 // will decrement the count. The theory behind this is that if a sender sees a
219 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
220 // then it doesn't need to block.
221 //
222 // The recv() method has a beginning call to pop(), and if successful, it needs
223 // to decrement the count. It is a crucial implementation detail that this
224 // decrement does *not* happen to the shared counter. If this were the case,
225 // then it would be possible for the counter to be very negative when there were
226 // no receivers waiting, in which case the senders would have to determine when
227 // it was actually appropriate to wake up a receiver.
228 //
229 // Instead, the "steal count" is kept track of separately (not atomically
230 // because it's only used by receivers), and then the decrement() call when
231 // descheduling will lump in all of the recent steals into one large decrement.
232 //
233 // The implication of this is that if a sender sees a -1 count, then there's
234 // guaranteed to be a waiter waiting!
235 //
236 // ## Native Implementation
237 //
238 // A major goal of these channels is to work seamlessly on and off the runtime.
239 // All of the previous race conditions have been worded in terms of
240 // scheduler-isms (which is obviously not available without the runtime).
241 //
242 // For now, native usage of channels (off the runtime) will fall back onto
243 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
244 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
245 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
246 // condition variable.
247 //
248 // ## Select
249 //
250 // Being able to support selection over channels has greatly influenced this
251 // design, and not only does selection need to work inside the runtime, but also
252 // outside the runtime.
253 //
254 // The implementation is fairly straightforward. The goal of select() is not to
255 // return some data, but only to return which channel can receive data without
256 // blocking. The implementation is essentially the entire blocking procedure
257 // followed by an increment as soon as its woken up. The cancellation procedure
258 // involves an increment and swapping out of to_wake to acquire ownership of the
259 // thread to unblock.
260 //
261 // Sadly this current implementation requires multiple allocations, so I have
262 // seen the throughput of select() be much worse than it should be. I do not
263 // believe that there is anything fundamental that needs to change about these
264 // channels, however, in order to support a more efficient select().
265 //
266 // # Conclusion
267 //
268 // And now that you've seen all the races that I found and attempted to fix,
269 // here's the code for you to find some more!
270
271 use crate::sync::Arc;
272 use crate::error;
273 use crate::fmt;
274 use crate::mem;
275 use crate::cell::UnsafeCell;
276 use crate::time::{Duration, Instant};
277
278 #[unstable(feature = "mpsc_select", issue = "27800")]
279 pub use self::select::{Select, Handle};
280 use self::select::StartResult;
281 use self::select::StartResult::*;
282 use self::blocking::SignalToken;
283
284 #[cfg(all(test, not(target_os = "emscripten")))]
285 mod select_tests;
286
287 mod blocking;
288 mod oneshot;
289 mod select;
290 mod shared;
291 mod stream;
292 mod sync;
293 mod mpsc_queue;
294 mod spsc_queue;
295
296 mod cache_aligned;
297
298 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
299 /// This half can only be owned by one thread.
300 ///
301 /// Messages sent to the channel can be retrieved using [`recv`].
302 ///
303 /// [`channel`]: fn.channel.html
304 /// [`sync_channel`]: fn.sync_channel.html
305 /// [`recv`]: struct.Receiver.html#method.recv
306 ///
307 /// # Examples
308 ///
309 /// ```rust
310 /// use std::sync::mpsc::channel;
311 /// use std::thread;
312 /// use std::time::Duration;
313 ///
314 /// let (send, recv) = channel();
315 ///
316 /// thread::spawn(move || {
317 ///     send.send("Hello world!").unwrap();
318 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
319 ///     send.send("Delayed for 2 seconds").unwrap();
320 /// });
321 ///
322 /// println!("{}", recv.recv().unwrap()); // Received immediately
323 /// println!("Waiting...");
324 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
325 /// ```
326 #[stable(feature = "rust1", since = "1.0.0")]
327 pub struct Receiver<T> {
328     inner: UnsafeCell<Flavor<T>>,
329 }
330
331 // The receiver port can be sent from place to place, so long as it
332 // is not used to receive non-sendable things.
333 #[stable(feature = "rust1", since = "1.0.0")]
334 unsafe impl<T: Send> Send for Receiver<T> { }
335
336 #[stable(feature = "rust1", since = "1.0.0")]
337 impl<T> !Sync for Receiver<T> { }
338
339 /// An iterator over messages on a [`Receiver`], created by [`iter`].
340 ///
341 /// This iterator will block whenever [`next`] is called,
342 /// waiting for a new message, and [`None`] will be returned
343 /// when the corresponding channel has hung up.
344 ///
345 /// [`iter`]: struct.Receiver.html#method.iter
346 /// [`Receiver`]: struct.Receiver.html
347 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
348 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
349 ///
350 /// # Examples
351 ///
352 /// ```rust
353 /// use std::sync::mpsc::channel;
354 /// use std::thread;
355 ///
356 /// let (send, recv) = channel();
357 ///
358 /// thread::spawn(move || {
359 ///     send.send(1u8).unwrap();
360 ///     send.send(2u8).unwrap();
361 ///     send.send(3u8).unwrap();
362 /// });
363 ///
364 /// for x in recv.iter() {
365 ///     println!("Got: {}", x);
366 /// }
367 /// ```
368 #[stable(feature = "rust1", since = "1.0.0")]
369 #[derive(Debug)]
370 pub struct Iter<'a, T: 'a> {
371     rx: &'a Receiver<T>
372 }
373
374 /// An iterator that attempts to yield all pending values for a [`Receiver`],
375 /// created by [`try_iter`].
376 ///
377 /// [`None`] will be returned when there are no pending values remaining or
378 /// if the corresponding channel has hung up.
379 ///
380 /// This iterator will never block the caller in order to wait for data to
381 /// become available. Instead, it will return [`None`].
382 ///
383 /// [`Receiver`]: struct.Receiver.html
384 /// [`try_iter`]: struct.Receiver.html#method.try_iter
385 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
386 ///
387 /// # Examples
388 ///
389 /// ```rust
390 /// use std::sync::mpsc::channel;
391 /// use std::thread;
392 /// use std::time::Duration;
393 ///
394 /// let (sender, receiver) = channel();
395 ///
396 /// // Nothing is in the buffer yet
397 /// assert!(receiver.try_iter().next().is_none());
398 /// println!("Nothing in the buffer...");
399 ///
400 /// thread::spawn(move || {
401 ///     sender.send(1).unwrap();
402 ///     sender.send(2).unwrap();
403 ///     sender.send(3).unwrap();
404 /// });
405 ///
406 /// println!("Going to sleep...");
407 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
408 ///
409 /// for x in receiver.try_iter() {
410 ///     println!("Got: {}", x);
411 /// }
412 /// ```
413 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
414 #[derive(Debug)]
415 pub struct TryIter<'a, T: 'a> {
416     rx: &'a Receiver<T>
417 }
418
419 /// An owning iterator over messages on a [`Receiver`],
420 /// created by **Receiver::into_iter**.
421 ///
422 /// This iterator will block whenever [`next`]
423 /// is called, waiting for a new message, and [`None`] will be
424 /// returned if the corresponding channel has hung up.
425 ///
426 /// [`Receiver`]: struct.Receiver.html
427 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
428 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
429 ///
430 /// # Examples
431 ///
432 /// ```rust
433 /// use std::sync::mpsc::channel;
434 /// use std::thread;
435 ///
436 /// let (send, recv) = channel();
437 ///
438 /// thread::spawn(move || {
439 ///     send.send(1u8).unwrap();
440 ///     send.send(2u8).unwrap();
441 ///     send.send(3u8).unwrap();
442 /// });
443 ///
444 /// for x in recv.into_iter() {
445 ///     println!("Got: {}", x);
446 /// }
447 /// ```
448 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
449 #[derive(Debug)]
450 pub struct IntoIter<T> {
451     rx: Receiver<T>
452 }
453
454 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
455 /// owned by one thread, but it can be cloned to send to other threads.
456 ///
457 /// Messages can be sent through this channel with [`send`].
458 ///
459 /// [`channel`]: fn.channel.html
460 /// [`send`]: struct.Sender.html#method.send
461 ///
462 /// # Examples
463 ///
464 /// ```rust
465 /// use std::sync::mpsc::channel;
466 /// use std::thread;
467 ///
468 /// let (sender, receiver) = channel();
469 /// let sender2 = sender.clone();
470 ///
471 /// // First thread owns sender
472 /// thread::spawn(move || {
473 ///     sender.send(1).unwrap();
474 /// });
475 ///
476 /// // Second thread owns sender2
477 /// thread::spawn(move || {
478 ///     sender2.send(2).unwrap();
479 /// });
480 ///
481 /// let msg = receiver.recv().unwrap();
482 /// let msg2 = receiver.recv().unwrap();
483 ///
484 /// assert_eq!(3, msg + msg2);
485 /// ```
486 #[stable(feature = "rust1", since = "1.0.0")]
487 pub struct Sender<T> {
488     inner: UnsafeCell<Flavor<T>>,
489 }
490
491 // The send port can be sent from place to place, so long as it
492 // is not used to send non-sendable things.
493 #[stable(feature = "rust1", since = "1.0.0")]
494 unsafe impl<T: Send> Send for Sender<T> { }
495
496 #[stable(feature = "rust1", since = "1.0.0")]
497 impl<T> !Sync for Sender<T> { }
498
499 /// The sending-half of Rust's synchronous [`sync_channel`] type.
500 ///
501 /// Messages can be sent through this channel with [`send`] or [`try_send`].
502 ///
503 /// [`send`] will block if there is no space in the internal buffer.
504 ///
505 /// [`sync_channel`]: fn.sync_channel.html
506 /// [`send`]: struct.SyncSender.html#method.send
507 /// [`try_send`]: struct.SyncSender.html#method.try_send
508 ///
509 /// # Examples
510 ///
511 /// ```rust
512 /// use std::sync::mpsc::sync_channel;
513 /// use std::thread;
514 ///
515 /// // Create a sync_channel with buffer size 2
516 /// let (sync_sender, receiver) = sync_channel(2);
517 /// let sync_sender2 = sync_sender.clone();
518 ///
519 /// // First thread owns sync_sender
520 /// thread::spawn(move || {
521 ///     sync_sender.send(1).unwrap();
522 ///     sync_sender.send(2).unwrap();
523 /// });
524 ///
525 /// // Second thread owns sync_sender2
526 /// thread::spawn(move || {
527 ///     sync_sender2.send(3).unwrap();
528 ///     // thread will now block since the buffer is full
529 ///     println!("Thread unblocked!");
530 /// });
531 ///
532 /// let mut msg;
533 ///
534 /// msg = receiver.recv().unwrap();
535 /// println!("message {} received", msg);
536 ///
537 /// // "Thread unblocked!" will be printed now
538 ///
539 /// msg = receiver.recv().unwrap();
540 /// println!("message {} received", msg);
541 ///
542 /// msg = receiver.recv().unwrap();
543 ///
544 /// println!("message {} received", msg);
545 /// ```
546 #[stable(feature = "rust1", since = "1.0.0")]
547 pub struct SyncSender<T> {
548     inner: Arc<sync::Packet<T>>,
549 }
550
551 #[stable(feature = "rust1", since = "1.0.0")]
552 unsafe impl<T: Send> Send for SyncSender<T> {}
553
554 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
555 /// function on **channel**s.
556 ///
557 /// A **send** operation can only fail if the receiving end of a channel is
558 /// disconnected, implying that the data could never be received. The error
559 /// contains the data being sent as a payload so it can be recovered.
560 ///
561 /// [`Sender::send`]: struct.Sender.html#method.send
562 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
563 #[stable(feature = "rust1", since = "1.0.0")]
564 #[derive(PartialEq, Eq, Clone, Copy)]
565 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
566
567 /// An error returned from the [`recv`] function on a [`Receiver`].
568 ///
569 /// The [`recv`] operation can only fail if the sending half of a
570 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
571 /// messages will ever be received.
572 ///
573 /// [`recv`]: struct.Receiver.html#method.recv
574 /// [`Receiver`]: struct.Receiver.html
575 /// [`channel`]: fn.channel.html
576 /// [`sync_channel`]: fn.sync_channel.html
577 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
578 #[stable(feature = "rust1", since = "1.0.0")]
579 pub struct RecvError;
580
581 /// This enumeration is the list of the possible reasons that [`try_recv`] could
582 /// not return data when called. This can occur with both a [`channel`] and
583 /// a [`sync_channel`].
584 ///
585 /// [`try_recv`]: struct.Receiver.html#method.try_recv
586 /// [`channel`]: fn.channel.html
587 /// [`sync_channel`]: fn.sync_channel.html
588 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
589 #[stable(feature = "rust1", since = "1.0.0")]
590 pub enum TryRecvError {
591     /// This **channel** is currently empty, but the **Sender**(s) have not yet
592     /// disconnected, so data may yet become available.
593     #[stable(feature = "rust1", since = "1.0.0")]
594     Empty,
595
596     /// The **channel**'s sending half has become disconnected, and there will
597     /// never be any more data received on it.
598     #[stable(feature = "rust1", since = "1.0.0")]
599     Disconnected,
600 }
601
602 /// This enumeration is the list of possible errors that made [`recv_timeout`]
603 /// unable to return data when called. This can occur with both a [`channel`] and
604 /// a [`sync_channel`].
605 ///
606 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
607 /// [`channel`]: fn.channel.html
608 /// [`sync_channel`]: fn.sync_channel.html
609 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
610 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
611 pub enum RecvTimeoutError {
612     /// This **channel** is currently empty, but the **Sender**(s) have not yet
613     /// disconnected, so data may yet become available.
614     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
615     Timeout,
616     /// The **channel**'s sending half has become disconnected, and there will
617     /// never be any more data received on it.
618     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
619     Disconnected,
620 }
621
622 /// This enumeration is the list of the possible error outcomes for the
623 /// [`try_send`] method.
624 ///
625 /// [`try_send`]: struct.SyncSender.html#method.try_send
626 #[stable(feature = "rust1", since = "1.0.0")]
627 #[derive(PartialEq, Eq, Clone, Copy)]
628 pub enum TrySendError<T> {
629     /// The data could not be sent on the [`sync_channel`] because it would require that
630     /// the callee block to send the data.
631     ///
632     /// If this is a buffered channel, then the buffer is full at this time. If
633     /// this is not a buffered channel, then there is no [`Receiver`] available to
634     /// acquire the data.
635     ///
636     /// [`sync_channel`]: fn.sync_channel.html
637     /// [`Receiver`]: struct.Receiver.html
638     #[stable(feature = "rust1", since = "1.0.0")]
639     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
640
641     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
642     /// sent. The data is returned back to the callee in this case.
643     ///
644     /// [`sync_channel`]: fn.sync_channel.html
645     #[stable(feature = "rust1", since = "1.0.0")]
646     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
647 }
648
649 enum Flavor<T> {
650     Oneshot(Arc<oneshot::Packet<T>>),
651     Stream(Arc<stream::Packet<T>>),
652     Shared(Arc<shared::Packet<T>>),
653     Sync(Arc<sync::Packet<T>>),
654 }
655
656 #[doc(hidden)]
657 trait UnsafeFlavor<T> {
658     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
659     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
660         &mut *self.inner_unsafe().get()
661     }
662     unsafe fn inner(&self) -> &Flavor<T> {
663         &*self.inner_unsafe().get()
664     }
665 }
666 impl<T> UnsafeFlavor<T> for Sender<T> {
667     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
668         &self.inner
669     }
670 }
671 impl<T> UnsafeFlavor<T> for Receiver<T> {
672     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
673         &self.inner
674     }
675 }
676
677 /// Creates a new asynchronous channel, returning the sender/receiver halves.
678 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
679 /// the same order as it was sent, and no [`send`] will block the calling thread
680 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
681 /// block after its buffer limit is reached). [`recv`] will block until a message
682 /// is available.
683 ///
684 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
685 /// only one [`Receiver`] is supported.
686 ///
687 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
688 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
689 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
690 /// return a [`RecvError`].
691 ///
692 /// [`send`]: struct.Sender.html#method.send
693 /// [`recv`]: struct.Receiver.html#method.recv
694 /// [`Sender`]: struct.Sender.html
695 /// [`Receiver`]: struct.Receiver.html
696 /// [`sync_channel`]: fn.sync_channel.html
697 /// [`SendError`]: struct.SendError.html
698 /// [`RecvError`]: struct.RecvError.html
699 ///
700 /// # Examples
701 ///
702 /// ```
703 /// use std::sync::mpsc::channel;
704 /// use std::thread;
705 ///
706 /// let (sender, receiver) = channel();
707 ///
708 /// // Spawn off an expensive computation
709 /// thread::spawn(move|| {
710 /// #   fn expensive_computation() {}
711 ///     sender.send(expensive_computation()).unwrap();
712 /// });
713 ///
714 /// // Do some useful work for awhile
715 ///
716 /// // Let's see what that answer was
717 /// println!("{:?}", receiver.recv().unwrap());
718 /// ```
719 #[stable(feature = "rust1", since = "1.0.0")]
720 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
721     let a = Arc::new(oneshot::Packet::new());
722     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
723 }
724
725 /// Creates a new synchronous, bounded channel.
726 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
727 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
728 /// [`Receiver`] will block until a message becomes available. `sync_channel`
729 /// differs greatly in the semantics of the sender, however.
730 ///
731 /// This channel has an internal buffer on which messages will be queued.
732 /// `bound` specifies the buffer size. When the internal buffer becomes full,
733 /// future sends will *block* waiting for the buffer to open up. Note that a
734 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
735 /// where each [`send`] will not return until a [`recv`] is paired with it.
736 ///
737 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
738 /// times, but only one [`Receiver`] is supported.
739 ///
740 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
741 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
742 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
743 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
744 ///
745 /// [`channel`]: fn.channel.html
746 /// [`send`]: struct.SyncSender.html#method.send
747 /// [`recv`]: struct.Receiver.html#method.recv
748 /// [`SyncSender`]: struct.SyncSender.html
749 /// [`Receiver`]: struct.Receiver.html
750 /// [`SendError`]: struct.SendError.html
751 /// [`RecvError`]: struct.RecvError.html
752 ///
753 /// # Examples
754 ///
755 /// ```
756 /// use std::sync::mpsc::sync_channel;
757 /// use std::thread;
758 ///
759 /// let (sender, receiver) = sync_channel(1);
760 ///
761 /// // this returns immediately
762 /// sender.send(1).unwrap();
763 ///
764 /// thread::spawn(move|| {
765 ///     // this will block until the previous message has been received
766 ///     sender.send(2).unwrap();
767 /// });
768 ///
769 /// assert_eq!(receiver.recv().unwrap(), 1);
770 /// assert_eq!(receiver.recv().unwrap(), 2);
771 /// ```
772 #[stable(feature = "rust1", since = "1.0.0")]
773 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
774     let a = Arc::new(sync::Packet::new(bound));
775     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
776 }
777
778 ////////////////////////////////////////////////////////////////////////////////
779 // Sender
780 ////////////////////////////////////////////////////////////////////////////////
781
782 impl<T> Sender<T> {
783     fn new(inner: Flavor<T>) -> Sender<T> {
784         Sender {
785             inner: UnsafeCell::new(inner),
786         }
787     }
788
789     /// Attempts to send a value on this channel, returning it back if it could
790     /// not be sent.
791     ///
792     /// A successful send occurs when it is determined that the other end of
793     /// the channel has not hung up already. An unsuccessful send would be one
794     /// where the corresponding receiver has already been deallocated. Note
795     /// that a return value of [`Err`] means that the data will never be
796     /// received, but a return value of [`Ok`] does *not* mean that the data
797     /// will be received. It is possible for the corresponding receiver to
798     /// hang up immediately after this function returns [`Ok`].
799     ///
800     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
801     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
802     ///
803     /// This method will never block the current thread.
804     ///
805     /// # Examples
806     ///
807     /// ```
808     /// use std::sync::mpsc::channel;
809     ///
810     /// let (tx, rx) = channel();
811     ///
812     /// // This send is always successful
813     /// tx.send(1).unwrap();
814     ///
815     /// // This send will fail because the receiver is gone
816     /// drop(rx);
817     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
818     /// ```
819     #[stable(feature = "rust1", since = "1.0.0")]
820     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
821         let (new_inner, ret) = match *unsafe { self.inner() } {
822             Flavor::Oneshot(ref p) => {
823                 if !p.sent() {
824                     return p.send(t).map_err(SendError);
825                 } else {
826                     let a = Arc::new(stream::Packet::new());
827                     let rx = Receiver::new(Flavor::Stream(a.clone()));
828                     match p.upgrade(rx) {
829                         oneshot::UpSuccess => {
830                             let ret = a.send(t);
831                             (a, ret)
832                         }
833                         oneshot::UpDisconnected => (a, Err(t)),
834                         oneshot::UpWoke(token) => {
835                             // This send cannot panic because the thread is
836                             // asleep (we're looking at it), so the receiver
837                             // can't go away.
838                             a.send(t).ok().unwrap();
839                             token.signal();
840                             (a, Ok(()))
841                         }
842                     }
843                 }
844             }
845             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
846             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
847             Flavor::Sync(..) => unreachable!(),
848         };
849
850         unsafe {
851             let tmp = Sender::new(Flavor::Stream(new_inner));
852             mem::swap(self.inner_mut(), tmp.inner_mut());
853         }
854         ret.map_err(SendError)
855     }
856 }
857
858 #[stable(feature = "rust1", since = "1.0.0")]
859 impl<T> Clone for Sender<T> {
860     fn clone(&self) -> Sender<T> {
861         let packet = match *unsafe { self.inner() } {
862             Flavor::Oneshot(ref p) => {
863                 let a = Arc::new(shared::Packet::new());
864                 {
865                     let guard = a.postinit_lock();
866                     let rx = Receiver::new(Flavor::Shared(a.clone()));
867                     let sleeper = match p.upgrade(rx) {
868                         oneshot::UpSuccess |
869                         oneshot::UpDisconnected => None,
870                         oneshot::UpWoke(task) => Some(task),
871                     };
872                     a.inherit_blocker(sleeper, guard);
873                 }
874                 a
875             }
876             Flavor::Stream(ref p) => {
877                 let a = Arc::new(shared::Packet::new());
878                 {
879                     let guard = a.postinit_lock();
880                     let rx = Receiver::new(Flavor::Shared(a.clone()));
881                     let sleeper = match p.upgrade(rx) {
882                         stream::UpSuccess |
883                         stream::UpDisconnected => None,
884                         stream::UpWoke(task) => Some(task),
885                     };
886                     a.inherit_blocker(sleeper, guard);
887                 }
888                 a
889             }
890             Flavor::Shared(ref p) => {
891                 p.clone_chan();
892                 return Sender::new(Flavor::Shared(p.clone()));
893             }
894             Flavor::Sync(..) => unreachable!(),
895         };
896
897         unsafe {
898             let tmp = Sender::new(Flavor::Shared(packet.clone()));
899             mem::swap(self.inner_mut(), tmp.inner_mut());
900         }
901         Sender::new(Flavor::Shared(packet))
902     }
903 }
904
905 #[stable(feature = "rust1", since = "1.0.0")]
906 impl<T> Drop for Sender<T> {
907     fn drop(&mut self) {
908         match *unsafe { self.inner() } {
909             Flavor::Oneshot(ref p) => p.drop_chan(),
910             Flavor::Stream(ref p) => p.drop_chan(),
911             Flavor::Shared(ref p) => p.drop_chan(),
912             Flavor::Sync(..) => unreachable!(),
913         }
914     }
915 }
916
917 #[stable(feature = "mpsc_debug", since = "1.8.0")]
918 impl<T> fmt::Debug for Sender<T> {
919     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920         f.debug_struct("Sender").finish()
921     }
922 }
923
924 ////////////////////////////////////////////////////////////////////////////////
925 // SyncSender
926 ////////////////////////////////////////////////////////////////////////////////
927
928 impl<T> SyncSender<T> {
929     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
930         SyncSender { inner }
931     }
932
933     /// Sends a value on this synchronous channel.
934     ///
935     /// This function will *block* until space in the internal buffer becomes
936     /// available or a receiver is available to hand off the message to.
937     ///
938     /// Note that a successful send does *not* guarantee that the receiver will
939     /// ever see the data if there is a buffer on this channel. Items may be
940     /// enqueued in the internal buffer for the receiver to receive at a later
941     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
942     /// channel and it guarantees that the receiver has indeed received
943     /// the data if this function returns success.
944     ///
945     /// This function will never panic, but it may return [`Err`] if the
946     /// [`Receiver`] has disconnected and is no longer able to receive
947     /// information.
948     ///
949     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
950     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
951     ///
952     /// # Examples
953     ///
954     /// ```rust
955     /// use std::sync::mpsc::sync_channel;
956     /// use std::thread;
957     ///
958     /// // Create a rendezvous sync_channel with buffer size 0
959     /// let (sync_sender, receiver) = sync_channel(0);
960     ///
961     /// thread::spawn(move || {
962     ///    println!("sending message...");
963     ///    sync_sender.send(1).unwrap();
964     ///    // Thread is now blocked until the message is received
965     ///
966     ///    println!("...message received!");
967     /// });
968     ///
969     /// let msg = receiver.recv().unwrap();
970     /// assert_eq!(1, msg);
971     /// ```
972     #[stable(feature = "rust1", since = "1.0.0")]
973     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
974         self.inner.send(t).map_err(SendError)
975     }
976
977     /// Attempts to send a value on this channel without blocking.
978     ///
979     /// This method differs from [`send`] by returning immediately if the
980     /// channel's buffer is full or no receiver is waiting to acquire some
981     /// data. Compared with [`send`], this function has two failure cases
982     /// instead of one (one for disconnection, one for a full buffer).
983     ///
984     /// See [`send`] for notes about guarantees of whether the
985     /// receiver has received the data or not if this function is successful.
986     ///
987     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
988     ///
989     /// # Examples
990     ///
991     /// ```rust
992     /// use std::sync::mpsc::sync_channel;
993     /// use std::thread;
994     ///
995     /// // Create a sync_channel with buffer size 1
996     /// let (sync_sender, receiver) = sync_channel(1);
997     /// let sync_sender2 = sync_sender.clone();
998     ///
999     /// // First thread owns sync_sender
1000     /// thread::spawn(move || {
1001     ///     sync_sender.send(1).unwrap();
1002     ///     sync_sender.send(2).unwrap();
1003     ///     // Thread blocked
1004     /// });
1005     ///
1006     /// // Second thread owns sync_sender2
1007     /// thread::spawn(move || {
1008     ///     // This will return an error and send
1009     ///     // no message if the buffer is full
1010     ///     let _ = sync_sender2.try_send(3);
1011     /// });
1012     ///
1013     /// let mut msg;
1014     /// msg = receiver.recv().unwrap();
1015     /// println!("message {} received", msg);
1016     ///
1017     /// msg = receiver.recv().unwrap();
1018     /// println!("message {} received", msg);
1019     ///
1020     /// // Third message may have never been sent
1021     /// match receiver.try_recv() {
1022     ///     Ok(msg) => println!("message {} received", msg),
1023     ///     Err(_) => println!("the third message was never sent"),
1024     /// }
1025     /// ```
1026     #[stable(feature = "rust1", since = "1.0.0")]
1027     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1028         self.inner.try_send(t)
1029     }
1030 }
1031
1032 #[stable(feature = "rust1", since = "1.0.0")]
1033 impl<T> Clone for SyncSender<T> {
1034     fn clone(&self) -> SyncSender<T> {
1035         self.inner.clone_chan();
1036         SyncSender::new(self.inner.clone())
1037     }
1038 }
1039
1040 #[stable(feature = "rust1", since = "1.0.0")]
1041 impl<T> Drop for SyncSender<T> {
1042     fn drop(&mut self) {
1043         self.inner.drop_chan();
1044     }
1045 }
1046
1047 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1048 impl<T> fmt::Debug for SyncSender<T> {
1049     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1050         f.debug_struct("SyncSender").finish()
1051     }
1052 }
1053
1054 ////////////////////////////////////////////////////////////////////////////////
1055 // Receiver
1056 ////////////////////////////////////////////////////////////////////////////////
1057
1058 impl<T> Receiver<T> {
1059     fn new(inner: Flavor<T>) -> Receiver<T> {
1060         Receiver { inner: UnsafeCell::new(inner) }
1061     }
1062
1063     /// Attempts to return a pending value on this receiver without blocking.
1064     ///
1065     /// This method will never block the caller in order to wait for data to
1066     /// become available. Instead, this will always return immediately with a
1067     /// possible option of pending data on the channel.
1068     ///
1069     /// This is useful for a flavor of "optimistic check" before deciding to
1070     /// block on a receiver.
1071     ///
1072     /// Compared with [`recv`], this function has two failure cases instead of one
1073     /// (one for disconnection, one for an empty buffer).
1074     ///
1075     /// [`recv`]: struct.Receiver.html#method.recv
1076     ///
1077     /// # Examples
1078     ///
1079     /// ```rust
1080     /// use std::sync::mpsc::{Receiver, channel};
1081     ///
1082     /// let (_, receiver): (_, Receiver<i32>) = channel();
1083     ///
1084     /// assert!(receiver.try_recv().is_err());
1085     /// ```
1086     #[stable(feature = "rust1", since = "1.0.0")]
1087     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1088         loop {
1089             let new_port = match *unsafe { self.inner() } {
1090                 Flavor::Oneshot(ref p) => {
1091                     match p.try_recv() {
1092                         Ok(t) => return Ok(t),
1093                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1094                         Err(oneshot::Disconnected) => {
1095                             return Err(TryRecvError::Disconnected)
1096                         }
1097                         Err(oneshot::Upgraded(rx)) => rx,
1098                     }
1099                 }
1100                 Flavor::Stream(ref p) => {
1101                     match p.try_recv() {
1102                         Ok(t) => return Ok(t),
1103                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1104                         Err(stream::Disconnected) => {
1105                             return Err(TryRecvError::Disconnected)
1106                         }
1107                         Err(stream::Upgraded(rx)) => rx,
1108                     }
1109                 }
1110                 Flavor::Shared(ref p) => {
1111                     match p.try_recv() {
1112                         Ok(t) => return Ok(t),
1113                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1114                         Err(shared::Disconnected) => {
1115                             return Err(TryRecvError::Disconnected)
1116                         }
1117                     }
1118                 }
1119                 Flavor::Sync(ref p) => {
1120                     match p.try_recv() {
1121                         Ok(t) => return Ok(t),
1122                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1123                         Err(sync::Disconnected) => {
1124                             return Err(TryRecvError::Disconnected)
1125                         }
1126                     }
1127                 }
1128             };
1129             unsafe {
1130                 mem::swap(self.inner_mut(),
1131                           new_port.inner_mut());
1132             }
1133         }
1134     }
1135
1136     /// Attempts to wait for a value on this receiver, returning an error if the
1137     /// corresponding channel has hung up.
1138     ///
1139     /// This function will always block the current thread if there is no data
1140     /// available and it's possible for more data to be sent. Once a message is
1141     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1142     /// receiver will wake up and return that message.
1143     ///
1144     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1145     /// this call is blocking, this call will wake up and return [`Err`] to
1146     /// indicate that no more messages can ever be received on this channel.
1147     /// However, since channels are buffered, messages sent before the disconnect
1148     /// will still be properly received.
1149     ///
1150     /// [`Sender`]: struct.Sender.html
1151     /// [`SyncSender`]: struct.SyncSender.html
1152     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1153     ///
1154     /// # Examples
1155     ///
1156     /// ```
1157     /// use std::sync::mpsc;
1158     /// use std::thread;
1159     ///
1160     /// let (send, recv) = mpsc::channel();
1161     /// let handle = thread::spawn(move || {
1162     ///     send.send(1u8).unwrap();
1163     /// });
1164     ///
1165     /// handle.join().unwrap();
1166     ///
1167     /// assert_eq!(Ok(1), recv.recv());
1168     /// ```
1169     ///
1170     /// Buffering behavior:
1171     ///
1172     /// ```
1173     /// use std::sync::mpsc;
1174     /// use std::thread;
1175     /// use std::sync::mpsc::RecvError;
1176     ///
1177     /// let (send, recv) = mpsc::channel();
1178     /// let handle = thread::spawn(move || {
1179     ///     send.send(1u8).unwrap();
1180     ///     send.send(2).unwrap();
1181     ///     send.send(3).unwrap();
1182     ///     drop(send);
1183     /// });
1184     ///
1185     /// // wait for the thread to join so we ensure the sender is dropped
1186     /// handle.join().unwrap();
1187     ///
1188     /// assert_eq!(Ok(1), recv.recv());
1189     /// assert_eq!(Ok(2), recv.recv());
1190     /// assert_eq!(Ok(3), recv.recv());
1191     /// assert_eq!(Err(RecvError), recv.recv());
1192     /// ```
1193     #[stable(feature = "rust1", since = "1.0.0")]
1194     pub fn recv(&self) -> Result<T, RecvError> {
1195         loop {
1196             let new_port = match *unsafe { self.inner() } {
1197                 Flavor::Oneshot(ref p) => {
1198                     match p.recv(None) {
1199                         Ok(t) => return Ok(t),
1200                         Err(oneshot::Disconnected) => return Err(RecvError),
1201                         Err(oneshot::Upgraded(rx)) => rx,
1202                         Err(oneshot::Empty) => unreachable!(),
1203                     }
1204                 }
1205                 Flavor::Stream(ref p) => {
1206                     match p.recv(None) {
1207                         Ok(t) => return Ok(t),
1208                         Err(stream::Disconnected) => return Err(RecvError),
1209                         Err(stream::Upgraded(rx)) => rx,
1210                         Err(stream::Empty) => unreachable!(),
1211                     }
1212                 }
1213                 Flavor::Shared(ref p) => {
1214                     match p.recv(None) {
1215                         Ok(t) => return Ok(t),
1216                         Err(shared::Disconnected) => return Err(RecvError),
1217                         Err(shared::Empty) => unreachable!(),
1218                     }
1219                 }
1220                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1221             };
1222             unsafe {
1223                 mem::swap(self.inner_mut(), new_port.inner_mut());
1224             }
1225         }
1226     }
1227
1228     /// Attempts to wait for a value on this receiver, returning an error if the
1229     /// corresponding channel has hung up, or if it waits more than `timeout`.
1230     ///
1231     /// This function will always block the current thread if there is no data
1232     /// available and it's possible for more data to be sent. Once a message is
1233     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1234     /// receiver will wake up and return that message.
1235     ///
1236     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1237     /// this call is blocking, this call will wake up and return [`Err`] to
1238     /// indicate that no more messages can ever be received on this channel.
1239     /// However, since channels are buffered, messages sent before the disconnect
1240     /// will still be properly received.
1241     ///
1242     /// [`Sender`]: struct.Sender.html
1243     /// [`SyncSender`]: struct.SyncSender.html
1244     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1245     ///
1246     /// # Known Issues
1247     ///
1248     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1249     /// to panic unexpectedly with the following example:
1250     ///
1251     /// ```no_run
1252     /// use std::sync::mpsc::channel;
1253     /// use std::thread;
1254     /// use std::time::Duration;
1255     ///
1256     /// let (tx, rx) = channel::<String>();
1257     ///
1258     /// thread::spawn(move || {
1259     ///     let d = Duration::from_millis(10);
1260     ///     loop {
1261     ///         println!("recv");
1262     ///         let _r = rx.recv_timeout(d);
1263     ///     }
1264     /// });
1265     ///
1266     /// thread::sleep(Duration::from_millis(100));
1267     /// let _c1 = tx.clone();
1268     ///
1269     /// thread::sleep(Duration::from_secs(1));
1270     /// ```
1271     ///
1272     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1273     ///
1274     /// # Examples
1275     ///
1276     /// Successfully receiving value before encountering timeout:
1277     ///
1278     /// ```no_run
1279     /// use std::thread;
1280     /// use std::time::Duration;
1281     /// use std::sync::mpsc;
1282     ///
1283     /// let (send, recv) = mpsc::channel();
1284     ///
1285     /// thread::spawn(move || {
1286     ///     send.send('a').unwrap();
1287     /// });
1288     ///
1289     /// assert_eq!(
1290     ///     recv.recv_timeout(Duration::from_millis(400)),
1291     ///     Ok('a')
1292     /// );
1293     /// ```
1294     ///
1295     /// Receiving an error upon reaching timeout:
1296     ///
1297     /// ```no_run
1298     /// use std::thread;
1299     /// use std::time::Duration;
1300     /// use std::sync::mpsc;
1301     ///
1302     /// let (send, recv) = mpsc::channel();
1303     ///
1304     /// thread::spawn(move || {
1305     ///     thread::sleep(Duration::from_millis(800));
1306     ///     send.send('a').unwrap();
1307     /// });
1308     ///
1309     /// assert_eq!(
1310     ///     recv.recv_timeout(Duration::from_millis(400)),
1311     ///     Err(mpsc::RecvTimeoutError::Timeout)
1312     /// );
1313     /// ```
1314     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1315     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1316         // Do an optimistic try_recv to avoid the performance impact of
1317         // Instant::now() in the full-channel case.
1318         match self.try_recv() {
1319             Ok(result) => Ok(result),
1320             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1321             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1322                 Some(deadline) => self.recv_deadline(deadline),
1323                 // So far in the future that it's practically the same as waiting indefinitely.
1324                 None => self.recv().map_err(RecvTimeoutError::from),
1325             },
1326         }
1327     }
1328
1329     /// Attempts to wait for a value on this receiver, returning an error if the
1330     /// corresponding channel has hung up, or if `deadline` is reached.
1331     ///
1332     /// This function will always block the current thread if there is no data
1333     /// available and it's possible for more data to be sent. Once a message is
1334     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1335     /// receiver will wake up and return that message.
1336     ///
1337     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1338     /// this call is blocking, this call will wake up and return [`Err`] to
1339     /// indicate that no more messages can ever be received on this channel.
1340     /// However, since channels are buffered, messages sent before the disconnect
1341     /// will still be properly received.
1342     ///
1343     /// [`Sender`]: struct.Sender.html
1344     /// [`SyncSender`]: struct.SyncSender.html
1345     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1346     ///
1347     /// # Examples
1348     ///
1349     /// Successfully receiving value before reaching deadline:
1350     ///
1351     /// ```no_run
1352     /// #![feature(deadline_api)]
1353     /// use std::thread;
1354     /// use std::time::{Duration, Instant};
1355     /// use std::sync::mpsc;
1356     ///
1357     /// let (send, recv) = mpsc::channel();
1358     ///
1359     /// thread::spawn(move || {
1360     ///     send.send('a').unwrap();
1361     /// });
1362     ///
1363     /// assert_eq!(
1364     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1365     ///     Ok('a')
1366     /// );
1367     /// ```
1368     ///
1369     /// Receiving an error upon reaching deadline:
1370     ///
1371     /// ```no_run
1372     /// #![feature(deadline_api)]
1373     /// use std::thread;
1374     /// use std::time::{Duration, Instant};
1375     /// use std::sync::mpsc;
1376     ///
1377     /// let (send, recv) = mpsc::channel();
1378     ///
1379     /// thread::spawn(move || {
1380     ///     thread::sleep(Duration::from_millis(800));
1381     ///     send.send('a').unwrap();
1382     /// });
1383     ///
1384     /// assert_eq!(
1385     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1386     ///     Err(mpsc::RecvTimeoutError::Timeout)
1387     /// );
1388     /// ```
1389     #[unstable(feature = "deadline_api", issue = "46316")]
1390     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1391         use self::RecvTimeoutError::*;
1392
1393         loop {
1394             let port_or_empty = match *unsafe { self.inner() } {
1395                 Flavor::Oneshot(ref p) => {
1396                     match p.recv(Some(deadline)) {
1397                         Ok(t) => return Ok(t),
1398                         Err(oneshot::Disconnected) => return Err(Disconnected),
1399                         Err(oneshot::Upgraded(rx)) => Some(rx),
1400                         Err(oneshot::Empty) => None,
1401                     }
1402                 }
1403                 Flavor::Stream(ref p) => {
1404                     match p.recv(Some(deadline)) {
1405                         Ok(t) => return Ok(t),
1406                         Err(stream::Disconnected) => return Err(Disconnected),
1407                         Err(stream::Upgraded(rx)) => Some(rx),
1408                         Err(stream::Empty) => None,
1409                     }
1410                 }
1411                 Flavor::Shared(ref p) => {
1412                     match p.recv(Some(deadline)) {
1413                         Ok(t) => return Ok(t),
1414                         Err(shared::Disconnected) => return Err(Disconnected),
1415                         Err(shared::Empty) => None,
1416                     }
1417                 }
1418                 Flavor::Sync(ref p) => {
1419                     match p.recv(Some(deadline)) {
1420                         Ok(t) => return Ok(t),
1421                         Err(sync::Disconnected) => return Err(Disconnected),
1422                         Err(sync::Empty) => None,
1423                     }
1424                 }
1425             };
1426
1427             if let Some(new_port) = port_or_empty {
1428                 unsafe {
1429                     mem::swap(self.inner_mut(), new_port.inner_mut());
1430                 }
1431             }
1432
1433             // If we're already passed the deadline, and we're here without
1434             // data, return a timeout, else try again.
1435             if Instant::now() >= deadline {
1436                 return Err(Timeout);
1437             }
1438         }
1439     }
1440
1441     /// Returns an iterator that will block waiting for messages, but never
1442     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1443     ///
1444     /// [`panic!`]: ../../../std/macro.panic.html
1445     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1446     ///
1447     /// # Examples
1448     ///
1449     /// ```rust
1450     /// use std::sync::mpsc::channel;
1451     /// use std::thread;
1452     ///
1453     /// let (send, recv) = channel();
1454     ///
1455     /// thread::spawn(move || {
1456     ///     send.send(1).unwrap();
1457     ///     send.send(2).unwrap();
1458     ///     send.send(3).unwrap();
1459     /// });
1460     ///
1461     /// let mut iter = recv.iter();
1462     /// assert_eq!(iter.next(), Some(1));
1463     /// assert_eq!(iter.next(), Some(2));
1464     /// assert_eq!(iter.next(), Some(3));
1465     /// assert_eq!(iter.next(), None);
1466     /// ```
1467     #[stable(feature = "rust1", since = "1.0.0")]
1468     pub fn iter(&self) -> Iter<'_, T> {
1469         Iter { rx: self }
1470     }
1471
1472     /// Returns an iterator that will attempt to yield all pending values.
1473     /// It will return `None` if there are no more pending values or if the
1474     /// channel has hung up. The iterator will never [`panic!`] or block the
1475     /// user by waiting for values.
1476     ///
1477     /// [`panic!`]: ../../../std/macro.panic.html
1478     ///
1479     /// # Examples
1480     ///
1481     /// ```no_run
1482     /// use std::sync::mpsc::channel;
1483     /// use std::thread;
1484     /// use std::time::Duration;
1485     ///
1486     /// let (sender, receiver) = channel();
1487     ///
1488     /// // nothing is in the buffer yet
1489     /// assert!(receiver.try_iter().next().is_none());
1490     ///
1491     /// thread::spawn(move || {
1492     ///     thread::sleep(Duration::from_secs(1));
1493     ///     sender.send(1).unwrap();
1494     ///     sender.send(2).unwrap();
1495     ///     sender.send(3).unwrap();
1496     /// });
1497     ///
1498     /// // nothing is in the buffer yet
1499     /// assert!(receiver.try_iter().next().is_none());
1500     ///
1501     /// // block for two seconds
1502     /// thread::sleep(Duration::from_secs(2));
1503     ///
1504     /// let mut iter = receiver.try_iter();
1505     /// assert_eq!(iter.next(), Some(1));
1506     /// assert_eq!(iter.next(), Some(2));
1507     /// assert_eq!(iter.next(), Some(3));
1508     /// assert_eq!(iter.next(), None);
1509     /// ```
1510     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1511     pub fn try_iter(&self) -> TryIter<'_, T> {
1512         TryIter { rx: self }
1513     }
1514
1515 }
1516
1517 impl<T> select::Packet for Receiver<T> {
1518     fn can_recv(&self) -> bool {
1519         loop {
1520             let new_port = match *unsafe { self.inner() } {
1521                 Flavor::Oneshot(ref p) => {
1522                     match p.can_recv() {
1523                         Ok(ret) => return ret,
1524                         Err(upgrade) => upgrade,
1525                     }
1526                 }
1527                 Flavor::Stream(ref p) => {
1528                     match p.can_recv() {
1529                         Ok(ret) => return ret,
1530                         Err(upgrade) => upgrade,
1531                     }
1532                 }
1533                 Flavor::Shared(ref p) => return p.can_recv(),
1534                 Flavor::Sync(ref p) => return p.can_recv(),
1535             };
1536             unsafe {
1537                 mem::swap(self.inner_mut(),
1538                           new_port.inner_mut());
1539             }
1540         }
1541     }
1542
1543     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1544         loop {
1545             let (t, new_port) = match *unsafe { self.inner() } {
1546                 Flavor::Oneshot(ref p) => {
1547                     match p.start_selection(token) {
1548                         oneshot::SelSuccess => return Installed,
1549                         oneshot::SelCanceled => return Abort,
1550                         oneshot::SelUpgraded(t, rx) => (t, rx),
1551                     }
1552                 }
1553                 Flavor::Stream(ref p) => {
1554                     match p.start_selection(token) {
1555                         stream::SelSuccess => return Installed,
1556                         stream::SelCanceled => return Abort,
1557                         stream::SelUpgraded(t, rx) => (t, rx),
1558                     }
1559                 }
1560                 Flavor::Shared(ref p) => return p.start_selection(token),
1561                 Flavor::Sync(ref p) => return p.start_selection(token),
1562             };
1563             token = t;
1564             unsafe {
1565                 mem::swap(self.inner_mut(), new_port.inner_mut());
1566             }
1567         }
1568     }
1569
1570     fn abort_selection(&self) -> bool {
1571         let mut was_upgrade = false;
1572         loop {
1573             let result = match *unsafe { self.inner() } {
1574                 Flavor::Oneshot(ref p) => p.abort_selection(),
1575                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1576                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1577                 Flavor::Sync(ref p) => return p.abort_selection(),
1578             };
1579             let new_port = match result { Ok(b) => return b, Err(p) => p };
1580             was_upgrade = true;
1581             unsafe {
1582                 mem::swap(self.inner_mut(),
1583                           new_port.inner_mut());
1584             }
1585         }
1586     }
1587 }
1588
1589 #[stable(feature = "rust1", since = "1.0.0")]
1590 impl<'a, T> Iterator for Iter<'a, T> {
1591     type Item = T;
1592
1593     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1594 }
1595
1596 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1597 impl<'a, T> Iterator for TryIter<'a, T> {
1598     type Item = T;
1599
1600     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1601 }
1602
1603 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1604 impl<'a, T> IntoIterator for &'a Receiver<T> {
1605     type Item = T;
1606     type IntoIter = Iter<'a, T>;
1607
1608     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1609 }
1610
1611 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1612 impl<T> Iterator for IntoIter<T> {
1613     type Item = T;
1614     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1615 }
1616
1617 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1618 impl <T> IntoIterator for Receiver<T> {
1619     type Item = T;
1620     type IntoIter = IntoIter<T>;
1621
1622     fn into_iter(self) -> IntoIter<T> {
1623         IntoIter { rx: self }
1624     }
1625 }
1626
1627 #[stable(feature = "rust1", since = "1.0.0")]
1628 impl<T> Drop for Receiver<T> {
1629     fn drop(&mut self) {
1630         match *unsafe { self.inner() } {
1631             Flavor::Oneshot(ref p) => p.drop_port(),
1632             Flavor::Stream(ref p) => p.drop_port(),
1633             Flavor::Shared(ref p) => p.drop_port(),
1634             Flavor::Sync(ref p) => p.drop_port(),
1635         }
1636     }
1637 }
1638
1639 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1640 impl<T> fmt::Debug for Receiver<T> {
1641     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1642         f.debug_struct("Receiver").finish()
1643     }
1644 }
1645
1646 #[stable(feature = "rust1", since = "1.0.0")]
1647 impl<T> fmt::Debug for SendError<T> {
1648     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1649         "SendError(..)".fmt(f)
1650     }
1651 }
1652
1653 #[stable(feature = "rust1", since = "1.0.0")]
1654 impl<T> fmt::Display for SendError<T> {
1655     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1656         "sending on a closed channel".fmt(f)
1657     }
1658 }
1659
1660 #[stable(feature = "rust1", since = "1.0.0")]
1661 impl<T: Send> error::Error for SendError<T> {
1662     fn description(&self) -> &str {
1663         "sending on a closed channel"
1664     }
1665
1666     fn cause(&self) -> Option<&dyn error::Error> {
1667         None
1668     }
1669 }
1670
1671 #[stable(feature = "rust1", since = "1.0.0")]
1672 impl<T> fmt::Debug for TrySendError<T> {
1673     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1674         match *self {
1675             TrySendError::Full(..) => "Full(..)".fmt(f),
1676             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1677         }
1678     }
1679 }
1680
1681 #[stable(feature = "rust1", since = "1.0.0")]
1682 impl<T> fmt::Display for TrySendError<T> {
1683     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1684         match *self {
1685             TrySendError::Full(..) => {
1686                 "sending on a full channel".fmt(f)
1687             }
1688             TrySendError::Disconnected(..) => {
1689                 "sending on a closed channel".fmt(f)
1690             }
1691         }
1692     }
1693 }
1694
1695 #[stable(feature = "rust1", since = "1.0.0")]
1696 impl<T: Send> error::Error for TrySendError<T> {
1697
1698     fn description(&self) -> &str {
1699         match *self {
1700             TrySendError::Full(..) => {
1701                 "sending on a full channel"
1702             }
1703             TrySendError::Disconnected(..) => {
1704                 "sending on a closed channel"
1705             }
1706         }
1707     }
1708
1709     fn cause(&self) -> Option<&dyn error::Error> {
1710         None
1711     }
1712 }
1713
1714 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1715 impl<T> From<SendError<T>> for TrySendError<T> {
1716     fn from(err: SendError<T>) -> TrySendError<T> {
1717         match err {
1718             SendError(t) => TrySendError::Disconnected(t),
1719         }
1720     }
1721 }
1722
1723 #[stable(feature = "rust1", since = "1.0.0")]
1724 impl fmt::Display for RecvError {
1725     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1726         "receiving on a closed channel".fmt(f)
1727     }
1728 }
1729
1730 #[stable(feature = "rust1", since = "1.0.0")]
1731 impl error::Error for RecvError {
1732
1733     fn description(&self) -> &str {
1734         "receiving on a closed channel"
1735     }
1736
1737     fn cause(&self) -> Option<&dyn error::Error> {
1738         None
1739     }
1740 }
1741
1742 #[stable(feature = "rust1", since = "1.0.0")]
1743 impl fmt::Display for TryRecvError {
1744     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1745         match *self {
1746             TryRecvError::Empty => {
1747                 "receiving on an empty channel".fmt(f)
1748             }
1749             TryRecvError::Disconnected => {
1750                 "receiving on a closed channel".fmt(f)
1751             }
1752         }
1753     }
1754 }
1755
1756 #[stable(feature = "rust1", since = "1.0.0")]
1757 impl error::Error for TryRecvError {
1758
1759     fn description(&self) -> &str {
1760         match *self {
1761             TryRecvError::Empty => {
1762                 "receiving on an empty channel"
1763             }
1764             TryRecvError::Disconnected => {
1765                 "receiving on a closed channel"
1766             }
1767         }
1768     }
1769
1770     fn cause(&self) -> Option<&dyn error::Error> {
1771         None
1772     }
1773 }
1774
1775 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1776 impl From<RecvError> for TryRecvError {
1777     fn from(err: RecvError) -> TryRecvError {
1778         match err {
1779             RecvError => TryRecvError::Disconnected,
1780         }
1781     }
1782 }
1783
1784 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1785 impl fmt::Display for RecvTimeoutError {
1786     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1787         match *self {
1788             RecvTimeoutError::Timeout => {
1789                 "timed out waiting on channel".fmt(f)
1790             }
1791             RecvTimeoutError::Disconnected => {
1792                 "channel is empty and sending half is closed".fmt(f)
1793             }
1794         }
1795     }
1796 }
1797
1798 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1799 impl error::Error for RecvTimeoutError {
1800     fn description(&self) -> &str {
1801         match *self {
1802             RecvTimeoutError::Timeout => {
1803                 "timed out waiting on channel"
1804             }
1805             RecvTimeoutError::Disconnected => {
1806                 "channel is empty and sending half is closed"
1807             }
1808         }
1809     }
1810
1811     fn cause(&self) -> Option<&dyn error::Error> {
1812         None
1813     }
1814 }
1815
1816 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1817 impl From<RecvError> for RecvTimeoutError {
1818     fn from(err: RecvError) -> RecvTimeoutError {
1819         match err {
1820             RecvError => RecvTimeoutError::Disconnected,
1821         }
1822     }
1823 }
1824
1825 #[cfg(all(test, not(target_os = "emscripten")))]
1826 mod tests {
1827     use super::*;
1828     use crate::env;
1829     use crate::thread;
1830     use crate::time::{Duration, Instant};
1831
1832     pub fn stress_factor() -> usize {
1833         match env::var("RUST_TEST_STRESS") {
1834             Ok(val) => val.parse().unwrap(),
1835             Err(..) => 1,
1836         }
1837     }
1838
1839     #[test]
1840     fn smoke() {
1841         let (tx, rx) = channel::<i32>();
1842         tx.send(1).unwrap();
1843         assert_eq!(rx.recv().unwrap(), 1);
1844     }
1845
1846     #[test]
1847     fn drop_full() {
1848         let (tx, _rx) = channel::<Box<isize>>();
1849         tx.send(box 1).unwrap();
1850     }
1851
1852     #[test]
1853     fn drop_full_shared() {
1854         let (tx, _rx) = channel::<Box<isize>>();
1855         drop(tx.clone());
1856         drop(tx.clone());
1857         tx.send(box 1).unwrap();
1858     }
1859
1860     #[test]
1861     fn smoke_shared() {
1862         let (tx, rx) = channel::<i32>();
1863         tx.send(1).unwrap();
1864         assert_eq!(rx.recv().unwrap(), 1);
1865         let tx = tx.clone();
1866         tx.send(1).unwrap();
1867         assert_eq!(rx.recv().unwrap(), 1);
1868     }
1869
1870     #[test]
1871     fn smoke_threads() {
1872         let (tx, rx) = channel::<i32>();
1873         let _t = thread::spawn(move|| {
1874             tx.send(1).unwrap();
1875         });
1876         assert_eq!(rx.recv().unwrap(), 1);
1877     }
1878
1879     #[test]
1880     fn smoke_port_gone() {
1881         let (tx, rx) = channel::<i32>();
1882         drop(rx);
1883         assert!(tx.send(1).is_err());
1884     }
1885
1886     #[test]
1887     fn smoke_shared_port_gone() {
1888         let (tx, rx) = channel::<i32>();
1889         drop(rx);
1890         assert!(tx.send(1).is_err())
1891     }
1892
1893     #[test]
1894     fn smoke_shared_port_gone2() {
1895         let (tx, rx) = channel::<i32>();
1896         drop(rx);
1897         let tx2 = tx.clone();
1898         drop(tx);
1899         assert!(tx2.send(1).is_err());
1900     }
1901
1902     #[test]
1903     fn port_gone_concurrent() {
1904         let (tx, rx) = channel::<i32>();
1905         let _t = thread::spawn(move|| {
1906             rx.recv().unwrap();
1907         });
1908         while tx.send(1).is_ok() {}
1909     }
1910
1911     #[test]
1912     fn port_gone_concurrent_shared() {
1913         let (tx, rx) = channel::<i32>();
1914         let tx2 = tx.clone();
1915         let _t = thread::spawn(move|| {
1916             rx.recv().unwrap();
1917         });
1918         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1919     }
1920
1921     #[test]
1922     fn smoke_chan_gone() {
1923         let (tx, rx) = channel::<i32>();
1924         drop(tx);
1925         assert!(rx.recv().is_err());
1926     }
1927
1928     #[test]
1929     fn smoke_chan_gone_shared() {
1930         let (tx, rx) = channel::<()>();
1931         let tx2 = tx.clone();
1932         drop(tx);
1933         drop(tx2);
1934         assert!(rx.recv().is_err());
1935     }
1936
1937     #[test]
1938     fn chan_gone_concurrent() {
1939         let (tx, rx) = channel::<i32>();
1940         let _t = thread::spawn(move|| {
1941             tx.send(1).unwrap();
1942             tx.send(1).unwrap();
1943         });
1944         while rx.recv().is_ok() {}
1945     }
1946
1947     #[test]
1948     fn stress() {
1949         let (tx, rx) = channel::<i32>();
1950         let t = thread::spawn(move|| {
1951             for _ in 0..10000 { tx.send(1).unwrap(); }
1952         });
1953         for _ in 0..10000 {
1954             assert_eq!(rx.recv().unwrap(), 1);
1955         }
1956         t.join().ok().expect("thread panicked");
1957     }
1958
1959     #[test]
1960     fn stress_shared() {
1961         const AMT: u32 = 10000;
1962         const NTHREADS: u32 = 8;
1963         let (tx, rx) = channel::<i32>();
1964
1965         let t = thread::spawn(move|| {
1966             for _ in 0..AMT * NTHREADS {
1967                 assert_eq!(rx.recv().unwrap(), 1);
1968             }
1969             match rx.try_recv() {
1970                 Ok(..) => panic!(),
1971                 _ => {}
1972             }
1973         });
1974
1975         for _ in 0..NTHREADS {
1976             let tx = tx.clone();
1977             thread::spawn(move|| {
1978                 for _ in 0..AMT { tx.send(1).unwrap(); }
1979             });
1980         }
1981         drop(tx);
1982         t.join().ok().expect("thread panicked");
1983     }
1984
1985     #[test]
1986     fn send_from_outside_runtime() {
1987         let (tx1, rx1) = channel::<()>();
1988         let (tx2, rx2) = channel::<i32>();
1989         let t1 = thread::spawn(move|| {
1990             tx1.send(()).unwrap();
1991             for _ in 0..40 {
1992                 assert_eq!(rx2.recv().unwrap(), 1);
1993             }
1994         });
1995         rx1.recv().unwrap();
1996         let t2 = thread::spawn(move|| {
1997             for _ in 0..40 {
1998                 tx2.send(1).unwrap();
1999             }
2000         });
2001         t1.join().ok().expect("thread panicked");
2002         t2.join().ok().expect("thread panicked");
2003     }
2004
2005     #[test]
2006     fn recv_from_outside_runtime() {
2007         let (tx, rx) = channel::<i32>();
2008         let t = thread::spawn(move|| {
2009             for _ in 0..40 {
2010                 assert_eq!(rx.recv().unwrap(), 1);
2011             }
2012         });
2013         for _ in 0..40 {
2014             tx.send(1).unwrap();
2015         }
2016         t.join().ok().expect("thread panicked");
2017     }
2018
2019     #[test]
2020     fn no_runtime() {
2021         let (tx1, rx1) = channel::<i32>();
2022         let (tx2, rx2) = channel::<i32>();
2023         let t1 = thread::spawn(move|| {
2024             assert_eq!(rx1.recv().unwrap(), 1);
2025             tx2.send(2).unwrap();
2026         });
2027         let t2 = thread::spawn(move|| {
2028             tx1.send(1).unwrap();
2029             assert_eq!(rx2.recv().unwrap(), 2);
2030         });
2031         t1.join().ok().expect("thread panicked");
2032         t2.join().ok().expect("thread panicked");
2033     }
2034
2035     #[test]
2036     fn oneshot_single_thread_close_port_first() {
2037         // Simple test of closing without sending
2038         let (_tx, rx) = channel::<i32>();
2039         drop(rx);
2040     }
2041
2042     #[test]
2043     fn oneshot_single_thread_close_chan_first() {
2044         // Simple test of closing without sending
2045         let (tx, _rx) = channel::<i32>();
2046         drop(tx);
2047     }
2048
2049     #[test]
2050     fn oneshot_single_thread_send_port_close() {
2051         // Testing that the sender cleans up the payload if receiver is closed
2052         let (tx, rx) = channel::<Box<i32>>();
2053         drop(rx);
2054         assert!(tx.send(box 0).is_err());
2055     }
2056
2057     #[test]
2058     fn oneshot_single_thread_recv_chan_close() {
2059         // Receiving on a closed chan will panic
2060         let res = thread::spawn(move|| {
2061             let (tx, rx) = channel::<i32>();
2062             drop(tx);
2063             rx.recv().unwrap();
2064         }).join();
2065         // What is our res?
2066         assert!(res.is_err());
2067     }
2068
2069     #[test]
2070     fn oneshot_single_thread_send_then_recv() {
2071         let (tx, rx) = channel::<Box<i32>>();
2072         tx.send(box 10).unwrap();
2073         assert!(*rx.recv().unwrap() == 10);
2074     }
2075
2076     #[test]
2077     fn oneshot_single_thread_try_send_open() {
2078         let (tx, rx) = channel::<i32>();
2079         assert!(tx.send(10).is_ok());
2080         assert!(rx.recv().unwrap() == 10);
2081     }
2082
2083     #[test]
2084     fn oneshot_single_thread_try_send_closed() {
2085         let (tx, rx) = channel::<i32>();
2086         drop(rx);
2087         assert!(tx.send(10).is_err());
2088     }
2089
2090     #[test]
2091     fn oneshot_single_thread_try_recv_open() {
2092         let (tx, rx) = channel::<i32>();
2093         tx.send(10).unwrap();
2094         assert!(rx.recv() == Ok(10));
2095     }
2096
2097     #[test]
2098     fn oneshot_single_thread_try_recv_closed() {
2099         let (tx, rx) = channel::<i32>();
2100         drop(tx);
2101         assert!(rx.recv().is_err());
2102     }
2103
2104     #[test]
2105     fn oneshot_single_thread_peek_data() {
2106         let (tx, rx) = channel::<i32>();
2107         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2108         tx.send(10).unwrap();
2109         assert_eq!(rx.try_recv(), Ok(10));
2110     }
2111
2112     #[test]
2113     fn oneshot_single_thread_peek_close() {
2114         let (tx, rx) = channel::<i32>();
2115         drop(tx);
2116         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2117         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2118     }
2119
2120     #[test]
2121     fn oneshot_single_thread_peek_open() {
2122         let (_tx, rx) = channel::<i32>();
2123         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2124     }
2125
2126     #[test]
2127     fn oneshot_multi_task_recv_then_send() {
2128         let (tx, rx) = channel::<Box<i32>>();
2129         let _t = thread::spawn(move|| {
2130             assert!(*rx.recv().unwrap() == 10);
2131         });
2132
2133         tx.send(box 10).unwrap();
2134     }
2135
2136     #[test]
2137     fn oneshot_multi_task_recv_then_close() {
2138         let (tx, rx) = channel::<Box<i32>>();
2139         let _t = thread::spawn(move|| {
2140             drop(tx);
2141         });
2142         let res = thread::spawn(move|| {
2143             assert!(*rx.recv().unwrap() == 10);
2144         }).join();
2145         assert!(res.is_err());
2146     }
2147
2148     #[test]
2149     fn oneshot_multi_thread_close_stress() {
2150         for _ in 0..stress_factor() {
2151             let (tx, rx) = channel::<i32>();
2152             let _t = thread::spawn(move|| {
2153                 drop(rx);
2154             });
2155             drop(tx);
2156         }
2157     }
2158
2159     #[test]
2160     fn oneshot_multi_thread_send_close_stress() {
2161         for _ in 0..stress_factor() {
2162             let (tx, rx) = channel::<i32>();
2163             let _t = thread::spawn(move|| {
2164                 drop(rx);
2165             });
2166             let _ = thread::spawn(move|| {
2167                 tx.send(1).unwrap();
2168             }).join();
2169         }
2170     }
2171
2172     #[test]
2173     fn oneshot_multi_thread_recv_close_stress() {
2174         for _ in 0..stress_factor() {
2175             let (tx, rx) = channel::<i32>();
2176             thread::spawn(move|| {
2177                 let res = thread::spawn(move|| {
2178                     rx.recv().unwrap();
2179                 }).join();
2180                 assert!(res.is_err());
2181             });
2182             let _t = thread::spawn(move|| {
2183                 thread::spawn(move|| {
2184                     drop(tx);
2185                 });
2186             });
2187         }
2188     }
2189
2190     #[test]
2191     fn oneshot_multi_thread_send_recv_stress() {
2192         for _ in 0..stress_factor() {
2193             let (tx, rx) = channel::<Box<isize>>();
2194             let _t = thread::spawn(move|| {
2195                 tx.send(box 10).unwrap();
2196             });
2197             assert!(*rx.recv().unwrap() == 10);
2198         }
2199     }
2200
2201     #[test]
2202     fn stream_send_recv_stress() {
2203         for _ in 0..stress_factor() {
2204             let (tx, rx) = channel();
2205
2206             send(tx, 0);
2207             recv(rx, 0);
2208
2209             fn send(tx: Sender<Box<i32>>, i: i32) {
2210                 if i == 10 { return }
2211
2212                 thread::spawn(move|| {
2213                     tx.send(box i).unwrap();
2214                     send(tx, i + 1);
2215                 });
2216             }
2217
2218             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2219                 if i == 10 { return }
2220
2221                 thread::spawn(move|| {
2222                     assert!(*rx.recv().unwrap() == i);
2223                     recv(rx, i + 1);
2224                 });
2225             }
2226         }
2227     }
2228
2229     #[test]
2230     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2231     fn oneshot_single_thread_recv_timeout() {
2232         let (tx, rx) = channel();
2233         tx.send(()).unwrap();
2234         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2235         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2236         tx.send(()).unwrap();
2237         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2238     }
2239
2240     #[test]
2241     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2242     fn stress_recv_timeout_two_threads() {
2243         let (tx, rx) = channel();
2244         let stress = stress_factor() + 100;
2245         let timeout = Duration::from_millis(100);
2246
2247         thread::spawn(move || {
2248             for i in 0..stress {
2249                 if i % 2 == 0 {
2250                     thread::sleep(timeout * 2);
2251                 }
2252                 tx.send(1usize).unwrap();
2253             }
2254         });
2255
2256         let mut recv_count = 0;
2257         loop {
2258             match rx.recv_timeout(timeout) {
2259                 Ok(n) => {
2260                     assert_eq!(n, 1usize);
2261                     recv_count += 1;
2262                 }
2263                 Err(RecvTimeoutError::Timeout) => continue,
2264                 Err(RecvTimeoutError::Disconnected) => break,
2265             }
2266         }
2267
2268         assert_eq!(recv_count, stress);
2269     }
2270
2271     #[test]
2272     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2273     fn recv_timeout_upgrade() {
2274         let (tx, rx) = channel::<()>();
2275         let timeout = Duration::from_millis(1);
2276         let _tx_clone = tx.clone();
2277
2278         let start = Instant::now();
2279         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2280         assert!(Instant::now() >= start + timeout);
2281     }
2282
2283     #[test]
2284     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2285     fn stress_recv_timeout_shared() {
2286         let (tx, rx) = channel();
2287         let stress = stress_factor() + 100;
2288
2289         for i in 0..stress {
2290             let tx = tx.clone();
2291             thread::spawn(move || {
2292                 thread::sleep(Duration::from_millis(i as u64 * 10));
2293                 tx.send(1usize).unwrap();
2294             });
2295         }
2296
2297         drop(tx);
2298
2299         let mut recv_count = 0;
2300         loop {
2301             match rx.recv_timeout(Duration::from_millis(10)) {
2302                 Ok(n) => {
2303                     assert_eq!(n, 1usize);
2304                     recv_count += 1;
2305                 }
2306                 Err(RecvTimeoutError::Timeout) => continue,
2307                 Err(RecvTimeoutError::Disconnected) => break,
2308             }
2309         }
2310
2311         assert_eq!(recv_count, stress);
2312     }
2313
2314     #[test]
2315     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2316     fn very_long_recv_timeout_wont_panic() {
2317         let (tx, rx) = channel::<()>();
2318         let join_handle = thread::spawn(move || {
2319             rx.recv_timeout(Duration::from_secs(u64::max_value()))
2320         });
2321         thread::sleep(Duration::from_secs(1));
2322         assert!(tx.send(()).is_ok());
2323         assert_eq!(join_handle.join().unwrap(), Ok(()));
2324     }
2325
2326     #[test]
2327     fn recv_a_lot() {
2328         // Regression test that we don't run out of stack in scheduler context
2329         let (tx, rx) = channel();
2330         for _ in 0..10000 { tx.send(()).unwrap(); }
2331         for _ in 0..10000 { rx.recv().unwrap(); }
2332     }
2333
2334     #[test]
2335     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2336     fn shared_recv_timeout() {
2337         let (tx, rx) = channel();
2338         let total = 5;
2339         for _ in 0..total {
2340             let tx = tx.clone();
2341             thread::spawn(move|| {
2342                 tx.send(()).unwrap();
2343             });
2344         }
2345
2346         for _ in 0..total { rx.recv().unwrap(); }
2347
2348         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2349         tx.send(()).unwrap();
2350         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2351     }
2352
2353     #[test]
2354     fn shared_chan_stress() {
2355         let (tx, rx) = channel();
2356         let total = stress_factor() + 100;
2357         for _ in 0..total {
2358             let tx = tx.clone();
2359             thread::spawn(move|| {
2360                 tx.send(()).unwrap();
2361             });
2362         }
2363
2364         for _ in 0..total {
2365             rx.recv().unwrap();
2366         }
2367     }
2368
2369     #[test]
2370     fn test_nested_recv_iter() {
2371         let (tx, rx) = channel::<i32>();
2372         let (total_tx, total_rx) = channel::<i32>();
2373
2374         let _t = thread::spawn(move|| {
2375             let mut acc = 0;
2376             for x in rx.iter() {
2377                 acc += x;
2378             }
2379             total_tx.send(acc).unwrap();
2380         });
2381
2382         tx.send(3).unwrap();
2383         tx.send(1).unwrap();
2384         tx.send(2).unwrap();
2385         drop(tx);
2386         assert_eq!(total_rx.recv().unwrap(), 6);
2387     }
2388
2389     #[test]
2390     fn test_recv_iter_break() {
2391         let (tx, rx) = channel::<i32>();
2392         let (count_tx, count_rx) = channel();
2393
2394         let _t = thread::spawn(move|| {
2395             let mut count = 0;
2396             for x in rx.iter() {
2397                 if count >= 3 {
2398                     break;
2399                 } else {
2400                     count += x;
2401                 }
2402             }
2403             count_tx.send(count).unwrap();
2404         });
2405
2406         tx.send(2).unwrap();
2407         tx.send(2).unwrap();
2408         tx.send(2).unwrap();
2409         let _ = tx.send(2);
2410         drop(tx);
2411         assert_eq!(count_rx.recv().unwrap(), 4);
2412     }
2413
2414     #[test]
2415     fn test_recv_try_iter() {
2416         let (request_tx, request_rx) = channel();
2417         let (response_tx, response_rx) = channel();
2418
2419         // Request `x`s until we have `6`.
2420         let t = thread::spawn(move|| {
2421             let mut count = 0;
2422             loop {
2423                 for x in response_rx.try_iter() {
2424                     count += x;
2425                     if count == 6 {
2426                         return count;
2427                     }
2428                 }
2429                 request_tx.send(()).unwrap();
2430             }
2431         });
2432
2433         for _ in request_rx.iter() {
2434             if response_tx.send(2).is_err() {
2435                 break;
2436             }
2437         }
2438
2439         assert_eq!(t.join().unwrap(), 6);
2440     }
2441
2442     #[test]
2443     fn test_recv_into_iter_owned() {
2444         let mut iter = {
2445           let (tx, rx) = channel::<i32>();
2446           tx.send(1).unwrap();
2447           tx.send(2).unwrap();
2448
2449           rx.into_iter()
2450         };
2451         assert_eq!(iter.next().unwrap(), 1);
2452         assert_eq!(iter.next().unwrap(), 2);
2453         assert_eq!(iter.next().is_none(), true);
2454     }
2455
2456     #[test]
2457     fn test_recv_into_iter_borrowed() {
2458         let (tx, rx) = channel::<i32>();
2459         tx.send(1).unwrap();
2460         tx.send(2).unwrap();
2461         drop(tx);
2462         let mut iter = (&rx).into_iter();
2463         assert_eq!(iter.next().unwrap(), 1);
2464         assert_eq!(iter.next().unwrap(), 2);
2465         assert_eq!(iter.next().is_none(), true);
2466     }
2467
2468     #[test]
2469     fn try_recv_states() {
2470         let (tx1, rx1) = channel::<i32>();
2471         let (tx2, rx2) = channel::<()>();
2472         let (tx3, rx3) = channel::<()>();
2473         let _t = thread::spawn(move|| {
2474             rx2.recv().unwrap();
2475             tx1.send(1).unwrap();
2476             tx3.send(()).unwrap();
2477             rx2.recv().unwrap();
2478             drop(tx1);
2479             tx3.send(()).unwrap();
2480         });
2481
2482         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2483         tx2.send(()).unwrap();
2484         rx3.recv().unwrap();
2485         assert_eq!(rx1.try_recv(), Ok(1));
2486         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2487         tx2.send(()).unwrap();
2488         rx3.recv().unwrap();
2489         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2490     }
2491
2492     // This bug used to end up in a livelock inside of the Receiver destructor
2493     // because the internal state of the Shared packet was corrupted
2494     #[test]
2495     fn destroy_upgraded_shared_port_when_sender_still_active() {
2496         let (tx, rx) = channel();
2497         let (tx2, rx2) = channel();
2498         let _t = thread::spawn(move|| {
2499             rx.recv().unwrap(); // wait on a oneshot
2500             drop(rx);  // destroy a shared
2501             tx2.send(()).unwrap();
2502         });
2503         // make sure the other thread has gone to sleep
2504         for _ in 0..5000 { thread::yield_now(); }
2505
2506         // upgrade to a shared chan and send a message
2507         let t = tx.clone();
2508         drop(tx);
2509         t.send(()).unwrap();
2510
2511         // wait for the child thread to exit before we exit
2512         rx2.recv().unwrap();
2513     }
2514
2515     #[test]
2516     fn issue_32114() {
2517         let (tx, _) = channel();
2518         let _ = tx.send(123);
2519         assert_eq!(tx.send(123), Err(SendError(123)));
2520     }
2521 }
2522
2523 #[cfg(all(test, not(target_os = "emscripten")))]
2524 mod sync_tests {
2525     use super::*;
2526     use crate::env;
2527     use crate::thread;
2528     use crate::time::Duration;
2529
2530     pub fn stress_factor() -> usize {
2531         match env::var("RUST_TEST_STRESS") {
2532             Ok(val) => val.parse().unwrap(),
2533             Err(..) => 1,
2534         }
2535     }
2536
2537     #[test]
2538     fn smoke() {
2539         let (tx, rx) = sync_channel::<i32>(1);
2540         tx.send(1).unwrap();
2541         assert_eq!(rx.recv().unwrap(), 1);
2542     }
2543
2544     #[test]
2545     fn drop_full() {
2546         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2547         tx.send(box 1).unwrap();
2548     }
2549
2550     #[test]
2551     fn smoke_shared() {
2552         let (tx, rx) = sync_channel::<i32>(1);
2553         tx.send(1).unwrap();
2554         assert_eq!(rx.recv().unwrap(), 1);
2555         let tx = tx.clone();
2556         tx.send(1).unwrap();
2557         assert_eq!(rx.recv().unwrap(), 1);
2558     }
2559
2560     #[test]
2561     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2562     fn recv_timeout() {
2563         let (tx, rx) = sync_channel::<i32>(1);
2564         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2565         tx.send(1).unwrap();
2566         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2567     }
2568
2569     #[test]
2570     fn smoke_threads() {
2571         let (tx, rx) = sync_channel::<i32>(0);
2572         let _t = thread::spawn(move|| {
2573             tx.send(1).unwrap();
2574         });
2575         assert_eq!(rx.recv().unwrap(), 1);
2576     }
2577
2578     #[test]
2579     fn smoke_port_gone() {
2580         let (tx, rx) = sync_channel::<i32>(0);
2581         drop(rx);
2582         assert!(tx.send(1).is_err());
2583     }
2584
2585     #[test]
2586     fn smoke_shared_port_gone2() {
2587         let (tx, rx) = sync_channel::<i32>(0);
2588         drop(rx);
2589         let tx2 = tx.clone();
2590         drop(tx);
2591         assert!(tx2.send(1).is_err());
2592     }
2593
2594     #[test]
2595     fn port_gone_concurrent() {
2596         let (tx, rx) = sync_channel::<i32>(0);
2597         let _t = thread::spawn(move|| {
2598             rx.recv().unwrap();
2599         });
2600         while tx.send(1).is_ok() {}
2601     }
2602
2603     #[test]
2604     fn port_gone_concurrent_shared() {
2605         let (tx, rx) = sync_channel::<i32>(0);
2606         let tx2 = tx.clone();
2607         let _t = thread::spawn(move|| {
2608             rx.recv().unwrap();
2609         });
2610         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2611     }
2612
2613     #[test]
2614     fn smoke_chan_gone() {
2615         let (tx, rx) = sync_channel::<i32>(0);
2616         drop(tx);
2617         assert!(rx.recv().is_err());
2618     }
2619
2620     #[test]
2621     fn smoke_chan_gone_shared() {
2622         let (tx, rx) = sync_channel::<()>(0);
2623         let tx2 = tx.clone();
2624         drop(tx);
2625         drop(tx2);
2626         assert!(rx.recv().is_err());
2627     }
2628
2629     #[test]
2630     fn chan_gone_concurrent() {
2631         let (tx, rx) = sync_channel::<i32>(0);
2632         thread::spawn(move|| {
2633             tx.send(1).unwrap();
2634             tx.send(1).unwrap();
2635         });
2636         while rx.recv().is_ok() {}
2637     }
2638
2639     #[test]
2640     fn stress() {
2641         let (tx, rx) = sync_channel::<i32>(0);
2642         thread::spawn(move|| {
2643             for _ in 0..10000 { tx.send(1).unwrap(); }
2644         });
2645         for _ in 0..10000 {
2646             assert_eq!(rx.recv().unwrap(), 1);
2647         }
2648     }
2649
2650     #[test]
2651     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2652     fn stress_recv_timeout_two_threads() {
2653         let (tx, rx) = sync_channel::<i32>(0);
2654
2655         thread::spawn(move|| {
2656             for _ in 0..10000 { tx.send(1).unwrap(); }
2657         });
2658
2659         let mut recv_count = 0;
2660         loop {
2661             match rx.recv_timeout(Duration::from_millis(1)) {
2662                 Ok(v) => {
2663                     assert_eq!(v, 1);
2664                     recv_count += 1;
2665                 },
2666                 Err(RecvTimeoutError::Timeout) => continue,
2667                 Err(RecvTimeoutError::Disconnected) => break,
2668             }
2669         }
2670
2671         assert_eq!(recv_count, 10000);
2672     }
2673
2674     #[test]
2675     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2676     fn stress_recv_timeout_shared() {
2677         const AMT: u32 = 1000;
2678         const NTHREADS: u32 = 8;
2679         let (tx, rx) = sync_channel::<i32>(0);
2680         let (dtx, drx) = sync_channel::<()>(0);
2681
2682         thread::spawn(move|| {
2683             let mut recv_count = 0;
2684             loop {
2685                 match rx.recv_timeout(Duration::from_millis(10)) {
2686                     Ok(v) => {
2687                         assert_eq!(v, 1);
2688                         recv_count += 1;
2689                     },
2690                     Err(RecvTimeoutError::Timeout) => continue,
2691                     Err(RecvTimeoutError::Disconnected) => break,
2692                 }
2693             }
2694
2695             assert_eq!(recv_count, AMT * NTHREADS);
2696             assert!(rx.try_recv().is_err());
2697
2698             dtx.send(()).unwrap();
2699         });
2700
2701         for _ in 0..NTHREADS {
2702             let tx = tx.clone();
2703             thread::spawn(move|| {
2704                 for _ in 0..AMT { tx.send(1).unwrap(); }
2705             });
2706         }
2707
2708         drop(tx);
2709
2710         drx.recv().unwrap();
2711     }
2712
2713     #[test]
2714     fn stress_shared() {
2715         const AMT: u32 = 1000;
2716         const NTHREADS: u32 = 8;
2717         let (tx, rx) = sync_channel::<i32>(0);
2718         let (dtx, drx) = sync_channel::<()>(0);
2719
2720         thread::spawn(move|| {
2721             for _ in 0..AMT * NTHREADS {
2722                 assert_eq!(rx.recv().unwrap(), 1);
2723             }
2724             match rx.try_recv() {
2725                 Ok(..) => panic!(),
2726                 _ => {}
2727             }
2728             dtx.send(()).unwrap();
2729         });
2730
2731         for _ in 0..NTHREADS {
2732             let tx = tx.clone();
2733             thread::spawn(move|| {
2734                 for _ in 0..AMT { tx.send(1).unwrap(); }
2735             });
2736         }
2737         drop(tx);
2738         drx.recv().unwrap();
2739     }
2740
2741     #[test]
2742     fn oneshot_single_thread_close_port_first() {
2743         // Simple test of closing without sending
2744         let (_tx, rx) = sync_channel::<i32>(0);
2745         drop(rx);
2746     }
2747
2748     #[test]
2749     fn oneshot_single_thread_close_chan_first() {
2750         // Simple test of closing without sending
2751         let (tx, _rx) = sync_channel::<i32>(0);
2752         drop(tx);
2753     }
2754
2755     #[test]
2756     fn oneshot_single_thread_send_port_close() {
2757         // Testing that the sender cleans up the payload if receiver is closed
2758         let (tx, rx) = sync_channel::<Box<i32>>(0);
2759         drop(rx);
2760         assert!(tx.send(box 0).is_err());
2761     }
2762
2763     #[test]
2764     fn oneshot_single_thread_recv_chan_close() {
2765         // Receiving on a closed chan will panic
2766         let res = thread::spawn(move|| {
2767             let (tx, rx) = sync_channel::<i32>(0);
2768             drop(tx);
2769             rx.recv().unwrap();
2770         }).join();
2771         // What is our res?
2772         assert!(res.is_err());
2773     }
2774
2775     #[test]
2776     fn oneshot_single_thread_send_then_recv() {
2777         let (tx, rx) = sync_channel::<Box<i32>>(1);
2778         tx.send(box 10).unwrap();
2779         assert!(*rx.recv().unwrap() == 10);
2780     }
2781
2782     #[test]
2783     fn oneshot_single_thread_try_send_open() {
2784         let (tx, rx) = sync_channel::<i32>(1);
2785         assert_eq!(tx.try_send(10), Ok(()));
2786         assert!(rx.recv().unwrap() == 10);
2787     }
2788
2789     #[test]
2790     fn oneshot_single_thread_try_send_closed() {
2791         let (tx, rx) = sync_channel::<i32>(0);
2792         drop(rx);
2793         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2794     }
2795
2796     #[test]
2797     fn oneshot_single_thread_try_send_closed2() {
2798         let (tx, _rx) = sync_channel::<i32>(0);
2799         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2800     }
2801
2802     #[test]
2803     fn oneshot_single_thread_try_recv_open() {
2804         let (tx, rx) = sync_channel::<i32>(1);
2805         tx.send(10).unwrap();
2806         assert!(rx.recv() == Ok(10));
2807     }
2808
2809     #[test]
2810     fn oneshot_single_thread_try_recv_closed() {
2811         let (tx, rx) = sync_channel::<i32>(0);
2812         drop(tx);
2813         assert!(rx.recv().is_err());
2814     }
2815
2816     #[test]
2817     fn oneshot_single_thread_try_recv_closed_with_data() {
2818         let (tx, rx) = sync_channel::<i32>(1);
2819         tx.send(10).unwrap();
2820         drop(tx);
2821         assert_eq!(rx.try_recv(), Ok(10));
2822         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2823     }
2824
2825     #[test]
2826     fn oneshot_single_thread_peek_data() {
2827         let (tx, rx) = sync_channel::<i32>(1);
2828         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2829         tx.send(10).unwrap();
2830         assert_eq!(rx.try_recv(), Ok(10));
2831     }
2832
2833     #[test]
2834     fn oneshot_single_thread_peek_close() {
2835         let (tx, rx) = sync_channel::<i32>(0);
2836         drop(tx);
2837         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2838         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2839     }
2840
2841     #[test]
2842     fn oneshot_single_thread_peek_open() {
2843         let (_tx, rx) = sync_channel::<i32>(0);
2844         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2845     }
2846
2847     #[test]
2848     fn oneshot_multi_task_recv_then_send() {
2849         let (tx, rx) = sync_channel::<Box<i32>>(0);
2850         let _t = thread::spawn(move|| {
2851             assert!(*rx.recv().unwrap() == 10);
2852         });
2853
2854         tx.send(box 10).unwrap();
2855     }
2856
2857     #[test]
2858     fn oneshot_multi_task_recv_then_close() {
2859         let (tx, rx) = sync_channel::<Box<i32>>(0);
2860         let _t = thread::spawn(move|| {
2861             drop(tx);
2862         });
2863         let res = thread::spawn(move|| {
2864             assert!(*rx.recv().unwrap() == 10);
2865         }).join();
2866         assert!(res.is_err());
2867     }
2868
2869     #[test]
2870     fn oneshot_multi_thread_close_stress() {
2871         for _ in 0..stress_factor() {
2872             let (tx, rx) = sync_channel::<i32>(0);
2873             let _t = thread::spawn(move|| {
2874                 drop(rx);
2875             });
2876             drop(tx);
2877         }
2878     }
2879
2880     #[test]
2881     fn oneshot_multi_thread_send_close_stress() {
2882         for _ in 0..stress_factor() {
2883             let (tx, rx) = sync_channel::<i32>(0);
2884             let _t = thread::spawn(move|| {
2885                 drop(rx);
2886             });
2887             let _ = thread::spawn(move || {
2888                 tx.send(1).unwrap();
2889             }).join();
2890         }
2891     }
2892
2893     #[test]
2894     fn oneshot_multi_thread_recv_close_stress() {
2895         for _ in 0..stress_factor() {
2896             let (tx, rx) = sync_channel::<i32>(0);
2897             let _t = thread::spawn(move|| {
2898                 let res = thread::spawn(move|| {
2899                     rx.recv().unwrap();
2900                 }).join();
2901                 assert!(res.is_err());
2902             });
2903             let _t = thread::spawn(move|| {
2904                 thread::spawn(move|| {
2905                     drop(tx);
2906                 });
2907             });
2908         }
2909     }
2910
2911     #[test]
2912     fn oneshot_multi_thread_send_recv_stress() {
2913         for _ in 0..stress_factor() {
2914             let (tx, rx) = sync_channel::<Box<i32>>(0);
2915             let _t = thread::spawn(move|| {
2916                 tx.send(box 10).unwrap();
2917             });
2918             assert!(*rx.recv().unwrap() == 10);
2919         }
2920     }
2921
2922     #[test]
2923     fn stream_send_recv_stress() {
2924         for _ in 0..stress_factor() {
2925             let (tx, rx) = sync_channel::<Box<i32>>(0);
2926
2927             send(tx, 0);
2928             recv(rx, 0);
2929
2930             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2931                 if i == 10 { return }
2932
2933                 thread::spawn(move|| {
2934                     tx.send(box i).unwrap();
2935                     send(tx, i + 1);
2936                 });
2937             }
2938
2939             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2940                 if i == 10 { return }
2941
2942                 thread::spawn(move|| {
2943                     assert!(*rx.recv().unwrap() == i);
2944                     recv(rx, i + 1);
2945                 });
2946             }
2947         }
2948     }
2949
2950     #[test]
2951     fn recv_a_lot() {
2952         // Regression test that we don't run out of stack in scheduler context
2953         let (tx, rx) = sync_channel(10000);
2954         for _ in 0..10000 { tx.send(()).unwrap(); }
2955         for _ in 0..10000 { rx.recv().unwrap(); }
2956     }
2957
2958     #[test]
2959     fn shared_chan_stress() {
2960         let (tx, rx) = sync_channel(0);
2961         let total = stress_factor() + 100;
2962         for _ in 0..total {
2963             let tx = tx.clone();
2964             thread::spawn(move|| {
2965                 tx.send(()).unwrap();
2966             });
2967         }
2968
2969         for _ in 0..total {
2970             rx.recv().unwrap();
2971         }
2972     }
2973
2974     #[test]
2975     fn test_nested_recv_iter() {
2976         let (tx, rx) = sync_channel::<i32>(0);
2977         let (total_tx, total_rx) = sync_channel::<i32>(0);
2978
2979         let _t = thread::spawn(move|| {
2980             let mut acc = 0;
2981             for x in rx.iter() {
2982                 acc += x;
2983             }
2984             total_tx.send(acc).unwrap();
2985         });
2986
2987         tx.send(3).unwrap();
2988         tx.send(1).unwrap();
2989         tx.send(2).unwrap();
2990         drop(tx);
2991         assert_eq!(total_rx.recv().unwrap(), 6);
2992     }
2993
2994     #[test]
2995     fn test_recv_iter_break() {
2996         let (tx, rx) = sync_channel::<i32>(0);
2997         let (count_tx, count_rx) = sync_channel(0);
2998
2999         let _t = thread::spawn(move|| {
3000             let mut count = 0;
3001             for x in rx.iter() {
3002                 if count >= 3 {
3003                     break;
3004                 } else {
3005                     count += x;
3006                 }
3007             }
3008             count_tx.send(count).unwrap();
3009         });
3010
3011         tx.send(2).unwrap();
3012         tx.send(2).unwrap();
3013         tx.send(2).unwrap();
3014         let _ = tx.try_send(2);
3015         drop(tx);
3016         assert_eq!(count_rx.recv().unwrap(), 4);
3017     }
3018
3019     #[test]
3020     fn try_recv_states() {
3021         let (tx1, rx1) = sync_channel::<i32>(1);
3022         let (tx2, rx2) = sync_channel::<()>(1);
3023         let (tx3, rx3) = sync_channel::<()>(1);
3024         let _t = thread::spawn(move|| {
3025             rx2.recv().unwrap();
3026             tx1.send(1).unwrap();
3027             tx3.send(()).unwrap();
3028             rx2.recv().unwrap();
3029             drop(tx1);
3030             tx3.send(()).unwrap();
3031         });
3032
3033         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3034         tx2.send(()).unwrap();
3035         rx3.recv().unwrap();
3036         assert_eq!(rx1.try_recv(), Ok(1));
3037         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3038         tx2.send(()).unwrap();
3039         rx3.recv().unwrap();
3040         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
3041     }
3042
3043     // This bug used to end up in a livelock inside of the Receiver destructor
3044     // because the internal state of the Shared packet was corrupted
3045     #[test]
3046     fn destroy_upgraded_shared_port_when_sender_still_active() {
3047         let (tx, rx) = sync_channel::<()>(0);
3048         let (tx2, rx2) = sync_channel::<()>(0);
3049         let _t = thread::spawn(move|| {
3050             rx.recv().unwrap(); // wait on a oneshot
3051             drop(rx);  // destroy a shared
3052             tx2.send(()).unwrap();
3053         });
3054         // make sure the other thread has gone to sleep
3055         for _ in 0..5000 { thread::yield_now(); }
3056
3057         // upgrade to a shared chan and send a message
3058         let t = tx.clone();
3059         drop(tx);
3060         t.send(()).unwrap();
3061
3062         // wait for the child thread to exit before we exit
3063         rx2.recv().unwrap();
3064     }
3065
3066     #[test]
3067     fn send1() {
3068         let (tx, rx) = sync_channel::<i32>(0);
3069         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3070         assert_eq!(tx.send(1), Ok(()));
3071     }
3072
3073     #[test]
3074     fn send2() {
3075         let (tx, rx) = sync_channel::<i32>(0);
3076         let _t = thread::spawn(move|| { drop(rx); });
3077         assert!(tx.send(1).is_err());
3078     }
3079
3080     #[test]
3081     fn send3() {
3082         let (tx, rx) = sync_channel::<i32>(1);
3083         assert_eq!(tx.send(1), Ok(()));
3084         let _t =thread::spawn(move|| { drop(rx); });
3085         assert!(tx.send(1).is_err());
3086     }
3087
3088     #[test]
3089     fn send4() {
3090         let (tx, rx) = sync_channel::<i32>(0);
3091         let tx2 = tx.clone();
3092         let (done, donerx) = channel();
3093         let done2 = done.clone();
3094         let _t = thread::spawn(move|| {
3095             assert!(tx.send(1).is_err());
3096             done.send(()).unwrap();
3097         });
3098         let _t = thread::spawn(move|| {
3099             assert!(tx2.send(2).is_err());
3100             done2.send(()).unwrap();
3101         });
3102         drop(rx);
3103         donerx.recv().unwrap();
3104         donerx.recv().unwrap();
3105     }
3106
3107     #[test]
3108     fn try_send1() {
3109         let (tx, _rx) = sync_channel::<i32>(0);
3110         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3111     }
3112
3113     #[test]
3114     fn try_send2() {
3115         let (tx, _rx) = sync_channel::<i32>(1);
3116         assert_eq!(tx.try_send(1), Ok(()));
3117         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3118     }
3119
3120     #[test]
3121     fn try_send3() {
3122         let (tx, rx) = sync_channel::<i32>(1);
3123         assert_eq!(tx.try_send(1), Ok(()));
3124         drop(rx);
3125         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3126     }
3127
3128     #[test]
3129     fn issue_15761() {
3130         fn repro() {
3131             let (tx1, rx1) = sync_channel::<()>(3);
3132             let (tx2, rx2) = sync_channel::<()>(3);
3133
3134             let _t = thread::spawn(move|| {
3135                 rx1.recv().unwrap();
3136                 tx2.try_send(()).unwrap();
3137             });
3138
3139             tx1.try_send(()).unwrap();
3140             rx2.recv().unwrap();
3141         }
3142
3143         for _ in 0..100 {
3144             repro()
3145         }
3146     }
3147 }