]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
SGX target: fix std unit tests
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 //! Multi-producer, single-consumer FIFO queue communication primitives.
2 //!
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
5 //!
6 //! * [`Sender`]
7 //! * [`SyncSender`]
8 //! * [`Receiver`]
9 //!
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
13 //!
14 //! These channels come in two flavors:
15 //!
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //!    will return a `(Sender, Receiver)` tuple where all sends will be
18 //!    **asynchronous** (they never block). The channel conceptually has an
19 //!    infinite buffer.
20 //!
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
24 //!    **synchronous** by blocking until there is buffer space available. Note
25 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //!    channel where each sender atomically hands off a message to a receiver.
27 //!
28 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
29 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
30 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
31 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
32 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
33 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
34 //!
35 //! ## Disconnection
36 //!
37 //! The send and receive operations on channels will all return a [`Result`]
38 //! indicating whether the operation succeeded or not. An unsuccessful operation
39 //! is normally indicative of the other half of a channel having "hung up" by
40 //! being dropped in its corresponding thread.
41 //!
42 //! Once half of a channel has been deallocated, most operations can no longer
43 //! continue to make progress, so [`Err`] will be returned. Many applications
44 //! will continue to [`unwrap`] the results returned from this module,
45 //! instigating a propagation of failure among threads if one unexpectedly dies.
46 //!
47 //! [`Result`]: ../../../std/result/enum.Result.html
48 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
49 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //!     tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //!     let tx = tx.clone();
79 //!     thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //!     // This will wait for the parent thread to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117 #![allow(deprecated)] // for mpsc_select
118
119 // A description of how Rust's channel implementation works
120 //
121 // Channels are supposed to be the basic building block for all other
122 // concurrent primitives that are used in Rust. As a result, the channel type
123 // needs to be highly optimized, flexible, and broad enough for use everywhere.
124 //
125 // The choice of implementation of all channels is to be built on lock-free data
126 // structures. The channels themselves are then consequently also lock-free data
127 // structures. As always with lock-free code, this is a very "here be dragons"
128 // territory, especially because I'm unaware of any academic papers that have
129 // gone into great length about channels of these flavors.
130 //
131 // ## Flavors of channels
132 //
133 // From the perspective of a consumer of this library, there is only one flavor
134 // of channel. This channel can be used as a stream and cloned to allow multiple
135 // senders. Under the hood, however, there are actually three flavors of
136 // channels in play.
137 //
138 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
139 //                      case. They contain as few atomics as possible and
140 //                      involve one and exactly one allocation.
141 // * Streams - these channels are optimized for the non-shared use case. They
142 //             use a different concurrent queue that is more tailored for this
143 //             use case. The initial allocation of this flavor of channel is not
144 //             optimized.
145 // * Shared - this is the most general form of channel that this module offers,
146 //            a channel with multiple senders. This type is as optimized as it
147 //            can be, but the previous two types mentioned are much faster for
148 //            their use-cases.
149 //
150 // ## Concurrent queues
151 //
152 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
153 // but recv() obviously blocks. This means that under the hood there must be
154 // some shared and concurrent queue holding all of the actual data.
155 //
156 // With two flavors of channels, two flavors of queues are also used. We have
157 // chosen to use queues from a well-known author that are abbreviated as SPSC
158 // and MPSC (single producer, single consumer and multiple producer, single
159 // consumer). SPSC queues are used for streams while MPSC queues are used for
160 // shared channels.
161 //
162 // ### SPSC optimizations
163 //
164 // The SPSC queue found online is essentially a linked list of nodes where one
165 // half of the nodes are the "queue of data" and the other half of nodes are a
166 // cache of unused nodes. The unused nodes are used such that an allocation is
167 // not required on every push() and a free doesn't need to happen on every
168 // pop().
169 //
170 // As found online, however, the cache of nodes is of an infinite size. This
171 // means that if a channel at one point in its life had 50k items in the queue,
172 // then the queue will always have the capacity for 50k items. I believed that
173 // this was an unnecessary limitation of the implementation, so I have altered
174 // the queue to optionally have a bound on the cache size.
175 //
176 // By default, streams will have an unbounded SPSC queue with a small-ish cache
177 // size. The hope is that the cache is still large enough to have very fast
178 // send() operations while not too large such that millions of channels can
179 // coexist at once.
180 //
181 // ### MPSC optimizations
182 //
183 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
184 // a linked list under the hood to earn its unboundedness, but I have not put
185 // forth much effort into having a cache of nodes similar to the SPSC queue.
186 //
187 // For now, I believe that this is "ok" because shared channels are not the most
188 // common type, but soon we may wish to revisit this queue choice and determine
189 // another candidate for backend storage of shared channels.
190 //
191 // ## Overview of the Implementation
192 //
193 // Now that there's a little background on the concurrent queues used, it's
194 // worth going into much more detail about the channels themselves. The basic
195 // pseudocode for a send/recv are:
196 //
197 //
198 //      send(t)                             recv()
199 //        queue.push(t)                       return if queue.pop()
200 //        if increment() == -1                deschedule {
201 //          wakeup()                            if decrement() > 0
202 //                                                cancel_deschedule()
203 //                                            }
204 //                                            queue.pop()
205 //
206 // As mentioned before, there are no locks in this implementation, only atomic
207 // instructions are used.
208 //
209 // ### The internal atomic counter
210 //
211 // Every channel has a shared counter with each half to keep track of the size
212 // of the queue. This counter is used to abort descheduling by the receiver and
213 // to know when to wake up on the sending side.
214 //
215 // As seen in the pseudocode, senders will increment this count and receivers
216 // will decrement the count. The theory behind this is that if a sender sees a
217 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
218 // then it doesn't need to block.
219 //
220 // The recv() method has a beginning call to pop(), and if successful, it needs
221 // to decrement the count. It is a crucial implementation detail that this
222 // decrement does *not* happen to the shared counter. If this were the case,
223 // then it would be possible for the counter to be very negative when there were
224 // no receivers waiting, in which case the senders would have to determine when
225 // it was actually appropriate to wake up a receiver.
226 //
227 // Instead, the "steal count" is kept track of separately (not atomically
228 // because it's only used by receivers), and then the decrement() call when
229 // descheduling will lump in all of the recent steals into one large decrement.
230 //
231 // The implication of this is that if a sender sees a -1 count, then there's
232 // guaranteed to be a waiter waiting!
233 //
234 // ## Native Implementation
235 //
236 // A major goal of these channels is to work seamlessly on and off the runtime.
237 // All of the previous race conditions have been worded in terms of
238 // scheduler-isms (which is obviously not available without the runtime).
239 //
240 // For now, native usage of channels (off the runtime) will fall back onto
241 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
242 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
243 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
244 // condition variable.
245 //
246 // ## Select
247 //
248 // Being able to support selection over channels has greatly influenced this
249 // design, and not only does selection need to work inside the runtime, but also
250 // outside the runtime.
251 //
252 // The implementation is fairly straightforward. The goal of select() is not to
253 // return some data, but only to return which channel can receive data without
254 // blocking. The implementation is essentially the entire blocking procedure
255 // followed by an increment as soon as its woken up. The cancellation procedure
256 // involves an increment and swapping out of to_wake to acquire ownership of the
257 // thread to unblock.
258 //
259 // Sadly this current implementation requires multiple allocations, so I have
260 // seen the throughput of select() be much worse than it should be. I do not
261 // believe that there is anything fundamental that needs to change about these
262 // channels, however, in order to support a more efficient select().
263 //
264 // # Conclusion
265 //
266 // And now that you've seen all the races that I found and attempted to fix,
267 // here's the code for you to find some more!
268
269 use crate::sync::Arc;
270 use crate::error;
271 use crate::fmt;
272 use crate::mem;
273 use crate::cell::UnsafeCell;
274 use crate::time::{Duration, Instant};
275
276 #[unstable(feature = "mpsc_select", issue = "27800")]
277 pub use self::select::{Select, Handle};
278 use self::select::StartResult;
279 use self::select::StartResult::*;
280 use self::blocking::SignalToken;
281
282 #[cfg(all(test, not(target_os = "emscripten")))]
283 mod select_tests;
284
285 mod blocking;
286 mod oneshot;
287 mod select;
288 mod shared;
289 mod stream;
290 mod sync;
291 mod mpsc_queue;
292 mod spsc_queue;
293
294 mod cache_aligned;
295
296 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
297 /// This half can only be owned by one thread.
298 ///
299 /// Messages sent to the channel can be retrieved using [`recv`].
300 ///
301 /// [`channel`]: fn.channel.html
302 /// [`sync_channel`]: fn.sync_channel.html
303 /// [`recv`]: struct.Receiver.html#method.recv
304 ///
305 /// # Examples
306 ///
307 /// ```rust
308 /// use std::sync::mpsc::channel;
309 /// use std::thread;
310 /// use std::time::Duration;
311 ///
312 /// let (send, recv) = channel();
313 ///
314 /// thread::spawn(move || {
315 ///     send.send("Hello world!").unwrap();
316 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
317 ///     send.send("Delayed for 2 seconds").unwrap();
318 /// });
319 ///
320 /// println!("{}", recv.recv().unwrap()); // Received immediately
321 /// println!("Waiting...");
322 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
323 /// ```
324 #[stable(feature = "rust1", since = "1.0.0")]
325 pub struct Receiver<T> {
326     inner: UnsafeCell<Flavor<T>>,
327 }
328
329 // The receiver port can be sent from place to place, so long as it
330 // is not used to receive non-sendable things.
331 #[stable(feature = "rust1", since = "1.0.0")]
332 unsafe impl<T: Send> Send for Receiver<T> { }
333
334 #[stable(feature = "rust1", since = "1.0.0")]
335 impl<T> !Sync for Receiver<T> { }
336
337 /// An iterator over messages on a [`Receiver`], created by [`iter`].
338 ///
339 /// This iterator will block whenever [`next`] is called,
340 /// waiting for a new message, and [`None`] will be returned
341 /// when the corresponding channel has hung up.
342 ///
343 /// [`iter`]: struct.Receiver.html#method.iter
344 /// [`Receiver`]: struct.Receiver.html
345 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
346 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
347 ///
348 /// # Examples
349 ///
350 /// ```rust
351 /// use std::sync::mpsc::channel;
352 /// use std::thread;
353 ///
354 /// let (send, recv) = channel();
355 ///
356 /// thread::spawn(move || {
357 ///     send.send(1u8).unwrap();
358 ///     send.send(2u8).unwrap();
359 ///     send.send(3u8).unwrap();
360 /// });
361 ///
362 /// for x in recv.iter() {
363 ///     println!("Got: {}", x);
364 /// }
365 /// ```
366 #[stable(feature = "rust1", since = "1.0.0")]
367 #[derive(Debug)]
368 pub struct Iter<'a, T: 'a> {
369     rx: &'a Receiver<T>
370 }
371
372 /// An iterator that attempts to yield all pending values for a [`Receiver`],
373 /// created by [`try_iter`].
374 ///
375 /// [`None`] will be returned when there are no pending values remaining or
376 /// if the corresponding channel has hung up.
377 ///
378 /// This iterator will never block the caller in order to wait for data to
379 /// become available. Instead, it will return [`None`].
380 ///
381 /// [`Receiver`]: struct.Receiver.html
382 /// [`try_iter`]: struct.Receiver.html#method.try_iter
383 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
384 ///
385 /// # Examples
386 ///
387 /// ```rust
388 /// use std::sync::mpsc::channel;
389 /// use std::thread;
390 /// use std::time::Duration;
391 ///
392 /// let (sender, receiver) = channel();
393 ///
394 /// // Nothing is in the buffer yet
395 /// assert!(receiver.try_iter().next().is_none());
396 /// println!("Nothing in the buffer...");
397 ///
398 /// thread::spawn(move || {
399 ///     sender.send(1).unwrap();
400 ///     sender.send(2).unwrap();
401 ///     sender.send(3).unwrap();
402 /// });
403 ///
404 /// println!("Going to sleep...");
405 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
406 ///
407 /// for x in receiver.try_iter() {
408 ///     println!("Got: {}", x);
409 /// }
410 /// ```
411 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
412 #[derive(Debug)]
413 pub struct TryIter<'a, T: 'a> {
414     rx: &'a Receiver<T>
415 }
416
417 /// An owning iterator over messages on a [`Receiver`],
418 /// created by **Receiver::into_iter**.
419 ///
420 /// This iterator will block whenever [`next`]
421 /// is called, waiting for a new message, and [`None`] will be
422 /// returned if the corresponding channel has hung up.
423 ///
424 /// [`Receiver`]: struct.Receiver.html
425 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
426 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
427 ///
428 /// # Examples
429 ///
430 /// ```rust
431 /// use std::sync::mpsc::channel;
432 /// use std::thread;
433 ///
434 /// let (send, recv) = channel();
435 ///
436 /// thread::spawn(move || {
437 ///     send.send(1u8).unwrap();
438 ///     send.send(2u8).unwrap();
439 ///     send.send(3u8).unwrap();
440 /// });
441 ///
442 /// for x in recv.into_iter() {
443 ///     println!("Got: {}", x);
444 /// }
445 /// ```
446 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
447 #[derive(Debug)]
448 pub struct IntoIter<T> {
449     rx: Receiver<T>
450 }
451
452 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
453 /// owned by one thread, but it can be cloned to send to other threads.
454 ///
455 /// Messages can be sent through this channel with [`send`].
456 ///
457 /// [`channel`]: fn.channel.html
458 /// [`send`]: struct.Sender.html#method.send
459 ///
460 /// # Examples
461 ///
462 /// ```rust
463 /// use std::sync::mpsc::channel;
464 /// use std::thread;
465 ///
466 /// let (sender, receiver) = channel();
467 /// let sender2 = sender.clone();
468 ///
469 /// // First thread owns sender
470 /// thread::spawn(move || {
471 ///     sender.send(1).unwrap();
472 /// });
473 ///
474 /// // Second thread owns sender2
475 /// thread::spawn(move || {
476 ///     sender2.send(2).unwrap();
477 /// });
478 ///
479 /// let msg = receiver.recv().unwrap();
480 /// let msg2 = receiver.recv().unwrap();
481 ///
482 /// assert_eq!(3, msg + msg2);
483 /// ```
484 #[stable(feature = "rust1", since = "1.0.0")]
485 pub struct Sender<T> {
486     inner: UnsafeCell<Flavor<T>>,
487 }
488
489 // The send port can be sent from place to place, so long as it
490 // is not used to send non-sendable things.
491 #[stable(feature = "rust1", since = "1.0.0")]
492 unsafe impl<T: Send> Send for Sender<T> { }
493
494 #[stable(feature = "rust1", since = "1.0.0")]
495 impl<T> !Sync for Sender<T> { }
496
497 /// The sending-half of Rust's synchronous [`sync_channel`] type.
498 ///
499 /// Messages can be sent through this channel with [`send`] or [`try_send`].
500 ///
501 /// [`send`] will block if there is no space in the internal buffer.
502 ///
503 /// [`sync_channel`]: fn.sync_channel.html
504 /// [`send`]: struct.SyncSender.html#method.send
505 /// [`try_send`]: struct.SyncSender.html#method.try_send
506 ///
507 /// # Examples
508 ///
509 /// ```rust
510 /// use std::sync::mpsc::sync_channel;
511 /// use std::thread;
512 ///
513 /// // Create a sync_channel with buffer size 2
514 /// let (sync_sender, receiver) = sync_channel(2);
515 /// let sync_sender2 = sync_sender.clone();
516 ///
517 /// // First thread owns sync_sender
518 /// thread::spawn(move || {
519 ///     sync_sender.send(1).unwrap();
520 ///     sync_sender.send(2).unwrap();
521 /// });
522 ///
523 /// // Second thread owns sync_sender2
524 /// thread::spawn(move || {
525 ///     sync_sender2.send(3).unwrap();
526 ///     // thread will now block since the buffer is full
527 ///     println!("Thread unblocked!");
528 /// });
529 ///
530 /// let mut msg;
531 ///
532 /// msg = receiver.recv().unwrap();
533 /// println!("message {} received", msg);
534 ///
535 /// // "Thread unblocked!" will be printed now
536 ///
537 /// msg = receiver.recv().unwrap();
538 /// println!("message {} received", msg);
539 ///
540 /// msg = receiver.recv().unwrap();
541 ///
542 /// println!("message {} received", msg);
543 /// ```
544 #[stable(feature = "rust1", since = "1.0.0")]
545 pub struct SyncSender<T> {
546     inner: Arc<sync::Packet<T>>,
547 }
548
549 #[stable(feature = "rust1", since = "1.0.0")]
550 unsafe impl<T: Send> Send for SyncSender<T> {}
551
552 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
553 /// function on **channel**s.
554 ///
555 /// A **send** operation can only fail if the receiving end of a channel is
556 /// disconnected, implying that the data could never be received. The error
557 /// contains the data being sent as a payload so it can be recovered.
558 ///
559 /// [`Sender::send`]: struct.Sender.html#method.send
560 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
561 #[stable(feature = "rust1", since = "1.0.0")]
562 #[derive(PartialEq, Eq, Clone, Copy)]
563 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
564
565 /// An error returned from the [`recv`] function on a [`Receiver`].
566 ///
567 /// The [`recv`] operation can only fail if the sending half of a
568 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
569 /// messages will ever be received.
570 ///
571 /// [`recv`]: struct.Receiver.html#method.recv
572 /// [`Receiver`]: struct.Receiver.html
573 /// [`channel`]: fn.channel.html
574 /// [`sync_channel`]: fn.sync_channel.html
575 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
576 #[stable(feature = "rust1", since = "1.0.0")]
577 pub struct RecvError;
578
579 /// This enumeration is the list of the possible reasons that [`try_recv`] could
580 /// not return data when called. This can occur with both a [`channel`] and
581 /// a [`sync_channel`].
582 ///
583 /// [`try_recv`]: struct.Receiver.html#method.try_recv
584 /// [`channel`]: fn.channel.html
585 /// [`sync_channel`]: fn.sync_channel.html
586 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
587 #[stable(feature = "rust1", since = "1.0.0")]
588 pub enum TryRecvError {
589     /// This **channel** is currently empty, but the **Sender**(s) have not yet
590     /// disconnected, so data may yet become available.
591     #[stable(feature = "rust1", since = "1.0.0")]
592     Empty,
593
594     /// The **channel**'s sending half has become disconnected, and there will
595     /// never be any more data received on it.
596     #[stable(feature = "rust1", since = "1.0.0")]
597     Disconnected,
598 }
599
600 /// This enumeration is the list of possible errors that made [`recv_timeout`]
601 /// unable to return data when called. This can occur with both a [`channel`] and
602 /// a [`sync_channel`].
603 ///
604 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
605 /// [`channel`]: fn.channel.html
606 /// [`sync_channel`]: fn.sync_channel.html
607 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
608 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
609 pub enum RecvTimeoutError {
610     /// This **channel** is currently empty, but the **Sender**(s) have not yet
611     /// disconnected, so data may yet become available.
612     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613     Timeout,
614     /// The **channel**'s sending half has become disconnected, and there will
615     /// never be any more data received on it.
616     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
617     Disconnected,
618 }
619
620 /// This enumeration is the list of the possible error outcomes for the
621 /// [`try_send`] method.
622 ///
623 /// [`try_send`]: struct.SyncSender.html#method.try_send
624 #[stable(feature = "rust1", since = "1.0.0")]
625 #[derive(PartialEq, Eq, Clone, Copy)]
626 pub enum TrySendError<T> {
627     /// The data could not be sent on the [`sync_channel`] because it would require that
628     /// the callee block to send the data.
629     ///
630     /// If this is a buffered channel, then the buffer is full at this time. If
631     /// this is not a buffered channel, then there is no [`Receiver`] available to
632     /// acquire the data.
633     ///
634     /// [`sync_channel`]: fn.sync_channel.html
635     /// [`Receiver`]: struct.Receiver.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
638
639     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
640     /// sent. The data is returned back to the callee in this case.
641     ///
642     /// [`sync_channel`]: fn.sync_channel.html
643     #[stable(feature = "rust1", since = "1.0.0")]
644     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
645 }
646
647 enum Flavor<T> {
648     Oneshot(Arc<oneshot::Packet<T>>),
649     Stream(Arc<stream::Packet<T>>),
650     Shared(Arc<shared::Packet<T>>),
651     Sync(Arc<sync::Packet<T>>),
652 }
653
654 #[doc(hidden)]
655 trait UnsafeFlavor<T> {
656     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
657     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
658         &mut *self.inner_unsafe().get()
659     }
660     unsafe fn inner(&self) -> &Flavor<T> {
661         &*self.inner_unsafe().get()
662     }
663 }
664 impl<T> UnsafeFlavor<T> for Sender<T> {
665     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
666         &self.inner
667     }
668 }
669 impl<T> UnsafeFlavor<T> for Receiver<T> {
670     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
671         &self.inner
672     }
673 }
674
675 /// Creates a new asynchronous channel, returning the sender/receiver halves.
676 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
677 /// the same order as it was sent, and no [`send`] will block the calling thread
678 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
679 /// block after its buffer limit is reached). [`recv`] will block until a message
680 /// is available.
681 ///
682 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
683 /// only one [`Receiver`] is supported.
684 ///
685 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
686 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
687 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
688 /// return a [`RecvError`].
689 ///
690 /// [`send`]: struct.Sender.html#method.send
691 /// [`recv`]: struct.Receiver.html#method.recv
692 /// [`Sender`]: struct.Sender.html
693 /// [`Receiver`]: struct.Receiver.html
694 /// [`sync_channel`]: fn.sync_channel.html
695 /// [`SendError`]: struct.SendError.html
696 /// [`RecvError`]: struct.RecvError.html
697 ///
698 /// # Examples
699 ///
700 /// ```
701 /// use std::sync::mpsc::channel;
702 /// use std::thread;
703 ///
704 /// let (sender, receiver) = channel();
705 ///
706 /// // Spawn off an expensive computation
707 /// thread::spawn(move|| {
708 /// #   fn expensive_computation() {}
709 ///     sender.send(expensive_computation()).unwrap();
710 /// });
711 ///
712 /// // Do some useful work for awhile
713 ///
714 /// // Let's see what that answer was
715 /// println!("{:?}", receiver.recv().unwrap());
716 /// ```
717 #[stable(feature = "rust1", since = "1.0.0")]
718 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
719     let a = Arc::new(oneshot::Packet::new());
720     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
721 }
722
723 /// Creates a new synchronous, bounded channel.
724 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
725 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
726 /// [`Receiver`] will block until a message becomes available. `sync_channel`
727 /// differs greatly in the semantics of the sender, however.
728 ///
729 /// This channel has an internal buffer on which messages will be queued.
730 /// `bound` specifies the buffer size. When the internal buffer becomes full,
731 /// future sends will *block* waiting for the buffer to open up. Note that a
732 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
733 /// where each [`send`] will not return until a [`recv`] is paired with it.
734 ///
735 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
736 /// times, but only one [`Receiver`] is supported.
737 ///
738 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
739 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
740 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
741 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
742 ///
743 /// [`channel`]: fn.channel.html
744 /// [`send`]: struct.SyncSender.html#method.send
745 /// [`recv`]: struct.Receiver.html#method.recv
746 /// [`SyncSender`]: struct.SyncSender.html
747 /// [`Receiver`]: struct.Receiver.html
748 /// [`SendError`]: struct.SendError.html
749 /// [`RecvError`]: struct.RecvError.html
750 ///
751 /// # Examples
752 ///
753 /// ```
754 /// use std::sync::mpsc::sync_channel;
755 /// use std::thread;
756 ///
757 /// let (sender, receiver) = sync_channel(1);
758 ///
759 /// // this returns immediately
760 /// sender.send(1).unwrap();
761 ///
762 /// thread::spawn(move|| {
763 ///     // this will block until the previous message has been received
764 ///     sender.send(2).unwrap();
765 /// });
766 ///
767 /// assert_eq!(receiver.recv().unwrap(), 1);
768 /// assert_eq!(receiver.recv().unwrap(), 2);
769 /// ```
770 #[stable(feature = "rust1", since = "1.0.0")]
771 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
772     let a = Arc::new(sync::Packet::new(bound));
773     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
774 }
775
776 ////////////////////////////////////////////////////////////////////////////////
777 // Sender
778 ////////////////////////////////////////////////////////////////////////////////
779
780 impl<T> Sender<T> {
781     fn new(inner: Flavor<T>) -> Sender<T> {
782         Sender {
783             inner: UnsafeCell::new(inner),
784         }
785     }
786
787     /// Attempts to send a value on this channel, returning it back if it could
788     /// not be sent.
789     ///
790     /// A successful send occurs when it is determined that the other end of
791     /// the channel has not hung up already. An unsuccessful send would be one
792     /// where the corresponding receiver has already been deallocated. Note
793     /// that a return value of [`Err`] means that the data will never be
794     /// received, but a return value of [`Ok`] does *not* mean that the data
795     /// will be received. It is possible for the corresponding receiver to
796     /// hang up immediately after this function returns [`Ok`].
797     ///
798     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
799     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
800     ///
801     /// This method will never block the current thread.
802     ///
803     /// # Examples
804     ///
805     /// ```
806     /// use std::sync::mpsc::channel;
807     ///
808     /// let (tx, rx) = channel();
809     ///
810     /// // This send is always successful
811     /// tx.send(1).unwrap();
812     ///
813     /// // This send will fail because the receiver is gone
814     /// drop(rx);
815     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
816     /// ```
817     #[stable(feature = "rust1", since = "1.0.0")]
818     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
819         let (new_inner, ret) = match *unsafe { self.inner() } {
820             Flavor::Oneshot(ref p) => {
821                 if !p.sent() {
822                     return p.send(t).map_err(SendError);
823                 } else {
824                     let a = Arc::new(stream::Packet::new());
825                     let rx = Receiver::new(Flavor::Stream(a.clone()));
826                     match p.upgrade(rx) {
827                         oneshot::UpSuccess => {
828                             let ret = a.send(t);
829                             (a, ret)
830                         }
831                         oneshot::UpDisconnected => (a, Err(t)),
832                         oneshot::UpWoke(token) => {
833                             // This send cannot panic because the thread is
834                             // asleep (we're looking at it), so the receiver
835                             // can't go away.
836                             a.send(t).ok().unwrap();
837                             token.signal();
838                             (a, Ok(()))
839                         }
840                     }
841                 }
842             }
843             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
844             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
845             Flavor::Sync(..) => unreachable!(),
846         };
847
848         unsafe {
849             let tmp = Sender::new(Flavor::Stream(new_inner));
850             mem::swap(self.inner_mut(), tmp.inner_mut());
851         }
852         ret.map_err(SendError)
853     }
854 }
855
856 #[stable(feature = "rust1", since = "1.0.0")]
857 impl<T> Clone for Sender<T> {
858     fn clone(&self) -> Sender<T> {
859         let packet = match *unsafe { self.inner() } {
860             Flavor::Oneshot(ref p) => {
861                 let a = Arc::new(shared::Packet::new());
862                 {
863                     let guard = a.postinit_lock();
864                     let rx = Receiver::new(Flavor::Shared(a.clone()));
865                     let sleeper = match p.upgrade(rx) {
866                         oneshot::UpSuccess |
867                         oneshot::UpDisconnected => None,
868                         oneshot::UpWoke(task) => Some(task),
869                     };
870                     a.inherit_blocker(sleeper, guard);
871                 }
872                 a
873             }
874             Flavor::Stream(ref p) => {
875                 let a = Arc::new(shared::Packet::new());
876                 {
877                     let guard = a.postinit_lock();
878                     let rx = Receiver::new(Flavor::Shared(a.clone()));
879                     let sleeper = match p.upgrade(rx) {
880                         stream::UpSuccess |
881                         stream::UpDisconnected => None,
882                         stream::UpWoke(task) => Some(task),
883                     };
884                     a.inherit_blocker(sleeper, guard);
885                 }
886                 a
887             }
888             Flavor::Shared(ref p) => {
889                 p.clone_chan();
890                 return Sender::new(Flavor::Shared(p.clone()));
891             }
892             Flavor::Sync(..) => unreachable!(),
893         };
894
895         unsafe {
896             let tmp = Sender::new(Flavor::Shared(packet.clone()));
897             mem::swap(self.inner_mut(), tmp.inner_mut());
898         }
899         Sender::new(Flavor::Shared(packet))
900     }
901 }
902
903 #[stable(feature = "rust1", since = "1.0.0")]
904 impl<T> Drop for Sender<T> {
905     fn drop(&mut self) {
906         match *unsafe { self.inner() } {
907             Flavor::Oneshot(ref p) => p.drop_chan(),
908             Flavor::Stream(ref p) => p.drop_chan(),
909             Flavor::Shared(ref p) => p.drop_chan(),
910             Flavor::Sync(..) => unreachable!(),
911         }
912     }
913 }
914
915 #[stable(feature = "mpsc_debug", since = "1.8.0")]
916 impl<T> fmt::Debug for Sender<T> {
917     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
918         f.debug_struct("Sender").finish()
919     }
920 }
921
922 ////////////////////////////////////////////////////////////////////////////////
923 // SyncSender
924 ////////////////////////////////////////////////////////////////////////////////
925
926 impl<T> SyncSender<T> {
927     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
928         SyncSender { inner }
929     }
930
931     /// Sends a value on this synchronous channel.
932     ///
933     /// This function will *block* until space in the internal buffer becomes
934     /// available or a receiver is available to hand off the message to.
935     ///
936     /// Note that a successful send does *not* guarantee that the receiver will
937     /// ever see the data if there is a buffer on this channel. Items may be
938     /// enqueued in the internal buffer for the receiver to receive at a later
939     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
940     /// channel and it guarantees that the receiver has indeed received
941     /// the data if this function returns success.
942     ///
943     /// This function will never panic, but it may return [`Err`] if the
944     /// [`Receiver`] has disconnected and is no longer able to receive
945     /// information.
946     ///
947     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
948     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
949     ///
950     /// # Examples
951     ///
952     /// ```rust
953     /// use std::sync::mpsc::sync_channel;
954     /// use std::thread;
955     ///
956     /// // Create a rendezvous sync_channel with buffer size 0
957     /// let (sync_sender, receiver) = sync_channel(0);
958     ///
959     /// thread::spawn(move || {
960     ///    println!("sending message...");
961     ///    sync_sender.send(1).unwrap();
962     ///    // Thread is now blocked until the message is received
963     ///
964     ///    println!("...message received!");
965     /// });
966     ///
967     /// let msg = receiver.recv().unwrap();
968     /// assert_eq!(1, msg);
969     /// ```
970     #[stable(feature = "rust1", since = "1.0.0")]
971     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
972         self.inner.send(t).map_err(SendError)
973     }
974
975     /// Attempts to send a value on this channel without blocking.
976     ///
977     /// This method differs from [`send`] by returning immediately if the
978     /// channel's buffer is full or no receiver is waiting to acquire some
979     /// data. Compared with [`send`], this function has two failure cases
980     /// instead of one (one for disconnection, one for a full buffer).
981     ///
982     /// See [`send`] for notes about guarantees of whether the
983     /// receiver has received the data or not if this function is successful.
984     ///
985     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
986     ///
987     /// # Examples
988     ///
989     /// ```rust
990     /// use std::sync::mpsc::sync_channel;
991     /// use std::thread;
992     ///
993     /// // Create a sync_channel with buffer size 1
994     /// let (sync_sender, receiver) = sync_channel(1);
995     /// let sync_sender2 = sync_sender.clone();
996     ///
997     /// // First thread owns sync_sender
998     /// thread::spawn(move || {
999     ///     sync_sender.send(1).unwrap();
1000     ///     sync_sender.send(2).unwrap();
1001     ///     // Thread blocked
1002     /// });
1003     ///
1004     /// // Second thread owns sync_sender2
1005     /// thread::spawn(move || {
1006     ///     // This will return an error and send
1007     ///     // no message if the buffer is full
1008     ///     sync_sender2.try_send(3).is_err();
1009     /// });
1010     ///
1011     /// let mut msg;
1012     /// msg = receiver.recv().unwrap();
1013     /// println!("message {} received", msg);
1014     ///
1015     /// msg = receiver.recv().unwrap();
1016     /// println!("message {} received", msg);
1017     ///
1018     /// // Third message may have never been sent
1019     /// match receiver.try_recv() {
1020     ///     Ok(msg) => println!("message {} received", msg),
1021     ///     Err(_) => println!("the third message was never sent"),
1022     /// }
1023     /// ```
1024     #[stable(feature = "rust1", since = "1.0.0")]
1025     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1026         self.inner.try_send(t)
1027     }
1028 }
1029
1030 #[stable(feature = "rust1", since = "1.0.0")]
1031 impl<T> Clone for SyncSender<T> {
1032     fn clone(&self) -> SyncSender<T> {
1033         self.inner.clone_chan();
1034         SyncSender::new(self.inner.clone())
1035     }
1036 }
1037
1038 #[stable(feature = "rust1", since = "1.0.0")]
1039 impl<T> Drop for SyncSender<T> {
1040     fn drop(&mut self) {
1041         self.inner.drop_chan();
1042     }
1043 }
1044
1045 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1046 impl<T> fmt::Debug for SyncSender<T> {
1047     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1048         f.debug_struct("SyncSender").finish()
1049     }
1050 }
1051
1052 ////////////////////////////////////////////////////////////////////////////////
1053 // Receiver
1054 ////////////////////////////////////////////////////////////////////////////////
1055
1056 impl<T> Receiver<T> {
1057     fn new(inner: Flavor<T>) -> Receiver<T> {
1058         Receiver { inner: UnsafeCell::new(inner) }
1059     }
1060
1061     /// Attempts to return a pending value on this receiver without blocking.
1062     ///
1063     /// This method will never block the caller in order to wait for data to
1064     /// become available. Instead, this will always return immediately with a
1065     /// possible option of pending data on the channel.
1066     ///
1067     /// This is useful for a flavor of "optimistic check" before deciding to
1068     /// block on a receiver.
1069     ///
1070     /// Compared with [`recv`], this function has two failure cases instead of one
1071     /// (one for disconnection, one for an empty buffer).
1072     ///
1073     /// [`recv`]: struct.Receiver.html#method.recv
1074     ///
1075     /// # Examples
1076     ///
1077     /// ```rust
1078     /// use std::sync::mpsc::{Receiver, channel};
1079     ///
1080     /// let (_, receiver): (_, Receiver<i32>) = channel();
1081     ///
1082     /// assert!(receiver.try_recv().is_err());
1083     /// ```
1084     #[stable(feature = "rust1", since = "1.0.0")]
1085     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1086         loop {
1087             let new_port = match *unsafe { self.inner() } {
1088                 Flavor::Oneshot(ref p) => {
1089                     match p.try_recv() {
1090                         Ok(t) => return Ok(t),
1091                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1092                         Err(oneshot::Disconnected) => {
1093                             return Err(TryRecvError::Disconnected)
1094                         }
1095                         Err(oneshot::Upgraded(rx)) => rx,
1096                     }
1097                 }
1098                 Flavor::Stream(ref p) => {
1099                     match p.try_recv() {
1100                         Ok(t) => return Ok(t),
1101                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1102                         Err(stream::Disconnected) => {
1103                             return Err(TryRecvError::Disconnected)
1104                         }
1105                         Err(stream::Upgraded(rx)) => rx,
1106                     }
1107                 }
1108                 Flavor::Shared(ref p) => {
1109                     match p.try_recv() {
1110                         Ok(t) => return Ok(t),
1111                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1112                         Err(shared::Disconnected) => {
1113                             return Err(TryRecvError::Disconnected)
1114                         }
1115                     }
1116                 }
1117                 Flavor::Sync(ref p) => {
1118                     match p.try_recv() {
1119                         Ok(t) => return Ok(t),
1120                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1121                         Err(sync::Disconnected) => {
1122                             return Err(TryRecvError::Disconnected)
1123                         }
1124                     }
1125                 }
1126             };
1127             unsafe {
1128                 mem::swap(self.inner_mut(),
1129                           new_port.inner_mut());
1130             }
1131         }
1132     }
1133
1134     /// Attempts to wait for a value on this receiver, returning an error if the
1135     /// corresponding channel has hung up.
1136     ///
1137     /// This function will always block the current thread if there is no data
1138     /// available and it's possible for more data to be sent. Once a message is
1139     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1140     /// receiver will wake up and return that message.
1141     ///
1142     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1143     /// this call is blocking, this call will wake up and return [`Err`] to
1144     /// indicate that no more messages can ever be received on this channel.
1145     /// However, since channels are buffered, messages sent before the disconnect
1146     /// will still be properly received.
1147     ///
1148     /// [`Sender`]: struct.Sender.html
1149     /// [`SyncSender`]: struct.SyncSender.html
1150     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1151     ///
1152     /// # Examples
1153     ///
1154     /// ```
1155     /// use std::sync::mpsc;
1156     /// use std::thread;
1157     ///
1158     /// let (send, recv) = mpsc::channel();
1159     /// let handle = thread::spawn(move || {
1160     ///     send.send(1u8).unwrap();
1161     /// });
1162     ///
1163     /// handle.join().unwrap();
1164     ///
1165     /// assert_eq!(Ok(1), recv.recv());
1166     /// ```
1167     ///
1168     /// Buffering behavior:
1169     ///
1170     /// ```
1171     /// use std::sync::mpsc;
1172     /// use std::thread;
1173     /// use std::sync::mpsc::RecvError;
1174     ///
1175     /// let (send, recv) = mpsc::channel();
1176     /// let handle = thread::spawn(move || {
1177     ///     send.send(1u8).unwrap();
1178     ///     send.send(2).unwrap();
1179     ///     send.send(3).unwrap();
1180     ///     drop(send);
1181     /// });
1182     ///
1183     /// // wait for the thread to join so we ensure the sender is dropped
1184     /// handle.join().unwrap();
1185     ///
1186     /// assert_eq!(Ok(1), recv.recv());
1187     /// assert_eq!(Ok(2), recv.recv());
1188     /// assert_eq!(Ok(3), recv.recv());
1189     /// assert_eq!(Err(RecvError), recv.recv());
1190     /// ```
1191     #[stable(feature = "rust1", since = "1.0.0")]
1192     pub fn recv(&self) -> Result<T, RecvError> {
1193         loop {
1194             let new_port = match *unsafe { self.inner() } {
1195                 Flavor::Oneshot(ref p) => {
1196                     match p.recv(None) {
1197                         Ok(t) => return Ok(t),
1198                         Err(oneshot::Disconnected) => return Err(RecvError),
1199                         Err(oneshot::Upgraded(rx)) => rx,
1200                         Err(oneshot::Empty) => unreachable!(),
1201                     }
1202                 }
1203                 Flavor::Stream(ref p) => {
1204                     match p.recv(None) {
1205                         Ok(t) => return Ok(t),
1206                         Err(stream::Disconnected) => return Err(RecvError),
1207                         Err(stream::Upgraded(rx)) => rx,
1208                         Err(stream::Empty) => unreachable!(),
1209                     }
1210                 }
1211                 Flavor::Shared(ref p) => {
1212                     match p.recv(None) {
1213                         Ok(t) => return Ok(t),
1214                         Err(shared::Disconnected) => return Err(RecvError),
1215                         Err(shared::Empty) => unreachable!(),
1216                     }
1217                 }
1218                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1219             };
1220             unsafe {
1221                 mem::swap(self.inner_mut(), new_port.inner_mut());
1222             }
1223         }
1224     }
1225
1226     /// Attempts to wait for a value on this receiver, returning an error if the
1227     /// corresponding channel has hung up, or if it waits more than `timeout`.
1228     ///
1229     /// This function will always block the current thread if there is no data
1230     /// available and it's possible for more data to be sent. Once a message is
1231     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1232     /// receiver will wake up and return that message.
1233     ///
1234     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1235     /// this call is blocking, this call will wake up and return [`Err`] to
1236     /// indicate that no more messages can ever be received on this channel.
1237     /// However, since channels are buffered, messages sent before the disconnect
1238     /// will still be properly received.
1239     ///
1240     /// [`Sender`]: struct.Sender.html
1241     /// [`SyncSender`]: struct.SyncSender.html
1242     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1243     ///
1244     /// # Known Issues
1245     ///
1246     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1247     /// to panic unexpectedly with the following example:
1248     ///
1249     /// ```no_run
1250     /// use std::sync::mpsc::channel;
1251     /// use std::thread;
1252     /// use std::time::Duration;
1253     ///
1254     /// let (tx, rx) = channel::<String>();
1255     ///
1256     /// thread::spawn(move || {
1257     ///     let d = Duration::from_millis(10);
1258     ///     loop {
1259     ///         println!("recv");
1260     ///         let _r = rx.recv_timeout(d);
1261     ///     }
1262     /// });
1263     ///
1264     /// thread::sleep(Duration::from_millis(100));
1265     /// let _c1 = tx.clone();
1266     ///
1267     /// thread::sleep(Duration::from_secs(1));
1268     /// ```
1269     ///
1270     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1271     ///
1272     /// # Examples
1273     ///
1274     /// Successfully receiving value before encountering timeout:
1275     ///
1276     /// ```no_run
1277     /// use std::thread;
1278     /// use std::time::Duration;
1279     /// use std::sync::mpsc;
1280     ///
1281     /// let (send, recv) = mpsc::channel();
1282     ///
1283     /// thread::spawn(move || {
1284     ///     send.send('a').unwrap();
1285     /// });
1286     ///
1287     /// assert_eq!(
1288     ///     recv.recv_timeout(Duration::from_millis(400)),
1289     ///     Ok('a')
1290     /// );
1291     /// ```
1292     ///
1293     /// Receiving an error upon reaching timeout:
1294     ///
1295     /// ```no_run
1296     /// use std::thread;
1297     /// use std::time::Duration;
1298     /// use std::sync::mpsc;
1299     ///
1300     /// let (send, recv) = mpsc::channel();
1301     ///
1302     /// thread::spawn(move || {
1303     ///     thread::sleep(Duration::from_millis(800));
1304     ///     send.send('a').unwrap();
1305     /// });
1306     ///
1307     /// assert_eq!(
1308     ///     recv.recv_timeout(Duration::from_millis(400)),
1309     ///     Err(mpsc::RecvTimeoutError::Timeout)
1310     /// );
1311     /// ```
1312     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1313     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1314         // Do an optimistic try_recv to avoid the performance impact of
1315         // Instant::now() in the full-channel case.
1316         match self.try_recv() {
1317             Ok(result) => Ok(result),
1318             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1319             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1320                 Some(deadline) => self.recv_deadline(deadline),
1321                 // So far in the future that it's practically the same as waiting indefinitely.
1322                 None => self.recv().map_err(RecvTimeoutError::from),
1323             },
1324         }
1325     }
1326
1327     /// Attempts to wait for a value on this receiver, returning an error if the
1328     /// corresponding channel has hung up, or if `deadline` is reached.
1329     ///
1330     /// This function will always block the current thread if there is no data
1331     /// available and it's possible for more data to be sent. Once a message is
1332     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1333     /// receiver will wake up and return that message.
1334     ///
1335     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1336     /// this call is blocking, this call will wake up and return [`Err`] to
1337     /// indicate that no more messages can ever be received on this channel.
1338     /// However, since channels are buffered, messages sent before the disconnect
1339     /// will still be properly received.
1340     ///
1341     /// [`Sender`]: struct.Sender.html
1342     /// [`SyncSender`]: struct.SyncSender.html
1343     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1344     ///
1345     /// # Examples
1346     ///
1347     /// Successfully receiving value before reaching deadline:
1348     ///
1349     /// ```no_run
1350     /// #![feature(deadline_api)]
1351     /// use std::thread;
1352     /// use std::time::{Duration, Instant};
1353     /// use std::sync::mpsc;
1354     ///
1355     /// let (send, recv) = mpsc::channel();
1356     ///
1357     /// thread::spawn(move || {
1358     ///     send.send('a').unwrap();
1359     /// });
1360     ///
1361     /// assert_eq!(
1362     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1363     ///     Ok('a')
1364     /// );
1365     /// ```
1366     ///
1367     /// Receiving an error upon reaching deadline:
1368     ///
1369     /// ```no_run
1370     /// #![feature(deadline_api)]
1371     /// use std::thread;
1372     /// use std::time::{Duration, Instant};
1373     /// use std::sync::mpsc;
1374     ///
1375     /// let (send, recv) = mpsc::channel();
1376     ///
1377     /// thread::spawn(move || {
1378     ///     thread::sleep(Duration::from_millis(800));
1379     ///     send.send('a').unwrap();
1380     /// });
1381     ///
1382     /// assert_eq!(
1383     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1384     ///     Err(mpsc::RecvTimeoutError::Timeout)
1385     /// );
1386     /// ```
1387     #[unstable(feature = "deadline_api", issue = "46316")]
1388     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1389         use self::RecvTimeoutError::*;
1390
1391         loop {
1392             let port_or_empty = match *unsafe { self.inner() } {
1393                 Flavor::Oneshot(ref p) => {
1394                     match p.recv(Some(deadline)) {
1395                         Ok(t) => return Ok(t),
1396                         Err(oneshot::Disconnected) => return Err(Disconnected),
1397                         Err(oneshot::Upgraded(rx)) => Some(rx),
1398                         Err(oneshot::Empty) => None,
1399                     }
1400                 }
1401                 Flavor::Stream(ref p) => {
1402                     match p.recv(Some(deadline)) {
1403                         Ok(t) => return Ok(t),
1404                         Err(stream::Disconnected) => return Err(Disconnected),
1405                         Err(stream::Upgraded(rx)) => Some(rx),
1406                         Err(stream::Empty) => None,
1407                     }
1408                 }
1409                 Flavor::Shared(ref p) => {
1410                     match p.recv(Some(deadline)) {
1411                         Ok(t) => return Ok(t),
1412                         Err(shared::Disconnected) => return Err(Disconnected),
1413                         Err(shared::Empty) => None,
1414                     }
1415                 }
1416                 Flavor::Sync(ref p) => {
1417                     match p.recv(Some(deadline)) {
1418                         Ok(t) => return Ok(t),
1419                         Err(sync::Disconnected) => return Err(Disconnected),
1420                         Err(sync::Empty) => None,
1421                     }
1422                 }
1423             };
1424
1425             if let Some(new_port) = port_or_empty {
1426                 unsafe {
1427                     mem::swap(self.inner_mut(), new_port.inner_mut());
1428                 }
1429             }
1430
1431             // If we're already passed the deadline, and we're here without
1432             // data, return a timeout, else try again.
1433             if Instant::now() >= deadline {
1434                 return Err(Timeout);
1435             }
1436         }
1437     }
1438
1439     /// Returns an iterator that will block waiting for messages, but never
1440     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1441     ///
1442     /// [`panic!`]: ../../../std/macro.panic.html
1443     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1444     ///
1445     /// # Examples
1446     ///
1447     /// ```rust
1448     /// use std::sync::mpsc::channel;
1449     /// use std::thread;
1450     ///
1451     /// let (send, recv) = channel();
1452     ///
1453     /// thread::spawn(move || {
1454     ///     send.send(1).unwrap();
1455     ///     send.send(2).unwrap();
1456     ///     send.send(3).unwrap();
1457     /// });
1458     ///
1459     /// let mut iter = recv.iter();
1460     /// assert_eq!(iter.next(), Some(1));
1461     /// assert_eq!(iter.next(), Some(2));
1462     /// assert_eq!(iter.next(), Some(3));
1463     /// assert_eq!(iter.next(), None);
1464     /// ```
1465     #[stable(feature = "rust1", since = "1.0.0")]
1466     pub fn iter(&self) -> Iter<T> {
1467         Iter { rx: self }
1468     }
1469
1470     /// Returns an iterator that will attempt to yield all pending values.
1471     /// It will return `None` if there are no more pending values or if the
1472     /// channel has hung up. The iterator will never [`panic!`] or block the
1473     /// user by waiting for values.
1474     ///
1475     /// [`panic!`]: ../../../std/macro.panic.html
1476     ///
1477     /// # Examples
1478     ///
1479     /// ```no_run
1480     /// use std::sync::mpsc::channel;
1481     /// use std::thread;
1482     /// use std::time::Duration;
1483     ///
1484     /// let (sender, receiver) = channel();
1485     ///
1486     /// // nothing is in the buffer yet
1487     /// assert!(receiver.try_iter().next().is_none());
1488     ///
1489     /// thread::spawn(move || {
1490     ///     thread::sleep(Duration::from_secs(1));
1491     ///     sender.send(1).unwrap();
1492     ///     sender.send(2).unwrap();
1493     ///     sender.send(3).unwrap();
1494     /// });
1495     ///
1496     /// // nothing is in the buffer yet
1497     /// assert!(receiver.try_iter().next().is_none());
1498     ///
1499     /// // block for two seconds
1500     /// thread::sleep(Duration::from_secs(2));
1501     ///
1502     /// let mut iter = receiver.try_iter();
1503     /// assert_eq!(iter.next(), Some(1));
1504     /// assert_eq!(iter.next(), Some(2));
1505     /// assert_eq!(iter.next(), Some(3));
1506     /// assert_eq!(iter.next(), None);
1507     /// ```
1508     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1509     pub fn try_iter(&self) -> TryIter<T> {
1510         TryIter { rx: self }
1511     }
1512
1513 }
1514
1515 impl<T> select::Packet for Receiver<T> {
1516     fn can_recv(&self) -> bool {
1517         loop {
1518             let new_port = match *unsafe { self.inner() } {
1519                 Flavor::Oneshot(ref p) => {
1520                     match p.can_recv() {
1521                         Ok(ret) => return ret,
1522                         Err(upgrade) => upgrade,
1523                     }
1524                 }
1525                 Flavor::Stream(ref p) => {
1526                     match p.can_recv() {
1527                         Ok(ret) => return ret,
1528                         Err(upgrade) => upgrade,
1529                     }
1530                 }
1531                 Flavor::Shared(ref p) => return p.can_recv(),
1532                 Flavor::Sync(ref p) => return p.can_recv(),
1533             };
1534             unsafe {
1535                 mem::swap(self.inner_mut(),
1536                           new_port.inner_mut());
1537             }
1538         }
1539     }
1540
1541     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1542         loop {
1543             let (t, new_port) = match *unsafe { self.inner() } {
1544                 Flavor::Oneshot(ref p) => {
1545                     match p.start_selection(token) {
1546                         oneshot::SelSuccess => return Installed,
1547                         oneshot::SelCanceled => return Abort,
1548                         oneshot::SelUpgraded(t, rx) => (t, rx),
1549                     }
1550                 }
1551                 Flavor::Stream(ref p) => {
1552                     match p.start_selection(token) {
1553                         stream::SelSuccess => return Installed,
1554                         stream::SelCanceled => return Abort,
1555                         stream::SelUpgraded(t, rx) => (t, rx),
1556                     }
1557                 }
1558                 Flavor::Shared(ref p) => return p.start_selection(token),
1559                 Flavor::Sync(ref p) => return p.start_selection(token),
1560             };
1561             token = t;
1562             unsafe {
1563                 mem::swap(self.inner_mut(), new_port.inner_mut());
1564             }
1565         }
1566     }
1567
1568     fn abort_selection(&self) -> bool {
1569         let mut was_upgrade = false;
1570         loop {
1571             let result = match *unsafe { self.inner() } {
1572                 Flavor::Oneshot(ref p) => p.abort_selection(),
1573                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1574                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1575                 Flavor::Sync(ref p) => return p.abort_selection(),
1576             };
1577             let new_port = match result { Ok(b) => return b, Err(p) => p };
1578             was_upgrade = true;
1579             unsafe {
1580                 mem::swap(self.inner_mut(),
1581                           new_port.inner_mut());
1582             }
1583         }
1584     }
1585 }
1586
1587 #[stable(feature = "rust1", since = "1.0.0")]
1588 impl<'a, T> Iterator for Iter<'a, T> {
1589     type Item = T;
1590
1591     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1592 }
1593
1594 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1595 impl<'a, T> Iterator for TryIter<'a, T> {
1596     type Item = T;
1597
1598     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1599 }
1600
1601 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1602 impl<'a, T> IntoIterator for &'a Receiver<T> {
1603     type Item = T;
1604     type IntoIter = Iter<'a, T>;
1605
1606     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1607 }
1608
1609 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1610 impl<T> Iterator for IntoIter<T> {
1611     type Item = T;
1612     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1613 }
1614
1615 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1616 impl <T> IntoIterator for Receiver<T> {
1617     type Item = T;
1618     type IntoIter = IntoIter<T>;
1619
1620     fn into_iter(self) -> IntoIter<T> {
1621         IntoIter { rx: self }
1622     }
1623 }
1624
1625 #[stable(feature = "rust1", since = "1.0.0")]
1626 impl<T> Drop for Receiver<T> {
1627     fn drop(&mut self) {
1628         match *unsafe { self.inner() } {
1629             Flavor::Oneshot(ref p) => p.drop_port(),
1630             Flavor::Stream(ref p) => p.drop_port(),
1631             Flavor::Shared(ref p) => p.drop_port(),
1632             Flavor::Sync(ref p) => p.drop_port(),
1633         }
1634     }
1635 }
1636
1637 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1638 impl<T> fmt::Debug for Receiver<T> {
1639     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1640         f.debug_struct("Receiver").finish()
1641     }
1642 }
1643
1644 #[stable(feature = "rust1", since = "1.0.0")]
1645 impl<T> fmt::Debug for SendError<T> {
1646     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1647         "SendError(..)".fmt(f)
1648     }
1649 }
1650
1651 #[stable(feature = "rust1", since = "1.0.0")]
1652 impl<T> fmt::Display for SendError<T> {
1653     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654         "sending on a closed channel".fmt(f)
1655     }
1656 }
1657
1658 #[stable(feature = "rust1", since = "1.0.0")]
1659 impl<T: Send> error::Error for SendError<T> {
1660     fn description(&self) -> &str {
1661         "sending on a closed channel"
1662     }
1663
1664     fn cause(&self) -> Option<&dyn error::Error> {
1665         None
1666     }
1667 }
1668
1669 #[stable(feature = "rust1", since = "1.0.0")]
1670 impl<T> fmt::Debug for TrySendError<T> {
1671     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1672         match *self {
1673             TrySendError::Full(..) => "Full(..)".fmt(f),
1674             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1675         }
1676     }
1677 }
1678
1679 #[stable(feature = "rust1", since = "1.0.0")]
1680 impl<T> fmt::Display for TrySendError<T> {
1681     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1682         match *self {
1683             TrySendError::Full(..) => {
1684                 "sending on a full channel".fmt(f)
1685             }
1686             TrySendError::Disconnected(..) => {
1687                 "sending on a closed channel".fmt(f)
1688             }
1689         }
1690     }
1691 }
1692
1693 #[stable(feature = "rust1", since = "1.0.0")]
1694 impl<T: Send> error::Error for TrySendError<T> {
1695
1696     fn description(&self) -> &str {
1697         match *self {
1698             TrySendError::Full(..) => {
1699                 "sending on a full channel"
1700             }
1701             TrySendError::Disconnected(..) => {
1702                 "sending on a closed channel"
1703             }
1704         }
1705     }
1706
1707     fn cause(&self) -> Option<&dyn error::Error> {
1708         None
1709     }
1710 }
1711
1712 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1713 impl<T> From<SendError<T>> for TrySendError<T> {
1714     fn from(err: SendError<T>) -> TrySendError<T> {
1715         match err {
1716             SendError(t) => TrySendError::Disconnected(t),
1717         }
1718     }
1719 }
1720
1721 #[stable(feature = "rust1", since = "1.0.0")]
1722 impl fmt::Display for RecvError {
1723     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724         "receiving on a closed channel".fmt(f)
1725     }
1726 }
1727
1728 #[stable(feature = "rust1", since = "1.0.0")]
1729 impl error::Error for RecvError {
1730
1731     fn description(&self) -> &str {
1732         "receiving on a closed channel"
1733     }
1734
1735     fn cause(&self) -> Option<&dyn error::Error> {
1736         None
1737     }
1738 }
1739
1740 #[stable(feature = "rust1", since = "1.0.0")]
1741 impl fmt::Display for TryRecvError {
1742     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743         match *self {
1744             TryRecvError::Empty => {
1745                 "receiving on an empty channel".fmt(f)
1746             }
1747             TryRecvError::Disconnected => {
1748                 "receiving on a closed channel".fmt(f)
1749             }
1750         }
1751     }
1752 }
1753
1754 #[stable(feature = "rust1", since = "1.0.0")]
1755 impl error::Error for TryRecvError {
1756
1757     fn description(&self) -> &str {
1758         match *self {
1759             TryRecvError::Empty => {
1760                 "receiving on an empty channel"
1761             }
1762             TryRecvError::Disconnected => {
1763                 "receiving on a closed channel"
1764             }
1765         }
1766     }
1767
1768     fn cause(&self) -> Option<&dyn error::Error> {
1769         None
1770     }
1771 }
1772
1773 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1774 impl From<RecvError> for TryRecvError {
1775     fn from(err: RecvError) -> TryRecvError {
1776         match err {
1777             RecvError => TryRecvError::Disconnected,
1778         }
1779     }
1780 }
1781
1782 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1783 impl fmt::Display for RecvTimeoutError {
1784     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1785         match *self {
1786             RecvTimeoutError::Timeout => {
1787                 "timed out waiting on channel".fmt(f)
1788             }
1789             RecvTimeoutError::Disconnected => {
1790                 "channel is empty and sending half is closed".fmt(f)
1791             }
1792         }
1793     }
1794 }
1795
1796 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1797 impl error::Error for RecvTimeoutError {
1798     fn description(&self) -> &str {
1799         match *self {
1800             RecvTimeoutError::Timeout => {
1801                 "timed out waiting on channel"
1802             }
1803             RecvTimeoutError::Disconnected => {
1804                 "channel is empty and sending half is closed"
1805             }
1806         }
1807     }
1808
1809     fn cause(&self) -> Option<&dyn error::Error> {
1810         None
1811     }
1812 }
1813
1814 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1815 impl From<RecvError> for RecvTimeoutError {
1816     fn from(err: RecvError) -> RecvTimeoutError {
1817         match err {
1818             RecvError => RecvTimeoutError::Disconnected,
1819         }
1820     }
1821 }
1822
1823 #[cfg(all(test, not(target_os = "emscripten")))]
1824 mod tests {
1825     use super::*;
1826     use crate::env;
1827     use crate::thread;
1828     use crate::time::{Duration, Instant};
1829
1830     pub fn stress_factor() -> usize {
1831         match env::var("RUST_TEST_STRESS") {
1832             Ok(val) => val.parse().unwrap(),
1833             Err(..) => 1,
1834         }
1835     }
1836
1837     #[test]
1838     fn smoke() {
1839         let (tx, rx) = channel::<i32>();
1840         tx.send(1).unwrap();
1841         assert_eq!(rx.recv().unwrap(), 1);
1842     }
1843
1844     #[test]
1845     fn drop_full() {
1846         let (tx, _rx) = channel::<Box<isize>>();
1847         tx.send(box 1).unwrap();
1848     }
1849
1850     #[test]
1851     fn drop_full_shared() {
1852         let (tx, _rx) = channel::<Box<isize>>();
1853         drop(tx.clone());
1854         drop(tx.clone());
1855         tx.send(box 1).unwrap();
1856     }
1857
1858     #[test]
1859     fn smoke_shared() {
1860         let (tx, rx) = channel::<i32>();
1861         tx.send(1).unwrap();
1862         assert_eq!(rx.recv().unwrap(), 1);
1863         let tx = tx.clone();
1864         tx.send(1).unwrap();
1865         assert_eq!(rx.recv().unwrap(), 1);
1866     }
1867
1868     #[test]
1869     fn smoke_threads() {
1870         let (tx, rx) = channel::<i32>();
1871         let _t = thread::spawn(move|| {
1872             tx.send(1).unwrap();
1873         });
1874         assert_eq!(rx.recv().unwrap(), 1);
1875     }
1876
1877     #[test]
1878     fn smoke_port_gone() {
1879         let (tx, rx) = channel::<i32>();
1880         drop(rx);
1881         assert!(tx.send(1).is_err());
1882     }
1883
1884     #[test]
1885     fn smoke_shared_port_gone() {
1886         let (tx, rx) = channel::<i32>();
1887         drop(rx);
1888         assert!(tx.send(1).is_err())
1889     }
1890
1891     #[test]
1892     fn smoke_shared_port_gone2() {
1893         let (tx, rx) = channel::<i32>();
1894         drop(rx);
1895         let tx2 = tx.clone();
1896         drop(tx);
1897         assert!(tx2.send(1).is_err());
1898     }
1899
1900     #[test]
1901     fn port_gone_concurrent() {
1902         let (tx, rx) = channel::<i32>();
1903         let _t = thread::spawn(move|| {
1904             rx.recv().unwrap();
1905         });
1906         while tx.send(1).is_ok() {}
1907     }
1908
1909     #[test]
1910     fn port_gone_concurrent_shared() {
1911         let (tx, rx) = channel::<i32>();
1912         let tx2 = tx.clone();
1913         let _t = thread::spawn(move|| {
1914             rx.recv().unwrap();
1915         });
1916         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1917     }
1918
1919     #[test]
1920     fn smoke_chan_gone() {
1921         let (tx, rx) = channel::<i32>();
1922         drop(tx);
1923         assert!(rx.recv().is_err());
1924     }
1925
1926     #[test]
1927     fn smoke_chan_gone_shared() {
1928         let (tx, rx) = channel::<()>();
1929         let tx2 = tx.clone();
1930         drop(tx);
1931         drop(tx2);
1932         assert!(rx.recv().is_err());
1933     }
1934
1935     #[test]
1936     fn chan_gone_concurrent() {
1937         let (tx, rx) = channel::<i32>();
1938         let _t = thread::spawn(move|| {
1939             tx.send(1).unwrap();
1940             tx.send(1).unwrap();
1941         });
1942         while rx.recv().is_ok() {}
1943     }
1944
1945     #[test]
1946     fn stress() {
1947         let (tx, rx) = channel::<i32>();
1948         let t = thread::spawn(move|| {
1949             for _ in 0..10000 { tx.send(1).unwrap(); }
1950         });
1951         for _ in 0..10000 {
1952             assert_eq!(rx.recv().unwrap(), 1);
1953         }
1954         t.join().ok().expect("thread panicked");
1955     }
1956
1957     #[test]
1958     fn stress_shared() {
1959         const AMT: u32 = 10000;
1960         const NTHREADS: u32 = 8;
1961         let (tx, rx) = channel::<i32>();
1962
1963         let t = thread::spawn(move|| {
1964             for _ in 0..AMT * NTHREADS {
1965                 assert_eq!(rx.recv().unwrap(), 1);
1966             }
1967             match rx.try_recv() {
1968                 Ok(..) => panic!(),
1969                 _ => {}
1970             }
1971         });
1972
1973         for _ in 0..NTHREADS {
1974             let tx = tx.clone();
1975             thread::spawn(move|| {
1976                 for _ in 0..AMT { tx.send(1).unwrap(); }
1977             });
1978         }
1979         drop(tx);
1980         t.join().ok().expect("thread panicked");
1981     }
1982
1983     #[test]
1984     fn send_from_outside_runtime() {
1985         let (tx1, rx1) = channel::<()>();
1986         let (tx2, rx2) = channel::<i32>();
1987         let t1 = thread::spawn(move|| {
1988             tx1.send(()).unwrap();
1989             for _ in 0..40 {
1990                 assert_eq!(rx2.recv().unwrap(), 1);
1991             }
1992         });
1993         rx1.recv().unwrap();
1994         let t2 = thread::spawn(move|| {
1995             for _ in 0..40 {
1996                 tx2.send(1).unwrap();
1997             }
1998         });
1999         t1.join().ok().expect("thread panicked");
2000         t2.join().ok().expect("thread panicked");
2001     }
2002
2003     #[test]
2004     fn recv_from_outside_runtime() {
2005         let (tx, rx) = channel::<i32>();
2006         let t = thread::spawn(move|| {
2007             for _ in 0..40 {
2008                 assert_eq!(rx.recv().unwrap(), 1);
2009             }
2010         });
2011         for _ in 0..40 {
2012             tx.send(1).unwrap();
2013         }
2014         t.join().ok().expect("thread panicked");
2015     }
2016
2017     #[test]
2018     fn no_runtime() {
2019         let (tx1, rx1) = channel::<i32>();
2020         let (tx2, rx2) = channel::<i32>();
2021         let t1 = thread::spawn(move|| {
2022             assert_eq!(rx1.recv().unwrap(), 1);
2023             tx2.send(2).unwrap();
2024         });
2025         let t2 = thread::spawn(move|| {
2026             tx1.send(1).unwrap();
2027             assert_eq!(rx2.recv().unwrap(), 2);
2028         });
2029         t1.join().ok().expect("thread panicked");
2030         t2.join().ok().expect("thread panicked");
2031     }
2032
2033     #[test]
2034     fn oneshot_single_thread_close_port_first() {
2035         // Simple test of closing without sending
2036         let (_tx, rx) = channel::<i32>();
2037         drop(rx);
2038     }
2039
2040     #[test]
2041     fn oneshot_single_thread_close_chan_first() {
2042         // Simple test of closing without sending
2043         let (tx, _rx) = channel::<i32>();
2044         drop(tx);
2045     }
2046
2047     #[test]
2048     fn oneshot_single_thread_send_port_close() {
2049         // Testing that the sender cleans up the payload if receiver is closed
2050         let (tx, rx) = channel::<Box<i32>>();
2051         drop(rx);
2052         assert!(tx.send(box 0).is_err());
2053     }
2054
2055     #[test]
2056     fn oneshot_single_thread_recv_chan_close() {
2057         // Receiving on a closed chan will panic
2058         let res = thread::spawn(move|| {
2059             let (tx, rx) = channel::<i32>();
2060             drop(tx);
2061             rx.recv().unwrap();
2062         }).join();
2063         // What is our res?
2064         assert!(res.is_err());
2065     }
2066
2067     #[test]
2068     fn oneshot_single_thread_send_then_recv() {
2069         let (tx, rx) = channel::<Box<i32>>();
2070         tx.send(box 10).unwrap();
2071         assert!(*rx.recv().unwrap() == 10);
2072     }
2073
2074     #[test]
2075     fn oneshot_single_thread_try_send_open() {
2076         let (tx, rx) = channel::<i32>();
2077         assert!(tx.send(10).is_ok());
2078         assert!(rx.recv().unwrap() == 10);
2079     }
2080
2081     #[test]
2082     fn oneshot_single_thread_try_send_closed() {
2083         let (tx, rx) = channel::<i32>();
2084         drop(rx);
2085         assert!(tx.send(10).is_err());
2086     }
2087
2088     #[test]
2089     fn oneshot_single_thread_try_recv_open() {
2090         let (tx, rx) = channel::<i32>();
2091         tx.send(10).unwrap();
2092         assert!(rx.recv() == Ok(10));
2093     }
2094
2095     #[test]
2096     fn oneshot_single_thread_try_recv_closed() {
2097         let (tx, rx) = channel::<i32>();
2098         drop(tx);
2099         assert!(rx.recv().is_err());
2100     }
2101
2102     #[test]
2103     fn oneshot_single_thread_peek_data() {
2104         let (tx, rx) = channel::<i32>();
2105         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2106         tx.send(10).unwrap();
2107         assert_eq!(rx.try_recv(), Ok(10));
2108     }
2109
2110     #[test]
2111     fn oneshot_single_thread_peek_close() {
2112         let (tx, rx) = channel::<i32>();
2113         drop(tx);
2114         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2115         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2116     }
2117
2118     #[test]
2119     fn oneshot_single_thread_peek_open() {
2120         let (_tx, rx) = channel::<i32>();
2121         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2122     }
2123
2124     #[test]
2125     fn oneshot_multi_task_recv_then_send() {
2126         let (tx, rx) = channel::<Box<i32>>();
2127         let _t = thread::spawn(move|| {
2128             assert!(*rx.recv().unwrap() == 10);
2129         });
2130
2131         tx.send(box 10).unwrap();
2132     }
2133
2134     #[test]
2135     fn oneshot_multi_task_recv_then_close() {
2136         let (tx, rx) = channel::<Box<i32>>();
2137         let _t = thread::spawn(move|| {
2138             drop(tx);
2139         });
2140         let res = thread::spawn(move|| {
2141             assert!(*rx.recv().unwrap() == 10);
2142         }).join();
2143         assert!(res.is_err());
2144     }
2145
2146     #[test]
2147     fn oneshot_multi_thread_close_stress() {
2148         for _ in 0..stress_factor() {
2149             let (tx, rx) = channel::<i32>();
2150             let _t = thread::spawn(move|| {
2151                 drop(rx);
2152             });
2153             drop(tx);
2154         }
2155     }
2156
2157     #[test]
2158     fn oneshot_multi_thread_send_close_stress() {
2159         for _ in 0..stress_factor() {
2160             let (tx, rx) = channel::<i32>();
2161             let _t = thread::spawn(move|| {
2162                 drop(rx);
2163             });
2164             let _ = thread::spawn(move|| {
2165                 tx.send(1).unwrap();
2166             }).join();
2167         }
2168     }
2169
2170     #[test]
2171     fn oneshot_multi_thread_recv_close_stress() {
2172         for _ in 0..stress_factor() {
2173             let (tx, rx) = channel::<i32>();
2174             thread::spawn(move|| {
2175                 let res = thread::spawn(move|| {
2176                     rx.recv().unwrap();
2177                 }).join();
2178                 assert!(res.is_err());
2179             });
2180             let _t = thread::spawn(move|| {
2181                 thread::spawn(move|| {
2182                     drop(tx);
2183                 });
2184             });
2185         }
2186     }
2187
2188     #[test]
2189     fn oneshot_multi_thread_send_recv_stress() {
2190         for _ in 0..stress_factor() {
2191             let (tx, rx) = channel::<Box<isize>>();
2192             let _t = thread::spawn(move|| {
2193                 tx.send(box 10).unwrap();
2194             });
2195             assert!(*rx.recv().unwrap() == 10);
2196         }
2197     }
2198
2199     #[test]
2200     fn stream_send_recv_stress() {
2201         for _ in 0..stress_factor() {
2202             let (tx, rx) = channel();
2203
2204             send(tx, 0);
2205             recv(rx, 0);
2206
2207             fn send(tx: Sender<Box<i32>>, i: i32) {
2208                 if i == 10 { return }
2209
2210                 thread::spawn(move|| {
2211                     tx.send(box i).unwrap();
2212                     send(tx, i + 1);
2213                 });
2214             }
2215
2216             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2217                 if i == 10 { return }
2218
2219                 thread::spawn(move|| {
2220                     assert!(*rx.recv().unwrap() == i);
2221                     recv(rx, i + 1);
2222                 });
2223             }
2224         }
2225     }
2226
2227     #[test]
2228     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2229     fn oneshot_single_thread_recv_timeout() {
2230         let (tx, rx) = channel();
2231         tx.send(()).unwrap();
2232         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2233         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2234         tx.send(()).unwrap();
2235         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2236     }
2237
2238     #[test]
2239     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2240     fn stress_recv_timeout_two_threads() {
2241         let (tx, rx) = channel();
2242         let stress = stress_factor() + 100;
2243         let timeout = Duration::from_millis(100);
2244
2245         thread::spawn(move || {
2246             for i in 0..stress {
2247                 if i % 2 == 0 {
2248                     thread::sleep(timeout * 2);
2249                 }
2250                 tx.send(1usize).unwrap();
2251             }
2252         });
2253
2254         let mut recv_count = 0;
2255         loop {
2256             match rx.recv_timeout(timeout) {
2257                 Ok(n) => {
2258                     assert_eq!(n, 1usize);
2259                     recv_count += 1;
2260                 }
2261                 Err(RecvTimeoutError::Timeout) => continue,
2262                 Err(RecvTimeoutError::Disconnected) => break,
2263             }
2264         }
2265
2266         assert_eq!(recv_count, stress);
2267     }
2268
2269     #[test]
2270     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2271     fn recv_timeout_upgrade() {
2272         let (tx, rx) = channel::<()>();
2273         let timeout = Duration::from_millis(1);
2274         let _tx_clone = tx.clone();
2275
2276         let start = Instant::now();
2277         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2278         assert!(Instant::now() >= start + timeout);
2279     }
2280
2281     #[test]
2282     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2283     fn stress_recv_timeout_shared() {
2284         let (tx, rx) = channel();
2285         let stress = stress_factor() + 100;
2286
2287         for i in 0..stress {
2288             let tx = tx.clone();
2289             thread::spawn(move || {
2290                 thread::sleep(Duration::from_millis(i as u64 * 10));
2291                 tx.send(1usize).unwrap();
2292             });
2293         }
2294
2295         drop(tx);
2296
2297         let mut recv_count = 0;
2298         loop {
2299             match rx.recv_timeout(Duration::from_millis(10)) {
2300                 Ok(n) => {
2301                     assert_eq!(n, 1usize);
2302                     recv_count += 1;
2303                 }
2304                 Err(RecvTimeoutError::Timeout) => continue,
2305                 Err(RecvTimeoutError::Disconnected) => break,
2306             }
2307         }
2308
2309         assert_eq!(recv_count, stress);
2310     }
2311
2312     #[test]
2313     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2314     fn very_long_recv_timeout_wont_panic() {
2315         let (tx, rx) = channel::<()>();
2316         let join_handle = thread::spawn(move || {
2317             rx.recv_timeout(Duration::from_secs(u64::max_value()))
2318         });
2319         thread::sleep(Duration::from_secs(1));
2320         assert!(tx.send(()).is_ok());
2321         assert_eq!(join_handle.join().unwrap(), Ok(()));
2322     }
2323
2324     #[test]
2325     fn recv_a_lot() {
2326         // Regression test that we don't run out of stack in scheduler context
2327         let (tx, rx) = channel();
2328         for _ in 0..10000 { tx.send(()).unwrap(); }
2329         for _ in 0..10000 { rx.recv().unwrap(); }
2330     }
2331
2332     #[test]
2333     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2334     fn shared_recv_timeout() {
2335         let (tx, rx) = channel();
2336         let total = 5;
2337         for _ in 0..total {
2338             let tx = tx.clone();
2339             thread::spawn(move|| {
2340                 tx.send(()).unwrap();
2341             });
2342         }
2343
2344         for _ in 0..total { rx.recv().unwrap(); }
2345
2346         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2347         tx.send(()).unwrap();
2348         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2349     }
2350
2351     #[test]
2352     fn shared_chan_stress() {
2353         let (tx, rx) = channel();
2354         let total = stress_factor() + 100;
2355         for _ in 0..total {
2356             let tx = tx.clone();
2357             thread::spawn(move|| {
2358                 tx.send(()).unwrap();
2359             });
2360         }
2361
2362         for _ in 0..total {
2363             rx.recv().unwrap();
2364         }
2365     }
2366
2367     #[test]
2368     fn test_nested_recv_iter() {
2369         let (tx, rx) = channel::<i32>();
2370         let (total_tx, total_rx) = channel::<i32>();
2371
2372         let _t = thread::spawn(move|| {
2373             let mut acc = 0;
2374             for x in rx.iter() {
2375                 acc += x;
2376             }
2377             total_tx.send(acc).unwrap();
2378         });
2379
2380         tx.send(3).unwrap();
2381         tx.send(1).unwrap();
2382         tx.send(2).unwrap();
2383         drop(tx);
2384         assert_eq!(total_rx.recv().unwrap(), 6);
2385     }
2386
2387     #[test]
2388     fn test_recv_iter_break() {
2389         let (tx, rx) = channel::<i32>();
2390         let (count_tx, count_rx) = channel();
2391
2392         let _t = thread::spawn(move|| {
2393             let mut count = 0;
2394             for x in rx.iter() {
2395                 if count >= 3 {
2396                     break;
2397                 } else {
2398                     count += x;
2399                 }
2400             }
2401             count_tx.send(count).unwrap();
2402         });
2403
2404         tx.send(2).unwrap();
2405         tx.send(2).unwrap();
2406         tx.send(2).unwrap();
2407         let _ = tx.send(2);
2408         drop(tx);
2409         assert_eq!(count_rx.recv().unwrap(), 4);
2410     }
2411
2412     #[test]
2413     fn test_recv_try_iter() {
2414         let (request_tx, request_rx) = channel();
2415         let (response_tx, response_rx) = channel();
2416
2417         // Request `x`s until we have `6`.
2418         let t = thread::spawn(move|| {
2419             let mut count = 0;
2420             loop {
2421                 for x in response_rx.try_iter() {
2422                     count += x;
2423                     if count == 6 {
2424                         return count;
2425                     }
2426                 }
2427                 request_tx.send(()).unwrap();
2428             }
2429         });
2430
2431         for _ in request_rx.iter() {
2432             if response_tx.send(2).is_err() {
2433                 break;
2434             }
2435         }
2436
2437         assert_eq!(t.join().unwrap(), 6);
2438     }
2439
2440     #[test]
2441     fn test_recv_into_iter_owned() {
2442         let mut iter = {
2443           let (tx, rx) = channel::<i32>();
2444           tx.send(1).unwrap();
2445           tx.send(2).unwrap();
2446
2447           rx.into_iter()
2448         };
2449         assert_eq!(iter.next().unwrap(), 1);
2450         assert_eq!(iter.next().unwrap(), 2);
2451         assert_eq!(iter.next().is_none(), true);
2452     }
2453
2454     #[test]
2455     fn test_recv_into_iter_borrowed() {
2456         let (tx, rx) = channel::<i32>();
2457         tx.send(1).unwrap();
2458         tx.send(2).unwrap();
2459         drop(tx);
2460         let mut iter = (&rx).into_iter();
2461         assert_eq!(iter.next().unwrap(), 1);
2462         assert_eq!(iter.next().unwrap(), 2);
2463         assert_eq!(iter.next().is_none(), true);
2464     }
2465
2466     #[test]
2467     fn try_recv_states() {
2468         let (tx1, rx1) = channel::<i32>();
2469         let (tx2, rx2) = channel::<()>();
2470         let (tx3, rx3) = channel::<()>();
2471         let _t = thread::spawn(move|| {
2472             rx2.recv().unwrap();
2473             tx1.send(1).unwrap();
2474             tx3.send(()).unwrap();
2475             rx2.recv().unwrap();
2476             drop(tx1);
2477             tx3.send(()).unwrap();
2478         });
2479
2480         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2481         tx2.send(()).unwrap();
2482         rx3.recv().unwrap();
2483         assert_eq!(rx1.try_recv(), Ok(1));
2484         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2485         tx2.send(()).unwrap();
2486         rx3.recv().unwrap();
2487         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2488     }
2489
2490     // This bug used to end up in a livelock inside of the Receiver destructor
2491     // because the internal state of the Shared packet was corrupted
2492     #[test]
2493     fn destroy_upgraded_shared_port_when_sender_still_active() {
2494         let (tx, rx) = channel();
2495         let (tx2, rx2) = channel();
2496         let _t = thread::spawn(move|| {
2497             rx.recv().unwrap(); // wait on a oneshot
2498             drop(rx);  // destroy a shared
2499             tx2.send(()).unwrap();
2500         });
2501         // make sure the other thread has gone to sleep
2502         for _ in 0..5000 { thread::yield_now(); }
2503
2504         // upgrade to a shared chan and send a message
2505         let t = tx.clone();
2506         drop(tx);
2507         t.send(()).unwrap();
2508
2509         // wait for the child thread to exit before we exit
2510         rx2.recv().unwrap();
2511     }
2512
2513     #[test]
2514     fn issue_32114() {
2515         let (tx, _) = channel();
2516         let _ = tx.send(123);
2517         assert_eq!(tx.send(123), Err(SendError(123)));
2518     }
2519 }
2520
2521 #[cfg(all(test, not(target_os = "emscripten")))]
2522 mod sync_tests {
2523     use super::*;
2524     use crate::env;
2525     use crate::thread;
2526     use crate::time::Duration;
2527
2528     pub fn stress_factor() -> usize {
2529         match env::var("RUST_TEST_STRESS") {
2530             Ok(val) => val.parse().unwrap(),
2531             Err(..) => 1,
2532         }
2533     }
2534
2535     #[test]
2536     fn smoke() {
2537         let (tx, rx) = sync_channel::<i32>(1);
2538         tx.send(1).unwrap();
2539         assert_eq!(rx.recv().unwrap(), 1);
2540     }
2541
2542     #[test]
2543     fn drop_full() {
2544         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2545         tx.send(box 1).unwrap();
2546     }
2547
2548     #[test]
2549     fn smoke_shared() {
2550         let (tx, rx) = sync_channel::<i32>(1);
2551         tx.send(1).unwrap();
2552         assert_eq!(rx.recv().unwrap(), 1);
2553         let tx = tx.clone();
2554         tx.send(1).unwrap();
2555         assert_eq!(rx.recv().unwrap(), 1);
2556     }
2557
2558     #[test]
2559     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2560     fn recv_timeout() {
2561         let (tx, rx) = sync_channel::<i32>(1);
2562         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2563         tx.send(1).unwrap();
2564         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2565     }
2566
2567     #[test]
2568     fn smoke_threads() {
2569         let (tx, rx) = sync_channel::<i32>(0);
2570         let _t = thread::spawn(move|| {
2571             tx.send(1).unwrap();
2572         });
2573         assert_eq!(rx.recv().unwrap(), 1);
2574     }
2575
2576     #[test]
2577     fn smoke_port_gone() {
2578         let (tx, rx) = sync_channel::<i32>(0);
2579         drop(rx);
2580         assert!(tx.send(1).is_err());
2581     }
2582
2583     #[test]
2584     fn smoke_shared_port_gone2() {
2585         let (tx, rx) = sync_channel::<i32>(0);
2586         drop(rx);
2587         let tx2 = tx.clone();
2588         drop(tx);
2589         assert!(tx2.send(1).is_err());
2590     }
2591
2592     #[test]
2593     fn port_gone_concurrent() {
2594         let (tx, rx) = sync_channel::<i32>(0);
2595         let _t = thread::spawn(move|| {
2596             rx.recv().unwrap();
2597         });
2598         while tx.send(1).is_ok() {}
2599     }
2600
2601     #[test]
2602     fn port_gone_concurrent_shared() {
2603         let (tx, rx) = sync_channel::<i32>(0);
2604         let tx2 = tx.clone();
2605         let _t = thread::spawn(move|| {
2606             rx.recv().unwrap();
2607         });
2608         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2609     }
2610
2611     #[test]
2612     fn smoke_chan_gone() {
2613         let (tx, rx) = sync_channel::<i32>(0);
2614         drop(tx);
2615         assert!(rx.recv().is_err());
2616     }
2617
2618     #[test]
2619     fn smoke_chan_gone_shared() {
2620         let (tx, rx) = sync_channel::<()>(0);
2621         let tx2 = tx.clone();
2622         drop(tx);
2623         drop(tx2);
2624         assert!(rx.recv().is_err());
2625     }
2626
2627     #[test]
2628     fn chan_gone_concurrent() {
2629         let (tx, rx) = sync_channel::<i32>(0);
2630         thread::spawn(move|| {
2631             tx.send(1).unwrap();
2632             tx.send(1).unwrap();
2633         });
2634         while rx.recv().is_ok() {}
2635     }
2636
2637     #[test]
2638     fn stress() {
2639         let (tx, rx) = sync_channel::<i32>(0);
2640         thread::spawn(move|| {
2641             for _ in 0..10000 { tx.send(1).unwrap(); }
2642         });
2643         for _ in 0..10000 {
2644             assert_eq!(rx.recv().unwrap(), 1);
2645         }
2646     }
2647
2648     #[test]
2649     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2650     fn stress_recv_timeout_two_threads() {
2651         let (tx, rx) = sync_channel::<i32>(0);
2652
2653         thread::spawn(move|| {
2654             for _ in 0..10000 { tx.send(1).unwrap(); }
2655         });
2656
2657         let mut recv_count = 0;
2658         loop {
2659             match rx.recv_timeout(Duration::from_millis(1)) {
2660                 Ok(v) => {
2661                     assert_eq!(v, 1);
2662                     recv_count += 1;
2663                 },
2664                 Err(RecvTimeoutError::Timeout) => continue,
2665                 Err(RecvTimeoutError::Disconnected) => break,
2666             }
2667         }
2668
2669         assert_eq!(recv_count, 10000);
2670     }
2671
2672     #[test]
2673     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2674     fn stress_recv_timeout_shared() {
2675         const AMT: u32 = 1000;
2676         const NTHREADS: u32 = 8;
2677         let (tx, rx) = sync_channel::<i32>(0);
2678         let (dtx, drx) = sync_channel::<()>(0);
2679
2680         thread::spawn(move|| {
2681             let mut recv_count = 0;
2682             loop {
2683                 match rx.recv_timeout(Duration::from_millis(10)) {
2684                     Ok(v) => {
2685                         assert_eq!(v, 1);
2686                         recv_count += 1;
2687                     },
2688                     Err(RecvTimeoutError::Timeout) => continue,
2689                     Err(RecvTimeoutError::Disconnected) => break,
2690                 }
2691             }
2692
2693             assert_eq!(recv_count, AMT * NTHREADS);
2694             assert!(rx.try_recv().is_err());
2695
2696             dtx.send(()).unwrap();
2697         });
2698
2699         for _ in 0..NTHREADS {
2700             let tx = tx.clone();
2701             thread::spawn(move|| {
2702                 for _ in 0..AMT { tx.send(1).unwrap(); }
2703             });
2704         }
2705
2706         drop(tx);
2707
2708         drx.recv().unwrap();
2709     }
2710
2711     #[test]
2712     fn stress_shared() {
2713         const AMT: u32 = 1000;
2714         const NTHREADS: u32 = 8;
2715         let (tx, rx) = sync_channel::<i32>(0);
2716         let (dtx, drx) = sync_channel::<()>(0);
2717
2718         thread::spawn(move|| {
2719             for _ in 0..AMT * NTHREADS {
2720                 assert_eq!(rx.recv().unwrap(), 1);
2721             }
2722             match rx.try_recv() {
2723                 Ok(..) => panic!(),
2724                 _ => {}
2725             }
2726             dtx.send(()).unwrap();
2727         });
2728
2729         for _ in 0..NTHREADS {
2730             let tx = tx.clone();
2731             thread::spawn(move|| {
2732                 for _ in 0..AMT { tx.send(1).unwrap(); }
2733             });
2734         }
2735         drop(tx);
2736         drx.recv().unwrap();
2737     }
2738
2739     #[test]
2740     fn oneshot_single_thread_close_port_first() {
2741         // Simple test of closing without sending
2742         let (_tx, rx) = sync_channel::<i32>(0);
2743         drop(rx);
2744     }
2745
2746     #[test]
2747     fn oneshot_single_thread_close_chan_first() {
2748         // Simple test of closing without sending
2749         let (tx, _rx) = sync_channel::<i32>(0);
2750         drop(tx);
2751     }
2752
2753     #[test]
2754     fn oneshot_single_thread_send_port_close() {
2755         // Testing that the sender cleans up the payload if receiver is closed
2756         let (tx, rx) = sync_channel::<Box<i32>>(0);
2757         drop(rx);
2758         assert!(tx.send(box 0).is_err());
2759     }
2760
2761     #[test]
2762     fn oneshot_single_thread_recv_chan_close() {
2763         // Receiving on a closed chan will panic
2764         let res = thread::spawn(move|| {
2765             let (tx, rx) = sync_channel::<i32>(0);
2766             drop(tx);
2767             rx.recv().unwrap();
2768         }).join();
2769         // What is our res?
2770         assert!(res.is_err());
2771     }
2772
2773     #[test]
2774     fn oneshot_single_thread_send_then_recv() {
2775         let (tx, rx) = sync_channel::<Box<i32>>(1);
2776         tx.send(box 10).unwrap();
2777         assert!(*rx.recv().unwrap() == 10);
2778     }
2779
2780     #[test]
2781     fn oneshot_single_thread_try_send_open() {
2782         let (tx, rx) = sync_channel::<i32>(1);
2783         assert_eq!(tx.try_send(10), Ok(()));
2784         assert!(rx.recv().unwrap() == 10);
2785     }
2786
2787     #[test]
2788     fn oneshot_single_thread_try_send_closed() {
2789         let (tx, rx) = sync_channel::<i32>(0);
2790         drop(rx);
2791         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2792     }
2793
2794     #[test]
2795     fn oneshot_single_thread_try_send_closed2() {
2796         let (tx, _rx) = sync_channel::<i32>(0);
2797         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2798     }
2799
2800     #[test]
2801     fn oneshot_single_thread_try_recv_open() {
2802         let (tx, rx) = sync_channel::<i32>(1);
2803         tx.send(10).unwrap();
2804         assert!(rx.recv() == Ok(10));
2805     }
2806
2807     #[test]
2808     fn oneshot_single_thread_try_recv_closed() {
2809         let (tx, rx) = sync_channel::<i32>(0);
2810         drop(tx);
2811         assert!(rx.recv().is_err());
2812     }
2813
2814     #[test]
2815     fn oneshot_single_thread_try_recv_closed_with_data() {
2816         let (tx, rx) = sync_channel::<i32>(1);
2817         tx.send(10).unwrap();
2818         drop(tx);
2819         assert_eq!(rx.try_recv(), Ok(10));
2820         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2821     }
2822
2823     #[test]
2824     fn oneshot_single_thread_peek_data() {
2825         let (tx, rx) = sync_channel::<i32>(1);
2826         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2827         tx.send(10).unwrap();
2828         assert_eq!(rx.try_recv(), Ok(10));
2829     }
2830
2831     #[test]
2832     fn oneshot_single_thread_peek_close() {
2833         let (tx, rx) = sync_channel::<i32>(0);
2834         drop(tx);
2835         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2836         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2837     }
2838
2839     #[test]
2840     fn oneshot_single_thread_peek_open() {
2841         let (_tx, rx) = sync_channel::<i32>(0);
2842         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2843     }
2844
2845     #[test]
2846     fn oneshot_multi_task_recv_then_send() {
2847         let (tx, rx) = sync_channel::<Box<i32>>(0);
2848         let _t = thread::spawn(move|| {
2849             assert!(*rx.recv().unwrap() == 10);
2850         });
2851
2852         tx.send(box 10).unwrap();
2853     }
2854
2855     #[test]
2856     fn oneshot_multi_task_recv_then_close() {
2857         let (tx, rx) = sync_channel::<Box<i32>>(0);
2858         let _t = thread::spawn(move|| {
2859             drop(tx);
2860         });
2861         let res = thread::spawn(move|| {
2862             assert!(*rx.recv().unwrap() == 10);
2863         }).join();
2864         assert!(res.is_err());
2865     }
2866
2867     #[test]
2868     fn oneshot_multi_thread_close_stress() {
2869         for _ in 0..stress_factor() {
2870             let (tx, rx) = sync_channel::<i32>(0);
2871             let _t = thread::spawn(move|| {
2872                 drop(rx);
2873             });
2874             drop(tx);
2875         }
2876     }
2877
2878     #[test]
2879     fn oneshot_multi_thread_send_close_stress() {
2880         for _ in 0..stress_factor() {
2881             let (tx, rx) = sync_channel::<i32>(0);
2882             let _t = thread::spawn(move|| {
2883                 drop(rx);
2884             });
2885             let _ = thread::spawn(move || {
2886                 tx.send(1).unwrap();
2887             }).join();
2888         }
2889     }
2890
2891     #[test]
2892     fn oneshot_multi_thread_recv_close_stress() {
2893         for _ in 0..stress_factor() {
2894             let (tx, rx) = sync_channel::<i32>(0);
2895             let _t = thread::spawn(move|| {
2896                 let res = thread::spawn(move|| {
2897                     rx.recv().unwrap();
2898                 }).join();
2899                 assert!(res.is_err());
2900             });
2901             let _t = thread::spawn(move|| {
2902                 thread::spawn(move|| {
2903                     drop(tx);
2904                 });
2905             });
2906         }
2907     }
2908
2909     #[test]
2910     fn oneshot_multi_thread_send_recv_stress() {
2911         for _ in 0..stress_factor() {
2912             let (tx, rx) = sync_channel::<Box<i32>>(0);
2913             let _t = thread::spawn(move|| {
2914                 tx.send(box 10).unwrap();
2915             });
2916             assert!(*rx.recv().unwrap() == 10);
2917         }
2918     }
2919
2920     #[test]
2921     fn stream_send_recv_stress() {
2922         for _ in 0..stress_factor() {
2923             let (tx, rx) = sync_channel::<Box<i32>>(0);
2924
2925             send(tx, 0);
2926             recv(rx, 0);
2927
2928             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2929                 if i == 10 { return }
2930
2931                 thread::spawn(move|| {
2932                     tx.send(box i).unwrap();
2933                     send(tx, i + 1);
2934                 });
2935             }
2936
2937             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2938                 if i == 10 { return }
2939
2940                 thread::spawn(move|| {
2941                     assert!(*rx.recv().unwrap() == i);
2942                     recv(rx, i + 1);
2943                 });
2944             }
2945         }
2946     }
2947
2948     #[test]
2949     fn recv_a_lot() {
2950         // Regression test that we don't run out of stack in scheduler context
2951         let (tx, rx) = sync_channel(10000);
2952         for _ in 0..10000 { tx.send(()).unwrap(); }
2953         for _ in 0..10000 { rx.recv().unwrap(); }
2954     }
2955
2956     #[test]
2957     fn shared_chan_stress() {
2958         let (tx, rx) = sync_channel(0);
2959         let total = stress_factor() + 100;
2960         for _ in 0..total {
2961             let tx = tx.clone();
2962             thread::spawn(move|| {
2963                 tx.send(()).unwrap();
2964             });
2965         }
2966
2967         for _ in 0..total {
2968             rx.recv().unwrap();
2969         }
2970     }
2971
2972     #[test]
2973     fn test_nested_recv_iter() {
2974         let (tx, rx) = sync_channel::<i32>(0);
2975         let (total_tx, total_rx) = sync_channel::<i32>(0);
2976
2977         let _t = thread::spawn(move|| {
2978             let mut acc = 0;
2979             for x in rx.iter() {
2980                 acc += x;
2981             }
2982             total_tx.send(acc).unwrap();
2983         });
2984
2985         tx.send(3).unwrap();
2986         tx.send(1).unwrap();
2987         tx.send(2).unwrap();
2988         drop(tx);
2989         assert_eq!(total_rx.recv().unwrap(), 6);
2990     }
2991
2992     #[test]
2993     fn test_recv_iter_break() {
2994         let (tx, rx) = sync_channel::<i32>(0);
2995         let (count_tx, count_rx) = sync_channel(0);
2996
2997         let _t = thread::spawn(move|| {
2998             let mut count = 0;
2999             for x in rx.iter() {
3000                 if count >= 3 {
3001                     break;
3002                 } else {
3003                     count += x;
3004                 }
3005             }
3006             count_tx.send(count).unwrap();
3007         });
3008
3009         tx.send(2).unwrap();
3010         tx.send(2).unwrap();
3011         tx.send(2).unwrap();
3012         let _ = tx.try_send(2);
3013         drop(tx);
3014         assert_eq!(count_rx.recv().unwrap(), 4);
3015     }
3016
3017     #[test]
3018     fn try_recv_states() {
3019         let (tx1, rx1) = sync_channel::<i32>(1);
3020         let (tx2, rx2) = sync_channel::<()>(1);
3021         let (tx3, rx3) = sync_channel::<()>(1);
3022         let _t = thread::spawn(move|| {
3023             rx2.recv().unwrap();
3024             tx1.send(1).unwrap();
3025             tx3.send(()).unwrap();
3026             rx2.recv().unwrap();
3027             drop(tx1);
3028             tx3.send(()).unwrap();
3029         });
3030
3031         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3032         tx2.send(()).unwrap();
3033         rx3.recv().unwrap();
3034         assert_eq!(rx1.try_recv(), Ok(1));
3035         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3036         tx2.send(()).unwrap();
3037         rx3.recv().unwrap();
3038         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
3039     }
3040
3041     // This bug used to end up in a livelock inside of the Receiver destructor
3042     // because the internal state of the Shared packet was corrupted
3043     #[test]
3044     fn destroy_upgraded_shared_port_when_sender_still_active() {
3045         let (tx, rx) = sync_channel::<()>(0);
3046         let (tx2, rx2) = sync_channel::<()>(0);
3047         let _t = thread::spawn(move|| {
3048             rx.recv().unwrap(); // wait on a oneshot
3049             drop(rx);  // destroy a shared
3050             tx2.send(()).unwrap();
3051         });
3052         // make sure the other thread has gone to sleep
3053         for _ in 0..5000 { thread::yield_now(); }
3054
3055         // upgrade to a shared chan and send a message
3056         let t = tx.clone();
3057         drop(tx);
3058         t.send(()).unwrap();
3059
3060         // wait for the child thread to exit before we exit
3061         rx2.recv().unwrap();
3062     }
3063
3064     #[test]
3065     fn send1() {
3066         let (tx, rx) = sync_channel::<i32>(0);
3067         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3068         assert_eq!(tx.send(1), Ok(()));
3069     }
3070
3071     #[test]
3072     fn send2() {
3073         let (tx, rx) = sync_channel::<i32>(0);
3074         let _t = thread::spawn(move|| { drop(rx); });
3075         assert!(tx.send(1).is_err());
3076     }
3077
3078     #[test]
3079     fn send3() {
3080         let (tx, rx) = sync_channel::<i32>(1);
3081         assert_eq!(tx.send(1), Ok(()));
3082         let _t =thread::spawn(move|| { drop(rx); });
3083         assert!(tx.send(1).is_err());
3084     }
3085
3086     #[test]
3087     fn send4() {
3088         let (tx, rx) = sync_channel::<i32>(0);
3089         let tx2 = tx.clone();
3090         let (done, donerx) = channel();
3091         let done2 = done.clone();
3092         let _t = thread::spawn(move|| {
3093             assert!(tx.send(1).is_err());
3094             done.send(()).unwrap();
3095         });
3096         let _t = thread::spawn(move|| {
3097             assert!(tx2.send(2).is_err());
3098             done2.send(()).unwrap();
3099         });
3100         drop(rx);
3101         donerx.recv().unwrap();
3102         donerx.recv().unwrap();
3103     }
3104
3105     #[test]
3106     fn try_send1() {
3107         let (tx, _rx) = sync_channel::<i32>(0);
3108         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3109     }
3110
3111     #[test]
3112     fn try_send2() {
3113         let (tx, _rx) = sync_channel::<i32>(1);
3114         assert_eq!(tx.try_send(1), Ok(()));
3115         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3116     }
3117
3118     #[test]
3119     fn try_send3() {
3120         let (tx, rx) = sync_channel::<i32>(1);
3121         assert_eq!(tx.try_send(1), Ok(()));
3122         drop(rx);
3123         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3124     }
3125
3126     #[test]
3127     fn issue_15761() {
3128         fn repro() {
3129             let (tx1, rx1) = sync_channel::<()>(3);
3130             let (tx2, rx2) = sync_channel::<()>(3);
3131
3132             let _t = thread::spawn(move|| {
3133                 rx1.recv().unwrap();
3134                 tx2.try_send(()).unwrap();
3135             });
3136
3137             tx1.try_send(()).unwrap();
3138             rx2.recv().unwrap();
3139         }
3140
3141         for _ in 0..100 {
3142             repro()
3143         }
3144     }
3145 }