]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Improve some compiletest documentation
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 //! Multi-producer, single-consumer FIFO queue communication primitives.
2 //!
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
5 //!
6 //! * [`Sender`]
7 //! * [`SyncSender`]
8 //! * [`Receiver`]
9 //!
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
13 //!
14 //! These channels come in two flavors:
15 //!
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //!    will return a `(Sender, Receiver)` tuple where all sends will be
18 //!    **asynchronous** (they never block). The channel conceptually has an
19 //!    infinite buffer.
20 //!
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
24 //!    **synchronous** by blocking until there is buffer space available. Note
25 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //!    channel where each sender atomically hands off a message to a receiver.
27 //!
28 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
29 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
30 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
31 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
32 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
33 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
34 //!
35 //! ## Disconnection
36 //!
37 //! The send and receive operations on channels will all return a [`Result`]
38 //! indicating whether the operation succeeded or not. An unsuccessful operation
39 //! is normally indicative of the other half of a channel having "hung up" by
40 //! being dropped in its corresponding thread.
41 //!
42 //! Once half of a channel has been deallocated, most operations can no longer
43 //! continue to make progress, so [`Err`] will be returned. Many applications
44 //! will continue to [`unwrap`] the results returned from this module,
45 //! instigating a propagation of failure among threads if one unexpectedly dies.
46 //!
47 //! [`Result`]: ../../../std/result/enum.Result.html
48 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
49 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //!     tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //!     let tx = tx.clone();
79 //!     thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //!     // This will wait for the parent thread to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117 #![allow(deprecated)] // for mpsc_select
118
119 // A description of how Rust's channel implementation works
120 //
121 // Channels are supposed to be the basic building block for all other
122 // concurrent primitives that are used in Rust. As a result, the channel type
123 // needs to be highly optimized, flexible, and broad enough for use everywhere.
124 //
125 // The choice of implementation of all channels is to be built on lock-free data
126 // structures. The channels themselves are then consequently also lock-free data
127 // structures. As always with lock-free code, this is a very "here be dragons"
128 // territory, especially because I'm unaware of any academic papers that have
129 // gone into great length about channels of these flavors.
130 //
131 // ## Flavors of channels
132 //
133 // From the perspective of a consumer of this library, there is only one flavor
134 // of channel. This channel can be used as a stream and cloned to allow multiple
135 // senders. Under the hood, however, there are actually three flavors of
136 // channels in play.
137 //
138 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
139 //                      case. They contain as few atomics as possible and
140 //                      involve one and exactly one allocation.
141 // * Streams - these channels are optimized for the non-shared use case. They
142 //             use a different concurrent queue that is more tailored for this
143 //             use case. The initial allocation of this flavor of channel is not
144 //             optimized.
145 // * Shared - this is the most general form of channel that this module offers,
146 //            a channel with multiple senders. This type is as optimized as it
147 //            can be, but the previous two types mentioned are much faster for
148 //            their use-cases.
149 //
150 // ## Concurrent queues
151 //
152 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
153 // but recv() obviously blocks. This means that under the hood there must be
154 // some shared and concurrent queue holding all of the actual data.
155 //
156 // With two flavors of channels, two flavors of queues are also used. We have
157 // chosen to use queues from a well-known author that are abbreviated as SPSC
158 // and MPSC (single producer, single consumer and multiple producer, single
159 // consumer). SPSC queues are used for streams while MPSC queues are used for
160 // shared channels.
161 //
162 // ### SPSC optimizations
163 //
164 // The SPSC queue found online is essentially a linked list of nodes where one
165 // half of the nodes are the "queue of data" and the other half of nodes are a
166 // cache of unused nodes. The unused nodes are used such that an allocation is
167 // not required on every push() and a free doesn't need to happen on every
168 // pop().
169 //
170 // As found online, however, the cache of nodes is of an infinite size. This
171 // means that if a channel at one point in its life had 50k items in the queue,
172 // then the queue will always have the capacity for 50k items. I believed that
173 // this was an unnecessary limitation of the implementation, so I have altered
174 // the queue to optionally have a bound on the cache size.
175 //
176 // By default, streams will have an unbounded SPSC queue with a small-ish cache
177 // size. The hope is that the cache is still large enough to have very fast
178 // send() operations while not too large such that millions of channels can
179 // coexist at once.
180 //
181 // ### MPSC optimizations
182 //
183 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
184 // a linked list under the hood to earn its unboundedness, but I have not put
185 // forth much effort into having a cache of nodes similar to the SPSC queue.
186 //
187 // For now, I believe that this is "ok" because shared channels are not the most
188 // common type, but soon we may wish to revisit this queue choice and determine
189 // another candidate for backend storage of shared channels.
190 //
191 // ## Overview of the Implementation
192 //
193 // Now that there's a little background on the concurrent queues used, it's
194 // worth going into much more detail about the channels themselves. The basic
195 // pseudocode for a send/recv are:
196 //
197 //
198 //      send(t)                             recv()
199 //        queue.push(t)                       return if queue.pop()
200 //        if increment() == -1                deschedule {
201 //          wakeup()                            if decrement() > 0
202 //                                                cancel_deschedule()
203 //                                            }
204 //                                            queue.pop()
205 //
206 // As mentioned before, there are no locks in this implementation, only atomic
207 // instructions are used.
208 //
209 // ### The internal atomic counter
210 //
211 // Every channel has a shared counter with each half to keep track of the size
212 // of the queue. This counter is used to abort descheduling by the receiver and
213 // to know when to wake up on the sending side.
214 //
215 // As seen in the pseudocode, senders will increment this count and receivers
216 // will decrement the count. The theory behind this is that if a sender sees a
217 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
218 // then it doesn't need to block.
219 //
220 // The recv() method has a beginning call to pop(), and if successful, it needs
221 // to decrement the count. It is a crucial implementation detail that this
222 // decrement does *not* happen to the shared counter. If this were the case,
223 // then it would be possible for the counter to be very negative when there were
224 // no receivers waiting, in which case the senders would have to determine when
225 // it was actually appropriate to wake up a receiver.
226 //
227 // Instead, the "steal count" is kept track of separately (not atomically
228 // because it's only used by receivers), and then the decrement() call when
229 // descheduling will lump in all of the recent steals into one large decrement.
230 //
231 // The implication of this is that if a sender sees a -1 count, then there's
232 // guaranteed to be a waiter waiting!
233 //
234 // ## Native Implementation
235 //
236 // A major goal of these channels is to work seamlessly on and off the runtime.
237 // All of the previous race conditions have been worded in terms of
238 // scheduler-isms (which is obviously not available without the runtime).
239 //
240 // For now, native usage of channels (off the runtime) will fall back onto
241 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
242 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
243 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
244 // condition variable.
245 //
246 // ## Select
247 //
248 // Being able to support selection over channels has greatly influenced this
249 // design, and not only does selection need to work inside the runtime, but also
250 // outside the runtime.
251 //
252 // The implementation is fairly straightforward. The goal of select() is not to
253 // return some data, but only to return which channel can receive data without
254 // blocking. The implementation is essentially the entire blocking procedure
255 // followed by an increment as soon as its woken up. The cancellation procedure
256 // involves an increment and swapping out of to_wake to acquire ownership of the
257 // thread to unblock.
258 //
259 // Sadly this current implementation requires multiple allocations, so I have
260 // seen the throughput of select() be much worse than it should be. I do not
261 // believe that there is anything fundamental that needs to change about these
262 // channels, however, in order to support a more efficient select().
263 //
264 // # Conclusion
265 //
266 // And now that you've seen all the races that I found and attempted to fix,
267 // here's the code for you to find some more!
268
269 use crate::sync::Arc;
270 use crate::error;
271 use crate::fmt;
272 use crate::mem;
273 use crate::cell::UnsafeCell;
274 use crate::time::{Duration, Instant};
275
276 #[unstable(feature = "mpsc_select", issue = "27800")]
277 pub use self::select::{Select, Handle};
278 use self::select::StartResult;
279 use self::select::StartResult::*;
280 use self::blocking::SignalToken;
281
282 #[cfg(all(test, not(target_os = "emscripten")))]
283 mod select_tests;
284
285 mod blocking;
286 mod oneshot;
287 mod select;
288 mod shared;
289 mod stream;
290 mod sync;
291 mod mpsc_queue;
292 mod spsc_queue;
293
294 mod cache_aligned;
295
296 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
297 /// This half can only be owned by one thread.
298 ///
299 /// Messages sent to the channel can be retrieved using [`recv`].
300 ///
301 /// [`channel`]: fn.channel.html
302 /// [`sync_channel`]: fn.sync_channel.html
303 /// [`recv`]: struct.Receiver.html#method.recv
304 ///
305 /// # Examples
306 ///
307 /// ```rust
308 /// use std::sync::mpsc::channel;
309 /// use std::thread;
310 /// use std::time::Duration;
311 ///
312 /// let (send, recv) = channel();
313 ///
314 /// thread::spawn(move || {
315 ///     send.send("Hello world!").unwrap();
316 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
317 ///     send.send("Delayed for 2 seconds").unwrap();
318 /// });
319 ///
320 /// println!("{}", recv.recv().unwrap()); // Received immediately
321 /// println!("Waiting...");
322 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
323 /// ```
324 #[stable(feature = "rust1", since = "1.0.0")]
325 pub struct Receiver<T> {
326     inner: UnsafeCell<Flavor<T>>,
327 }
328
329 // The receiver port can be sent from place to place, so long as it
330 // is not used to receive non-sendable things.
331 #[stable(feature = "rust1", since = "1.0.0")]
332 unsafe impl<T: Send> Send for Receiver<T> { }
333
334 #[stable(feature = "rust1", since = "1.0.0")]
335 impl<T> !Sync for Receiver<T> { }
336
337 /// An iterator over messages on a [`Receiver`], created by [`iter`].
338 ///
339 /// This iterator will block whenever [`next`] is called,
340 /// waiting for a new message, and [`None`] will be returned
341 /// when the corresponding channel has hung up.
342 ///
343 /// [`iter`]: struct.Receiver.html#method.iter
344 /// [`Receiver`]: struct.Receiver.html
345 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
346 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
347 ///
348 /// # Examples
349 ///
350 /// ```rust
351 /// use std::sync::mpsc::channel;
352 /// use std::thread;
353 ///
354 /// let (send, recv) = channel();
355 ///
356 /// thread::spawn(move || {
357 ///     send.send(1u8).unwrap();
358 ///     send.send(2u8).unwrap();
359 ///     send.send(3u8).unwrap();
360 /// });
361 ///
362 /// for x in recv.iter() {
363 ///     println!("Got: {}", x);
364 /// }
365 /// ```
366 #[stable(feature = "rust1", since = "1.0.0")]
367 #[derive(Debug)]
368 pub struct Iter<'a, T: 'a> {
369     rx: &'a Receiver<T>
370 }
371
372 /// An iterator that attempts to yield all pending values for a [`Receiver`],
373 /// created by [`try_iter`].
374 ///
375 /// [`None`] will be returned when there are no pending values remaining or
376 /// if the corresponding channel has hung up.
377 ///
378 /// This iterator will never block the caller in order to wait for data to
379 /// become available. Instead, it will return [`None`].
380 ///
381 /// [`Receiver`]: struct.Receiver.html
382 /// [`try_iter`]: struct.Receiver.html#method.try_iter
383 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
384 ///
385 /// # Examples
386 ///
387 /// ```rust
388 /// use std::sync::mpsc::channel;
389 /// use std::thread;
390 /// use std::time::Duration;
391 ///
392 /// let (sender, receiver) = channel();
393 ///
394 /// // Nothing is in the buffer yet
395 /// assert!(receiver.try_iter().next().is_none());
396 /// println!("Nothing in the buffer...");
397 ///
398 /// thread::spawn(move || {
399 ///     sender.send(1).unwrap();
400 ///     sender.send(2).unwrap();
401 ///     sender.send(3).unwrap();
402 /// });
403 ///
404 /// println!("Going to sleep...");
405 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
406 ///
407 /// for x in receiver.try_iter() {
408 ///     println!("Got: {}", x);
409 /// }
410 /// ```
411 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
412 #[derive(Debug)]
413 pub struct TryIter<'a, T: 'a> {
414     rx: &'a Receiver<T>
415 }
416
417 /// An owning iterator over messages on a [`Receiver`],
418 /// created by **Receiver::into_iter**.
419 ///
420 /// This iterator will block whenever [`next`]
421 /// is called, waiting for a new message, and [`None`] will be
422 /// returned if the corresponding channel has hung up.
423 ///
424 /// [`Receiver`]: struct.Receiver.html
425 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
426 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
427 ///
428 /// # Examples
429 ///
430 /// ```rust
431 /// use std::sync::mpsc::channel;
432 /// use std::thread;
433 ///
434 /// let (send, recv) = channel();
435 ///
436 /// thread::spawn(move || {
437 ///     send.send(1u8).unwrap();
438 ///     send.send(2u8).unwrap();
439 ///     send.send(3u8).unwrap();
440 /// });
441 ///
442 /// for x in recv.into_iter() {
443 ///     println!("Got: {}", x);
444 /// }
445 /// ```
446 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
447 #[derive(Debug)]
448 pub struct IntoIter<T> {
449     rx: Receiver<T>
450 }
451
452 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
453 /// owned by one thread, but it can be cloned to send to other threads.
454 ///
455 /// Messages can be sent through this channel with [`send`].
456 ///
457 /// [`channel`]: fn.channel.html
458 /// [`send`]: struct.Sender.html#method.send
459 ///
460 /// # Examples
461 ///
462 /// ```rust
463 /// use std::sync::mpsc::channel;
464 /// use std::thread;
465 ///
466 /// let (sender, receiver) = channel();
467 /// let sender2 = sender.clone();
468 ///
469 /// // First thread owns sender
470 /// thread::spawn(move || {
471 ///     sender.send(1).unwrap();
472 /// });
473 ///
474 /// // Second thread owns sender2
475 /// thread::spawn(move || {
476 ///     sender2.send(2).unwrap();
477 /// });
478 ///
479 /// let msg = receiver.recv().unwrap();
480 /// let msg2 = receiver.recv().unwrap();
481 ///
482 /// assert_eq!(3, msg + msg2);
483 /// ```
484 #[stable(feature = "rust1", since = "1.0.0")]
485 pub struct Sender<T> {
486     inner: UnsafeCell<Flavor<T>>,
487 }
488
489 // The send port can be sent from place to place, so long as it
490 // is not used to send non-sendable things.
491 #[stable(feature = "rust1", since = "1.0.0")]
492 unsafe impl<T: Send> Send for Sender<T> { }
493
494 #[stable(feature = "rust1", since = "1.0.0")]
495 impl<T> !Sync for Sender<T> { }
496
497 /// The sending-half of Rust's synchronous [`sync_channel`] type.
498 ///
499 /// Messages can be sent through this channel with [`send`] or [`try_send`].
500 ///
501 /// [`send`] will block if there is no space in the internal buffer.
502 ///
503 /// [`sync_channel`]: fn.sync_channel.html
504 /// [`send`]: struct.SyncSender.html#method.send
505 /// [`try_send`]: struct.SyncSender.html#method.try_send
506 ///
507 /// # Examples
508 ///
509 /// ```rust
510 /// use std::sync::mpsc::sync_channel;
511 /// use std::thread;
512 ///
513 /// // Create a sync_channel with buffer size 2
514 /// let (sync_sender, receiver) = sync_channel(2);
515 /// let sync_sender2 = sync_sender.clone();
516 ///
517 /// // First thread owns sync_sender
518 /// thread::spawn(move || {
519 ///     sync_sender.send(1).unwrap();
520 ///     sync_sender.send(2).unwrap();
521 /// });
522 ///
523 /// // Second thread owns sync_sender2
524 /// thread::spawn(move || {
525 ///     sync_sender2.send(3).unwrap();
526 ///     // thread will now block since the buffer is full
527 ///     println!("Thread unblocked!");
528 /// });
529 ///
530 /// let mut msg;
531 ///
532 /// msg = receiver.recv().unwrap();
533 /// println!("message {} received", msg);
534 ///
535 /// // "Thread unblocked!" will be printed now
536 ///
537 /// msg = receiver.recv().unwrap();
538 /// println!("message {} received", msg);
539 ///
540 /// msg = receiver.recv().unwrap();
541 ///
542 /// println!("message {} received", msg);
543 /// ```
544 #[stable(feature = "rust1", since = "1.0.0")]
545 pub struct SyncSender<T> {
546     inner: Arc<sync::Packet<T>>,
547 }
548
549 #[stable(feature = "rust1", since = "1.0.0")]
550 unsafe impl<T: Send> Send for SyncSender<T> {}
551
552 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
553 /// function on **channel**s.
554 ///
555 /// A **send** operation can only fail if the receiving end of a channel is
556 /// disconnected, implying that the data could never be received. The error
557 /// contains the data being sent as a payload so it can be recovered.
558 ///
559 /// [`Sender::send`]: struct.Sender.html#method.send
560 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
561 #[stable(feature = "rust1", since = "1.0.0")]
562 #[derive(PartialEq, Eq, Clone, Copy)]
563 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
564
565 /// An error returned from the [`recv`] function on a [`Receiver`].
566 ///
567 /// The [`recv`] operation can only fail if the sending half of a
568 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
569 /// messages will ever be received.
570 ///
571 /// [`recv`]: struct.Receiver.html#method.recv
572 /// [`Receiver`]: struct.Receiver.html
573 /// [`channel`]: fn.channel.html
574 /// [`sync_channel`]: fn.sync_channel.html
575 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
576 #[stable(feature = "rust1", since = "1.0.0")]
577 pub struct RecvError;
578
579 /// This enumeration is the list of the possible reasons that [`try_recv`] could
580 /// not return data when called. This can occur with both a [`channel`] and
581 /// a [`sync_channel`].
582 ///
583 /// [`try_recv`]: struct.Receiver.html#method.try_recv
584 /// [`channel`]: fn.channel.html
585 /// [`sync_channel`]: fn.sync_channel.html
586 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
587 #[stable(feature = "rust1", since = "1.0.0")]
588 pub enum TryRecvError {
589     /// This **channel** is currently empty, but the **Sender**(s) have not yet
590     /// disconnected, so data may yet become available.
591     #[stable(feature = "rust1", since = "1.0.0")]
592     Empty,
593
594     /// The **channel**'s sending half has become disconnected, and there will
595     /// never be any more data received on it.
596     #[stable(feature = "rust1", since = "1.0.0")]
597     Disconnected,
598 }
599
600 /// This enumeration is the list of possible errors that made [`recv_timeout`]
601 /// unable to return data when called. This can occur with both a [`channel`] and
602 /// a [`sync_channel`].
603 ///
604 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
605 /// [`channel`]: fn.channel.html
606 /// [`sync_channel`]: fn.sync_channel.html
607 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
608 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
609 pub enum RecvTimeoutError {
610     /// This **channel** is currently empty, but the **Sender**(s) have not yet
611     /// disconnected, so data may yet become available.
612     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613     Timeout,
614     /// The **channel**'s sending half has become disconnected, and there will
615     /// never be any more data received on it.
616     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
617     Disconnected,
618 }
619
620 /// This enumeration is the list of the possible error outcomes for the
621 /// [`try_send`] method.
622 ///
623 /// [`try_send`]: struct.SyncSender.html#method.try_send
624 #[stable(feature = "rust1", since = "1.0.0")]
625 #[derive(PartialEq, Eq, Clone, Copy)]
626 pub enum TrySendError<T> {
627     /// The data could not be sent on the [`sync_channel`] because it would require that
628     /// the callee block to send the data.
629     ///
630     /// If this is a buffered channel, then the buffer is full at this time. If
631     /// this is not a buffered channel, then there is no [`Receiver`] available to
632     /// acquire the data.
633     ///
634     /// [`sync_channel`]: fn.sync_channel.html
635     /// [`Receiver`]: struct.Receiver.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
638
639     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
640     /// sent. The data is returned back to the callee in this case.
641     ///
642     /// [`sync_channel`]: fn.sync_channel.html
643     #[stable(feature = "rust1", since = "1.0.0")]
644     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
645 }
646
647 enum Flavor<T> {
648     Oneshot(Arc<oneshot::Packet<T>>),
649     Stream(Arc<stream::Packet<T>>),
650     Shared(Arc<shared::Packet<T>>),
651     Sync(Arc<sync::Packet<T>>),
652 }
653
654 #[doc(hidden)]
655 trait UnsafeFlavor<T> {
656     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
657     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
658         &mut *self.inner_unsafe().get()
659     }
660     unsafe fn inner(&self) -> &Flavor<T> {
661         &*self.inner_unsafe().get()
662     }
663 }
664 impl<T> UnsafeFlavor<T> for Sender<T> {
665     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
666         &self.inner
667     }
668 }
669 impl<T> UnsafeFlavor<T> for Receiver<T> {
670     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
671         &self.inner
672     }
673 }
674
675 /// Creates a new asynchronous channel, returning the sender/receiver halves.
676 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
677 /// the same order as it was sent, and no [`send`] will block the calling thread
678 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
679 /// block after its buffer limit is reached). [`recv`] will block until a message
680 /// is available.
681 ///
682 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
683 /// only one [`Receiver`] is supported.
684 ///
685 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
686 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
687 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
688 /// return a [`RecvError`].
689 ///
690 /// [`send`]: struct.Sender.html#method.send
691 /// [`recv`]: struct.Receiver.html#method.recv
692 /// [`Sender`]: struct.Sender.html
693 /// [`Receiver`]: struct.Receiver.html
694 /// [`sync_channel`]: fn.sync_channel.html
695 /// [`SendError`]: struct.SendError.html
696 /// [`RecvError`]: struct.RecvError.html
697 ///
698 /// # Examples
699 ///
700 /// ```
701 /// use std::sync::mpsc::channel;
702 /// use std::thread;
703 ///
704 /// let (sender, receiver) = channel();
705 ///
706 /// // Spawn off an expensive computation
707 /// thread::spawn(move|| {
708 /// #   fn expensive_computation() {}
709 ///     sender.send(expensive_computation()).unwrap();
710 /// });
711 ///
712 /// // Do some useful work for awhile
713 ///
714 /// // Let's see what that answer was
715 /// println!("{:?}", receiver.recv().unwrap());
716 /// ```
717 #[stable(feature = "rust1", since = "1.0.0")]
718 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
719     let a = Arc::new(oneshot::Packet::new());
720     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
721 }
722
723 /// Creates a new synchronous, bounded channel.
724 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
725 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
726 /// [`Receiver`] will block until a message becomes available. `sync_channel`
727 /// differs greatly in the semantics of the sender, however.
728 ///
729 /// This channel has an internal buffer on which messages will be queued.
730 /// `bound` specifies the buffer size. When the internal buffer becomes full,
731 /// future sends will *block* waiting for the buffer to open up. Note that a
732 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
733 /// where each [`send`] will not return until a [`recv`] is paired with it.
734 ///
735 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
736 /// times, but only one [`Receiver`] is supported.
737 ///
738 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
739 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
740 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
741 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
742 ///
743 /// [`channel`]: fn.channel.html
744 /// [`send`]: struct.SyncSender.html#method.send
745 /// [`recv`]: struct.Receiver.html#method.recv
746 /// [`SyncSender`]: struct.SyncSender.html
747 /// [`Receiver`]: struct.Receiver.html
748 /// [`SendError`]: struct.SendError.html
749 /// [`RecvError`]: struct.RecvError.html
750 ///
751 /// # Examples
752 ///
753 /// ```
754 /// use std::sync::mpsc::sync_channel;
755 /// use std::thread;
756 ///
757 /// let (sender, receiver) = sync_channel(1);
758 ///
759 /// // this returns immediately
760 /// sender.send(1).unwrap();
761 ///
762 /// thread::spawn(move|| {
763 ///     // this will block until the previous message has been received
764 ///     sender.send(2).unwrap();
765 /// });
766 ///
767 /// assert_eq!(receiver.recv().unwrap(), 1);
768 /// assert_eq!(receiver.recv().unwrap(), 2);
769 /// ```
770 #[stable(feature = "rust1", since = "1.0.0")]
771 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
772     let a = Arc::new(sync::Packet::new(bound));
773     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
774 }
775
776 ////////////////////////////////////////////////////////////////////////////////
777 // Sender
778 ////////////////////////////////////////////////////////////////////////////////
779
780 impl<T> Sender<T> {
781     fn new(inner: Flavor<T>) -> Sender<T> {
782         Sender {
783             inner: UnsafeCell::new(inner),
784         }
785     }
786
787     /// Attempts to send a value on this channel, returning it back if it could
788     /// not be sent.
789     ///
790     /// A successful send occurs when it is determined that the other end of
791     /// the channel has not hung up already. An unsuccessful send would be one
792     /// where the corresponding receiver has already been deallocated. Note
793     /// that a return value of [`Err`] means that the data will never be
794     /// received, but a return value of [`Ok`] does *not* mean that the data
795     /// will be received. It is possible for the corresponding receiver to
796     /// hang up immediately after this function returns [`Ok`].
797     ///
798     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
799     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
800     ///
801     /// This method will never block the current thread.
802     ///
803     /// # Examples
804     ///
805     /// ```
806     /// use std::sync::mpsc::channel;
807     ///
808     /// let (tx, rx) = channel();
809     ///
810     /// // This send is always successful
811     /// tx.send(1).unwrap();
812     ///
813     /// // This send will fail because the receiver is gone
814     /// drop(rx);
815     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
816     /// ```
817     #[stable(feature = "rust1", since = "1.0.0")]
818     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
819         let (new_inner, ret) = match *unsafe { self.inner() } {
820             Flavor::Oneshot(ref p) => {
821                 if !p.sent() {
822                     return p.send(t).map_err(SendError);
823                 } else {
824                     let a = Arc::new(stream::Packet::new());
825                     let rx = Receiver::new(Flavor::Stream(a.clone()));
826                     match p.upgrade(rx) {
827                         oneshot::UpSuccess => {
828                             let ret = a.send(t);
829                             (a, ret)
830                         }
831                         oneshot::UpDisconnected => (a, Err(t)),
832                         oneshot::UpWoke(token) => {
833                             // This send cannot panic because the thread is
834                             // asleep (we're looking at it), so the receiver
835                             // can't go away.
836                             a.send(t).ok().unwrap();
837                             token.signal();
838                             (a, Ok(()))
839                         }
840                     }
841                 }
842             }
843             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
844             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
845             Flavor::Sync(..) => unreachable!(),
846         };
847
848         unsafe {
849             let tmp = Sender::new(Flavor::Stream(new_inner));
850             mem::swap(self.inner_mut(), tmp.inner_mut());
851         }
852         ret.map_err(SendError)
853     }
854 }
855
856 #[stable(feature = "rust1", since = "1.0.0")]
857 impl<T> Clone for Sender<T> {
858     fn clone(&self) -> Sender<T> {
859         let packet = match *unsafe { self.inner() } {
860             Flavor::Oneshot(ref p) => {
861                 let a = Arc::new(shared::Packet::new());
862                 {
863                     let guard = a.postinit_lock();
864                     let rx = Receiver::new(Flavor::Shared(a.clone()));
865                     let sleeper = match p.upgrade(rx) {
866                         oneshot::UpSuccess |
867                         oneshot::UpDisconnected => None,
868                         oneshot::UpWoke(task) => Some(task),
869                     };
870                     a.inherit_blocker(sleeper, guard);
871                 }
872                 a
873             }
874             Flavor::Stream(ref p) => {
875                 let a = Arc::new(shared::Packet::new());
876                 {
877                     let guard = a.postinit_lock();
878                     let rx = Receiver::new(Flavor::Shared(a.clone()));
879                     let sleeper = match p.upgrade(rx) {
880                         stream::UpSuccess |
881                         stream::UpDisconnected => None,
882                         stream::UpWoke(task) => Some(task),
883                     };
884                     a.inherit_blocker(sleeper, guard);
885                 }
886                 a
887             }
888             Flavor::Shared(ref p) => {
889                 p.clone_chan();
890                 return Sender::new(Flavor::Shared(p.clone()));
891             }
892             Flavor::Sync(..) => unreachable!(),
893         };
894
895         unsafe {
896             let tmp = Sender::new(Flavor::Shared(packet.clone()));
897             mem::swap(self.inner_mut(), tmp.inner_mut());
898         }
899         Sender::new(Flavor::Shared(packet))
900     }
901 }
902
903 #[stable(feature = "rust1", since = "1.0.0")]
904 impl<T> Drop for Sender<T> {
905     fn drop(&mut self) {
906         match *unsafe { self.inner() } {
907             Flavor::Oneshot(ref p) => p.drop_chan(),
908             Flavor::Stream(ref p) => p.drop_chan(),
909             Flavor::Shared(ref p) => p.drop_chan(),
910             Flavor::Sync(..) => unreachable!(),
911         }
912     }
913 }
914
915 #[stable(feature = "mpsc_debug", since = "1.8.0")]
916 impl<T> fmt::Debug for Sender<T> {
917     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
918         f.debug_struct("Sender").finish()
919     }
920 }
921
922 ////////////////////////////////////////////////////////////////////////////////
923 // SyncSender
924 ////////////////////////////////////////////////////////////////////////////////
925
926 impl<T> SyncSender<T> {
927     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
928         SyncSender { inner }
929     }
930
931     /// Sends a value on this synchronous channel.
932     ///
933     /// This function will *block* until space in the internal buffer becomes
934     /// available or a receiver is available to hand off the message to.
935     ///
936     /// Note that a successful send does *not* guarantee that the receiver will
937     /// ever see the data if there is a buffer on this channel. Items may be
938     /// enqueued in the internal buffer for the receiver to receive at a later
939     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
940     /// channel and it guarantees that the receiver has indeed received
941     /// the data if this function returns success.
942     ///
943     /// This function will never panic, but it may return [`Err`] if the
944     /// [`Receiver`] has disconnected and is no longer able to receive
945     /// information.
946     ///
947     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
948     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
949     ///
950     /// # Examples
951     ///
952     /// ```rust
953     /// use std::sync::mpsc::sync_channel;
954     /// use std::thread;
955     ///
956     /// // Create a rendezvous sync_channel with buffer size 0
957     /// let (sync_sender, receiver) = sync_channel(0);
958     ///
959     /// thread::spawn(move || {
960     ///    println!("sending message...");
961     ///    sync_sender.send(1).unwrap();
962     ///    // Thread is now blocked until the message is received
963     ///
964     ///    println!("...message received!");
965     /// });
966     ///
967     /// let msg = receiver.recv().unwrap();
968     /// assert_eq!(1, msg);
969     /// ```
970     #[stable(feature = "rust1", since = "1.0.0")]
971     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
972         self.inner.send(t).map_err(SendError)
973     }
974
975     /// Attempts to send a value on this channel without blocking.
976     ///
977     /// This method differs from [`send`] by returning immediately if the
978     /// channel's buffer is full or no receiver is waiting to acquire some
979     /// data. Compared with [`send`], this function has two failure cases
980     /// instead of one (one for disconnection, one for a full buffer).
981     ///
982     /// See [`send`] for notes about guarantees of whether the
983     /// receiver has received the data or not if this function is successful.
984     ///
985     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
986     ///
987     /// # Examples
988     ///
989     /// ```rust
990     /// use std::sync::mpsc::sync_channel;
991     /// use std::thread;
992     ///
993     /// // Create a sync_channel with buffer size 1
994     /// let (sync_sender, receiver) = sync_channel(1);
995     /// let sync_sender2 = sync_sender.clone();
996     ///
997     /// // First thread owns sync_sender
998     /// thread::spawn(move || {
999     ///     sync_sender.send(1).unwrap();
1000     ///     sync_sender.send(2).unwrap();
1001     ///     // Thread blocked
1002     /// });
1003     ///
1004     /// // Second thread owns sync_sender2
1005     /// thread::spawn(move || {
1006     ///     // This will return an error and send
1007     ///     // no message if the buffer is full
1008     ///     sync_sender2.try_send(3).is_err();
1009     /// });
1010     ///
1011     /// let mut msg;
1012     /// msg = receiver.recv().unwrap();
1013     /// println!("message {} received", msg);
1014     ///
1015     /// msg = receiver.recv().unwrap();
1016     /// println!("message {} received", msg);
1017     ///
1018     /// // Third message may have never been sent
1019     /// match receiver.try_recv() {
1020     ///     Ok(msg) => println!("message {} received", msg),
1021     ///     Err(_) => println!("the third message was never sent"),
1022     /// }
1023     /// ```
1024     #[stable(feature = "rust1", since = "1.0.0")]
1025     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1026         self.inner.try_send(t)
1027     }
1028 }
1029
1030 #[stable(feature = "rust1", since = "1.0.0")]
1031 impl<T> Clone for SyncSender<T> {
1032     fn clone(&self) -> SyncSender<T> {
1033         self.inner.clone_chan();
1034         SyncSender::new(self.inner.clone())
1035     }
1036 }
1037
1038 #[stable(feature = "rust1", since = "1.0.0")]
1039 impl<T> Drop for SyncSender<T> {
1040     fn drop(&mut self) {
1041         self.inner.drop_chan();
1042     }
1043 }
1044
1045 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1046 impl<T> fmt::Debug for SyncSender<T> {
1047     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1048         f.debug_struct("SyncSender").finish()
1049     }
1050 }
1051
1052 ////////////////////////////////////////////////////////////////////////////////
1053 // Receiver
1054 ////////////////////////////////////////////////////////////////////////////////
1055
1056 impl<T> Receiver<T> {
1057     fn new(inner: Flavor<T>) -> Receiver<T> {
1058         Receiver { inner: UnsafeCell::new(inner) }
1059     }
1060
1061     /// Attempts to return a pending value on this receiver without blocking.
1062     ///
1063     /// This method will never block the caller in order to wait for data to
1064     /// become available. Instead, this will always return immediately with a
1065     /// possible option of pending data on the channel.
1066     ///
1067     /// This is useful for a flavor of "optimistic check" before deciding to
1068     /// block on a receiver.
1069     ///
1070     /// Compared with [`recv`], this function has two failure cases instead of one
1071     /// (one for disconnection, one for an empty buffer).
1072     ///
1073     /// [`recv`]: struct.Receiver.html#method.recv
1074     ///
1075     /// # Examples
1076     ///
1077     /// ```rust
1078     /// use std::sync::mpsc::{Receiver, channel};
1079     ///
1080     /// let (_, receiver): (_, Receiver<i32>) = channel();
1081     ///
1082     /// assert!(receiver.try_recv().is_err());
1083     /// ```
1084     #[stable(feature = "rust1", since = "1.0.0")]
1085     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1086         loop {
1087             let new_port = match *unsafe { self.inner() } {
1088                 Flavor::Oneshot(ref p) => {
1089                     match p.try_recv() {
1090                         Ok(t) => return Ok(t),
1091                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1092                         Err(oneshot::Disconnected) => {
1093                             return Err(TryRecvError::Disconnected)
1094                         }
1095                         Err(oneshot::Upgraded(rx)) => rx,
1096                     }
1097                 }
1098                 Flavor::Stream(ref p) => {
1099                     match p.try_recv() {
1100                         Ok(t) => return Ok(t),
1101                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1102                         Err(stream::Disconnected) => {
1103                             return Err(TryRecvError::Disconnected)
1104                         }
1105                         Err(stream::Upgraded(rx)) => rx,
1106                     }
1107                 }
1108                 Flavor::Shared(ref p) => {
1109                     match p.try_recv() {
1110                         Ok(t) => return Ok(t),
1111                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1112                         Err(shared::Disconnected) => {
1113                             return Err(TryRecvError::Disconnected)
1114                         }
1115                     }
1116                 }
1117                 Flavor::Sync(ref p) => {
1118                     match p.try_recv() {
1119                         Ok(t) => return Ok(t),
1120                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1121                         Err(sync::Disconnected) => {
1122                             return Err(TryRecvError::Disconnected)
1123                         }
1124                     }
1125                 }
1126             };
1127             unsafe {
1128                 mem::swap(self.inner_mut(),
1129                           new_port.inner_mut());
1130             }
1131         }
1132     }
1133
1134     /// Attempts to wait for a value on this receiver, returning an error if the
1135     /// corresponding channel has hung up.
1136     ///
1137     /// This function will always block the current thread if there is no data
1138     /// available and it's possible for more data to be sent. Once a message is
1139     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1140     /// receiver will wake up and return that message.
1141     ///
1142     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1143     /// this call is blocking, this call will wake up and return [`Err`] to
1144     /// indicate that no more messages can ever be received on this channel.
1145     /// However, since channels are buffered, messages sent before the disconnect
1146     /// will still be properly received.
1147     ///
1148     /// [`Sender`]: struct.Sender.html
1149     /// [`SyncSender`]: struct.SyncSender.html
1150     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1151     ///
1152     /// # Examples
1153     ///
1154     /// ```
1155     /// use std::sync::mpsc;
1156     /// use std::thread;
1157     ///
1158     /// let (send, recv) = mpsc::channel();
1159     /// let handle = thread::spawn(move || {
1160     ///     send.send(1u8).unwrap();
1161     /// });
1162     ///
1163     /// handle.join().unwrap();
1164     ///
1165     /// assert_eq!(Ok(1), recv.recv());
1166     /// ```
1167     ///
1168     /// Buffering behavior:
1169     ///
1170     /// ```
1171     /// use std::sync::mpsc;
1172     /// use std::thread;
1173     /// use std::sync::mpsc::RecvError;
1174     ///
1175     /// let (send, recv) = mpsc::channel();
1176     /// let handle = thread::spawn(move || {
1177     ///     send.send(1u8).unwrap();
1178     ///     send.send(2).unwrap();
1179     ///     send.send(3).unwrap();
1180     ///     drop(send);
1181     /// });
1182     ///
1183     /// // wait for the thread to join so we ensure the sender is dropped
1184     /// handle.join().unwrap();
1185     ///
1186     /// assert_eq!(Ok(1), recv.recv());
1187     /// assert_eq!(Ok(2), recv.recv());
1188     /// assert_eq!(Ok(3), recv.recv());
1189     /// assert_eq!(Err(RecvError), recv.recv());
1190     /// ```
1191     #[stable(feature = "rust1", since = "1.0.0")]
1192     pub fn recv(&self) -> Result<T, RecvError> {
1193         loop {
1194             let new_port = match *unsafe { self.inner() } {
1195                 Flavor::Oneshot(ref p) => {
1196                     match p.recv(None) {
1197                         Ok(t) => return Ok(t),
1198                         Err(oneshot::Disconnected) => return Err(RecvError),
1199                         Err(oneshot::Upgraded(rx)) => rx,
1200                         Err(oneshot::Empty) => unreachable!(),
1201                     }
1202                 }
1203                 Flavor::Stream(ref p) => {
1204                     match p.recv(None) {
1205                         Ok(t) => return Ok(t),
1206                         Err(stream::Disconnected) => return Err(RecvError),
1207                         Err(stream::Upgraded(rx)) => rx,
1208                         Err(stream::Empty) => unreachable!(),
1209                     }
1210                 }
1211                 Flavor::Shared(ref p) => {
1212                     match p.recv(None) {
1213                         Ok(t) => return Ok(t),
1214                         Err(shared::Disconnected) => return Err(RecvError),
1215                         Err(shared::Empty) => unreachable!(),
1216                     }
1217                 }
1218                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1219             };
1220             unsafe {
1221                 mem::swap(self.inner_mut(), new_port.inner_mut());
1222             }
1223         }
1224     }
1225
1226     /// Attempts to wait for a value on this receiver, returning an error if the
1227     /// corresponding channel has hung up, or if it waits more than `timeout`.
1228     ///
1229     /// This function will always block the current thread if there is no data
1230     /// available and it's possible for more data to be sent. Once a message is
1231     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1232     /// receiver will wake up and return that message.
1233     ///
1234     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1235     /// this call is blocking, this call will wake up and return [`Err`] to
1236     /// indicate that no more messages can ever be received on this channel.
1237     /// However, since channels are buffered, messages sent before the disconnect
1238     /// will still be properly received.
1239     ///
1240     /// [`Sender`]: struct.Sender.html
1241     /// [`SyncSender`]: struct.SyncSender.html
1242     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1243     ///
1244     /// # Known Issues
1245     ///
1246     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1247     /// to panic unexpectedly with the following example:
1248     ///
1249     /// ```no_run
1250     /// use std::sync::mpsc::channel;
1251     /// use std::thread;
1252     /// use std::time::Duration;
1253     ///
1254     /// let (tx, rx) = channel::<String>();
1255     ///
1256     /// thread::spawn(move || {
1257     ///     let d = Duration::from_millis(10);
1258     ///     loop {
1259     ///         println!("recv");
1260     ///         let _r = rx.recv_timeout(d);
1261     ///     }
1262     /// });
1263     ///
1264     /// thread::sleep(Duration::from_millis(100));
1265     /// let _c1 = tx.clone();
1266     ///
1267     /// thread::sleep(Duration::from_secs(1));
1268     /// ```
1269     ///
1270     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1271     ///
1272     /// # Examples
1273     ///
1274     /// Successfully receiving value before encountering timeout:
1275     ///
1276     /// ```no_run
1277     /// use std::thread;
1278     /// use std::time::Duration;
1279     /// use std::sync::mpsc;
1280     ///
1281     /// let (send, recv) = mpsc::channel();
1282     ///
1283     /// thread::spawn(move || {
1284     ///     send.send('a').unwrap();
1285     /// });
1286     ///
1287     /// assert_eq!(
1288     ///     recv.recv_timeout(Duration::from_millis(400)),
1289     ///     Ok('a')
1290     /// );
1291     /// ```
1292     ///
1293     /// Receiving an error upon reaching timeout:
1294     ///
1295     /// ```no_run
1296     /// use std::thread;
1297     /// use std::time::Duration;
1298     /// use std::sync::mpsc;
1299     ///
1300     /// let (send, recv) = mpsc::channel();
1301     ///
1302     /// thread::spawn(move || {
1303     ///     thread::sleep(Duration::from_millis(800));
1304     ///     send.send('a').unwrap();
1305     /// });
1306     ///
1307     /// assert_eq!(
1308     ///     recv.recv_timeout(Duration::from_millis(400)),
1309     ///     Err(mpsc::RecvTimeoutError::Timeout)
1310     /// );
1311     /// ```
1312     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1313     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1314         // Do an optimistic try_recv to avoid the performance impact of
1315         // Instant::now() in the full-channel case.
1316         match self.try_recv() {
1317             Ok(result) => Ok(result),
1318             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1319             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1320                 Some(deadline) => self.recv_deadline(deadline),
1321                 // So far in the future that it's practically the same as waiting indefinitely.
1322                 None => self.recv().map_err(RecvTimeoutError::from),
1323             },
1324         }
1325     }
1326
1327     /// Attempts to wait for a value on this receiver, returning an error if the
1328     /// corresponding channel has hung up, or if `deadline` is reached.
1329     ///
1330     /// This function will always block the current thread if there is no data
1331     /// available and it's possible for more data to be sent. Once a message is
1332     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1333     /// receiver will wake up and return that message.
1334     ///
1335     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1336     /// this call is blocking, this call will wake up and return [`Err`] to
1337     /// indicate that no more messages can ever be received on this channel.
1338     /// However, since channels are buffered, messages sent before the disconnect
1339     /// will still be properly received.
1340     ///
1341     /// [`Sender`]: struct.Sender.html
1342     /// [`SyncSender`]: struct.SyncSender.html
1343     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1344     ///
1345     /// # Examples
1346     ///
1347     /// Successfully receiving value before reaching deadline:
1348     ///
1349     /// ```no_run
1350     /// #![feature(deadline_api)]
1351     /// use std::thread;
1352     /// use std::time::{Duration, Instant};
1353     /// use std::sync::mpsc;
1354     ///
1355     /// let (send, recv) = mpsc::channel();
1356     ///
1357     /// thread::spawn(move || {
1358     ///     send.send('a').unwrap();
1359     /// });
1360     ///
1361     /// assert_eq!(
1362     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1363     ///     Ok('a')
1364     /// );
1365     /// ```
1366     ///
1367     /// Receiving an error upon reaching deadline:
1368     ///
1369     /// ```no_run
1370     /// #![feature(deadline_api)]
1371     /// use std::thread;
1372     /// use std::time::{Duration, Instant};
1373     /// use std::sync::mpsc;
1374     ///
1375     /// let (send, recv) = mpsc::channel();
1376     ///
1377     /// thread::spawn(move || {
1378     ///     thread::sleep(Duration::from_millis(800));
1379     ///     send.send('a').unwrap();
1380     /// });
1381     ///
1382     /// assert_eq!(
1383     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1384     ///     Err(mpsc::RecvTimeoutError::Timeout)
1385     /// );
1386     /// ```
1387     #[unstable(feature = "deadline_api", issue = "46316")]
1388     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1389         use self::RecvTimeoutError::*;
1390
1391         loop {
1392             let port_or_empty = match *unsafe { self.inner() } {
1393                 Flavor::Oneshot(ref p) => {
1394                     match p.recv(Some(deadline)) {
1395                         Ok(t) => return Ok(t),
1396                         Err(oneshot::Disconnected) => return Err(Disconnected),
1397                         Err(oneshot::Upgraded(rx)) => Some(rx),
1398                         Err(oneshot::Empty) => None,
1399                     }
1400                 }
1401                 Flavor::Stream(ref p) => {
1402                     match p.recv(Some(deadline)) {
1403                         Ok(t) => return Ok(t),
1404                         Err(stream::Disconnected) => return Err(Disconnected),
1405                         Err(stream::Upgraded(rx)) => Some(rx),
1406                         Err(stream::Empty) => None,
1407                     }
1408                 }
1409                 Flavor::Shared(ref p) => {
1410                     match p.recv(Some(deadline)) {
1411                         Ok(t) => return Ok(t),
1412                         Err(shared::Disconnected) => return Err(Disconnected),
1413                         Err(shared::Empty) => None,
1414                     }
1415                 }
1416                 Flavor::Sync(ref p) => {
1417                     match p.recv(Some(deadline)) {
1418                         Ok(t) => return Ok(t),
1419                         Err(sync::Disconnected) => return Err(Disconnected),
1420                         Err(sync::Empty) => None,
1421                     }
1422                 }
1423             };
1424
1425             if let Some(new_port) = port_or_empty {
1426                 unsafe {
1427                     mem::swap(self.inner_mut(), new_port.inner_mut());
1428                 }
1429             }
1430
1431             // If we're already passed the deadline, and we're here without
1432             // data, return a timeout, else try again.
1433             if Instant::now() >= deadline {
1434                 return Err(Timeout);
1435             }
1436         }
1437     }
1438
1439     /// Returns an iterator that will block waiting for messages, but never
1440     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1441     ///
1442     /// [`panic!`]: ../../../std/macro.panic.html
1443     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1444     ///
1445     /// # Examples
1446     ///
1447     /// ```rust
1448     /// use std::sync::mpsc::channel;
1449     /// use std::thread;
1450     ///
1451     /// let (send, recv) = channel();
1452     ///
1453     /// thread::spawn(move || {
1454     ///     send.send(1).unwrap();
1455     ///     send.send(2).unwrap();
1456     ///     send.send(3).unwrap();
1457     /// });
1458     ///
1459     /// let mut iter = recv.iter();
1460     /// assert_eq!(iter.next(), Some(1));
1461     /// assert_eq!(iter.next(), Some(2));
1462     /// assert_eq!(iter.next(), Some(3));
1463     /// assert_eq!(iter.next(), None);
1464     /// ```
1465     #[stable(feature = "rust1", since = "1.0.0")]
1466     pub fn iter(&self) -> Iter<T> {
1467         Iter { rx: self }
1468     }
1469
1470     /// Returns an iterator that will attempt to yield all pending values.
1471     /// It will return `None` if there are no more pending values or if the
1472     /// channel has hung up. The iterator will never [`panic!`] or block the
1473     /// user by waiting for values.
1474     ///
1475     /// [`panic!`]: ../../../std/macro.panic.html
1476     ///
1477     /// # Examples
1478     ///
1479     /// ```no_run
1480     /// use std::sync::mpsc::channel;
1481     /// use std::thread;
1482     /// use std::time::Duration;
1483     ///
1484     /// let (sender, receiver) = channel();
1485     ///
1486     /// // nothing is in the buffer yet
1487     /// assert!(receiver.try_iter().next().is_none());
1488     ///
1489     /// thread::spawn(move || {
1490     ///     thread::sleep(Duration::from_secs(1));
1491     ///     sender.send(1).unwrap();
1492     ///     sender.send(2).unwrap();
1493     ///     sender.send(3).unwrap();
1494     /// });
1495     ///
1496     /// // nothing is in the buffer yet
1497     /// assert!(receiver.try_iter().next().is_none());
1498     ///
1499     /// // block for two seconds
1500     /// thread::sleep(Duration::from_secs(2));
1501     ///
1502     /// let mut iter = receiver.try_iter();
1503     /// assert_eq!(iter.next(), Some(1));
1504     /// assert_eq!(iter.next(), Some(2));
1505     /// assert_eq!(iter.next(), Some(3));
1506     /// assert_eq!(iter.next(), None);
1507     /// ```
1508     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1509     pub fn try_iter(&self) -> TryIter<T> {
1510         TryIter { rx: self }
1511     }
1512
1513 }
1514
1515 impl<T> select::Packet for Receiver<T> {
1516     fn can_recv(&self) -> bool {
1517         loop {
1518             let new_port = match *unsafe { self.inner() } {
1519                 Flavor::Oneshot(ref p) => {
1520                     match p.can_recv() {
1521                         Ok(ret) => return ret,
1522                         Err(upgrade) => upgrade,
1523                     }
1524                 }
1525                 Flavor::Stream(ref p) => {
1526                     match p.can_recv() {
1527                         Ok(ret) => return ret,
1528                         Err(upgrade) => upgrade,
1529                     }
1530                 }
1531                 Flavor::Shared(ref p) => return p.can_recv(),
1532                 Flavor::Sync(ref p) => return p.can_recv(),
1533             };
1534             unsafe {
1535                 mem::swap(self.inner_mut(),
1536                           new_port.inner_mut());
1537             }
1538         }
1539     }
1540
1541     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1542         loop {
1543             let (t, new_port) = match *unsafe { self.inner() } {
1544                 Flavor::Oneshot(ref p) => {
1545                     match p.start_selection(token) {
1546                         oneshot::SelSuccess => return Installed,
1547                         oneshot::SelCanceled => return Abort,
1548                         oneshot::SelUpgraded(t, rx) => (t, rx),
1549                     }
1550                 }
1551                 Flavor::Stream(ref p) => {
1552                     match p.start_selection(token) {
1553                         stream::SelSuccess => return Installed,
1554                         stream::SelCanceled => return Abort,
1555                         stream::SelUpgraded(t, rx) => (t, rx),
1556                     }
1557                 }
1558                 Flavor::Shared(ref p) => return p.start_selection(token),
1559                 Flavor::Sync(ref p) => return p.start_selection(token),
1560             };
1561             token = t;
1562             unsafe {
1563                 mem::swap(self.inner_mut(), new_port.inner_mut());
1564             }
1565         }
1566     }
1567
1568     fn abort_selection(&self) -> bool {
1569         let mut was_upgrade = false;
1570         loop {
1571             let result = match *unsafe { self.inner() } {
1572                 Flavor::Oneshot(ref p) => p.abort_selection(),
1573                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1574                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1575                 Flavor::Sync(ref p) => return p.abort_selection(),
1576             };
1577             let new_port = match result { Ok(b) => return b, Err(p) => p };
1578             was_upgrade = true;
1579             unsafe {
1580                 mem::swap(self.inner_mut(),
1581                           new_port.inner_mut());
1582             }
1583         }
1584     }
1585 }
1586
1587 #[stable(feature = "rust1", since = "1.0.0")]
1588 impl<'a, T> Iterator for Iter<'a, T> {
1589     type Item = T;
1590
1591     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1592 }
1593
1594 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1595 impl<'a, T> Iterator for TryIter<'a, T> {
1596     type Item = T;
1597
1598     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1599 }
1600
1601 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1602 impl<'a, T> IntoIterator for &'a Receiver<T> {
1603     type Item = T;
1604     type IntoIter = Iter<'a, T>;
1605
1606     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1607 }
1608
1609 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1610 impl<T> Iterator for IntoIter<T> {
1611     type Item = T;
1612     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1613 }
1614
1615 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1616 impl <T> IntoIterator for Receiver<T> {
1617     type Item = T;
1618     type IntoIter = IntoIter<T>;
1619
1620     fn into_iter(self) -> IntoIter<T> {
1621         IntoIter { rx: self }
1622     }
1623 }
1624
1625 #[stable(feature = "rust1", since = "1.0.0")]
1626 impl<T> Drop for Receiver<T> {
1627     fn drop(&mut self) {
1628         match *unsafe { self.inner() } {
1629             Flavor::Oneshot(ref p) => p.drop_port(),
1630             Flavor::Stream(ref p) => p.drop_port(),
1631             Flavor::Shared(ref p) => p.drop_port(),
1632             Flavor::Sync(ref p) => p.drop_port(),
1633         }
1634     }
1635 }
1636
1637 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1638 impl<T> fmt::Debug for Receiver<T> {
1639     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1640         f.debug_struct("Receiver").finish()
1641     }
1642 }
1643
1644 #[stable(feature = "rust1", since = "1.0.0")]
1645 impl<T> fmt::Debug for SendError<T> {
1646     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1647         "SendError(..)".fmt(f)
1648     }
1649 }
1650
1651 #[stable(feature = "rust1", since = "1.0.0")]
1652 impl<T> fmt::Display for SendError<T> {
1653     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654         "sending on a closed channel".fmt(f)
1655     }
1656 }
1657
1658 #[stable(feature = "rust1", since = "1.0.0")]
1659 impl<T: Send> error::Error for SendError<T> {
1660     fn description(&self) -> &str {
1661         "sending on a closed channel"
1662     }
1663
1664     fn cause(&self) -> Option<&dyn error::Error> {
1665         None
1666     }
1667 }
1668
1669 #[stable(feature = "rust1", since = "1.0.0")]
1670 impl<T> fmt::Debug for TrySendError<T> {
1671     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1672         match *self {
1673             TrySendError::Full(..) => "Full(..)".fmt(f),
1674             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1675         }
1676     }
1677 }
1678
1679 #[stable(feature = "rust1", since = "1.0.0")]
1680 impl<T> fmt::Display for TrySendError<T> {
1681     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1682         match *self {
1683             TrySendError::Full(..) => {
1684                 "sending on a full channel".fmt(f)
1685             }
1686             TrySendError::Disconnected(..) => {
1687                 "sending on a closed channel".fmt(f)
1688             }
1689         }
1690     }
1691 }
1692
1693 #[stable(feature = "rust1", since = "1.0.0")]
1694 impl<T: Send> error::Error for TrySendError<T> {
1695
1696     fn description(&self) -> &str {
1697         match *self {
1698             TrySendError::Full(..) => {
1699                 "sending on a full channel"
1700             }
1701             TrySendError::Disconnected(..) => {
1702                 "sending on a closed channel"
1703             }
1704         }
1705     }
1706
1707     fn cause(&self) -> Option<&dyn error::Error> {
1708         None
1709     }
1710 }
1711
1712 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1713 impl<T> From<SendError<T>> for TrySendError<T> {
1714     fn from(err: SendError<T>) -> TrySendError<T> {
1715         match err {
1716             SendError(t) => TrySendError::Disconnected(t),
1717         }
1718     }
1719 }
1720
1721 #[stable(feature = "rust1", since = "1.0.0")]
1722 impl fmt::Display for RecvError {
1723     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724         "receiving on a closed channel".fmt(f)
1725     }
1726 }
1727
1728 #[stable(feature = "rust1", since = "1.0.0")]
1729 impl error::Error for RecvError {
1730
1731     fn description(&self) -> &str {
1732         "receiving on a closed channel"
1733     }
1734
1735     fn cause(&self) -> Option<&dyn error::Error> {
1736         None
1737     }
1738 }
1739
1740 #[stable(feature = "rust1", since = "1.0.0")]
1741 impl fmt::Display for TryRecvError {
1742     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743         match *self {
1744             TryRecvError::Empty => {
1745                 "receiving on an empty channel".fmt(f)
1746             }
1747             TryRecvError::Disconnected => {
1748                 "receiving on a closed channel".fmt(f)
1749             }
1750         }
1751     }
1752 }
1753
1754 #[stable(feature = "rust1", since = "1.0.0")]
1755 impl error::Error for TryRecvError {
1756
1757     fn description(&self) -> &str {
1758         match *self {
1759             TryRecvError::Empty => {
1760                 "receiving on an empty channel"
1761             }
1762             TryRecvError::Disconnected => {
1763                 "receiving on a closed channel"
1764             }
1765         }
1766     }
1767
1768     fn cause(&self) -> Option<&dyn error::Error> {
1769         None
1770     }
1771 }
1772
1773 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1774 impl From<RecvError> for TryRecvError {
1775     fn from(err: RecvError) -> TryRecvError {
1776         match err {
1777             RecvError => TryRecvError::Disconnected,
1778         }
1779     }
1780 }
1781
1782 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1783 impl fmt::Display for RecvTimeoutError {
1784     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1785         match *self {
1786             RecvTimeoutError::Timeout => {
1787                 "timed out waiting on channel".fmt(f)
1788             }
1789             RecvTimeoutError::Disconnected => {
1790                 "channel is empty and sending half is closed".fmt(f)
1791             }
1792         }
1793     }
1794 }
1795
1796 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1797 impl error::Error for RecvTimeoutError {
1798     fn description(&self) -> &str {
1799         match *self {
1800             RecvTimeoutError::Timeout => {
1801                 "timed out waiting on channel"
1802             }
1803             RecvTimeoutError::Disconnected => {
1804                 "channel is empty and sending half is closed"
1805             }
1806         }
1807     }
1808
1809     fn cause(&self) -> Option<&dyn error::Error> {
1810         None
1811     }
1812 }
1813
1814 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1815 impl From<RecvError> for RecvTimeoutError {
1816     fn from(err: RecvError) -> RecvTimeoutError {
1817         match err {
1818             RecvError => RecvTimeoutError::Disconnected,
1819         }
1820     }
1821 }
1822
1823 #[cfg(all(test, not(target_os = "emscripten")))]
1824 mod tests {
1825     use super::*;
1826     use crate::env;
1827     use crate::thread;
1828     use crate::time::{Duration, Instant};
1829
1830     pub fn stress_factor() -> usize {
1831         match env::var("RUST_TEST_STRESS") {
1832             Ok(val) => val.parse().unwrap(),
1833             Err(..) => 1,
1834         }
1835     }
1836
1837     #[test]
1838     fn smoke() {
1839         let (tx, rx) = channel::<i32>();
1840         tx.send(1).unwrap();
1841         assert_eq!(rx.recv().unwrap(), 1);
1842     }
1843
1844     #[test]
1845     fn drop_full() {
1846         let (tx, _rx) = channel::<Box<isize>>();
1847         tx.send(box 1).unwrap();
1848     }
1849
1850     #[test]
1851     fn drop_full_shared() {
1852         let (tx, _rx) = channel::<Box<isize>>();
1853         drop(tx.clone());
1854         drop(tx.clone());
1855         tx.send(box 1).unwrap();
1856     }
1857
1858     #[test]
1859     fn smoke_shared() {
1860         let (tx, rx) = channel::<i32>();
1861         tx.send(1).unwrap();
1862         assert_eq!(rx.recv().unwrap(), 1);
1863         let tx = tx.clone();
1864         tx.send(1).unwrap();
1865         assert_eq!(rx.recv().unwrap(), 1);
1866     }
1867
1868     #[test]
1869     fn smoke_threads() {
1870         let (tx, rx) = channel::<i32>();
1871         let _t = thread::spawn(move|| {
1872             tx.send(1).unwrap();
1873         });
1874         assert_eq!(rx.recv().unwrap(), 1);
1875     }
1876
1877     #[test]
1878     fn smoke_port_gone() {
1879         let (tx, rx) = channel::<i32>();
1880         drop(rx);
1881         assert!(tx.send(1).is_err());
1882     }
1883
1884     #[test]
1885     fn smoke_shared_port_gone() {
1886         let (tx, rx) = channel::<i32>();
1887         drop(rx);
1888         assert!(tx.send(1).is_err())
1889     }
1890
1891     #[test]
1892     fn smoke_shared_port_gone2() {
1893         let (tx, rx) = channel::<i32>();
1894         drop(rx);
1895         let tx2 = tx.clone();
1896         drop(tx);
1897         assert!(tx2.send(1).is_err());
1898     }
1899
1900     #[test]
1901     fn port_gone_concurrent() {
1902         let (tx, rx) = channel::<i32>();
1903         let _t = thread::spawn(move|| {
1904             rx.recv().unwrap();
1905         });
1906         while tx.send(1).is_ok() {}
1907     }
1908
1909     #[test]
1910     fn port_gone_concurrent_shared() {
1911         let (tx, rx) = channel::<i32>();
1912         let tx2 = tx.clone();
1913         let _t = thread::spawn(move|| {
1914             rx.recv().unwrap();
1915         });
1916         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1917     }
1918
1919     #[test]
1920     fn smoke_chan_gone() {
1921         let (tx, rx) = channel::<i32>();
1922         drop(tx);
1923         assert!(rx.recv().is_err());
1924     }
1925
1926     #[test]
1927     fn smoke_chan_gone_shared() {
1928         let (tx, rx) = channel::<()>();
1929         let tx2 = tx.clone();
1930         drop(tx);
1931         drop(tx2);
1932         assert!(rx.recv().is_err());
1933     }
1934
1935     #[test]
1936     fn chan_gone_concurrent() {
1937         let (tx, rx) = channel::<i32>();
1938         let _t = thread::spawn(move|| {
1939             tx.send(1).unwrap();
1940             tx.send(1).unwrap();
1941         });
1942         while rx.recv().is_ok() {}
1943     }
1944
1945     #[test]
1946     fn stress() {
1947         let (tx, rx) = channel::<i32>();
1948         let t = thread::spawn(move|| {
1949             for _ in 0..10000 { tx.send(1).unwrap(); }
1950         });
1951         for _ in 0..10000 {
1952             assert_eq!(rx.recv().unwrap(), 1);
1953         }
1954         t.join().ok().unwrap();
1955     }
1956
1957     #[test]
1958     fn stress_shared() {
1959         const AMT: u32 = 10000;
1960         const NTHREADS: u32 = 8;
1961         let (tx, rx) = channel::<i32>();
1962
1963         let t = thread::spawn(move|| {
1964             for _ in 0..AMT * NTHREADS {
1965                 assert_eq!(rx.recv().unwrap(), 1);
1966             }
1967             match rx.try_recv() {
1968                 Ok(..) => panic!(),
1969                 _ => {}
1970             }
1971         });
1972
1973         for _ in 0..NTHREADS {
1974             let tx = tx.clone();
1975             thread::spawn(move|| {
1976                 for _ in 0..AMT { tx.send(1).unwrap(); }
1977             });
1978         }
1979         drop(tx);
1980         t.join().ok().unwrap();
1981     }
1982
1983     #[test]
1984     fn send_from_outside_runtime() {
1985         let (tx1, rx1) = channel::<()>();
1986         let (tx2, rx2) = channel::<i32>();
1987         let t1 = thread::spawn(move|| {
1988             tx1.send(()).unwrap();
1989             for _ in 0..40 {
1990                 assert_eq!(rx2.recv().unwrap(), 1);
1991             }
1992         });
1993         rx1.recv().unwrap();
1994         let t2 = thread::spawn(move|| {
1995             for _ in 0..40 {
1996                 tx2.send(1).unwrap();
1997             }
1998         });
1999         t1.join().ok().unwrap();
2000         t2.join().ok().unwrap();
2001     }
2002
2003     #[test]
2004     fn recv_from_outside_runtime() {
2005         let (tx, rx) = channel::<i32>();
2006         let t = thread::spawn(move|| {
2007             for _ in 0..40 {
2008                 assert_eq!(rx.recv().unwrap(), 1);
2009             }
2010         });
2011         for _ in 0..40 {
2012             tx.send(1).unwrap();
2013         }
2014         t.join().ok().unwrap();
2015     }
2016
2017     #[test]
2018     fn no_runtime() {
2019         let (tx1, rx1) = channel::<i32>();
2020         let (tx2, rx2) = channel::<i32>();
2021         let t1 = thread::spawn(move|| {
2022             assert_eq!(rx1.recv().unwrap(), 1);
2023             tx2.send(2).unwrap();
2024         });
2025         let t2 = thread::spawn(move|| {
2026             tx1.send(1).unwrap();
2027             assert_eq!(rx2.recv().unwrap(), 2);
2028         });
2029         t1.join().ok().unwrap();
2030         t2.join().ok().unwrap();
2031     }
2032
2033     #[test]
2034     fn oneshot_single_thread_close_port_first() {
2035         // Simple test of closing without sending
2036         let (_tx, rx) = channel::<i32>();
2037         drop(rx);
2038     }
2039
2040     #[test]
2041     fn oneshot_single_thread_close_chan_first() {
2042         // Simple test of closing without sending
2043         let (tx, _rx) = channel::<i32>();
2044         drop(tx);
2045     }
2046
2047     #[test]
2048     fn oneshot_single_thread_send_port_close() {
2049         // Testing that the sender cleans up the payload if receiver is closed
2050         let (tx, rx) = channel::<Box<i32>>();
2051         drop(rx);
2052         assert!(tx.send(box 0).is_err());
2053     }
2054
2055     #[test]
2056     fn oneshot_single_thread_recv_chan_close() {
2057         // Receiving on a closed chan will panic
2058         let res = thread::spawn(move|| {
2059             let (tx, rx) = channel::<i32>();
2060             drop(tx);
2061             rx.recv().unwrap();
2062         }).join();
2063         // What is our res?
2064         assert!(res.is_err());
2065     }
2066
2067     #[test]
2068     fn oneshot_single_thread_send_then_recv() {
2069         let (tx, rx) = channel::<Box<i32>>();
2070         tx.send(box 10).unwrap();
2071         assert!(*rx.recv().unwrap() == 10);
2072     }
2073
2074     #[test]
2075     fn oneshot_single_thread_try_send_open() {
2076         let (tx, rx) = channel::<i32>();
2077         assert!(tx.send(10).is_ok());
2078         assert!(rx.recv().unwrap() == 10);
2079     }
2080
2081     #[test]
2082     fn oneshot_single_thread_try_send_closed() {
2083         let (tx, rx) = channel::<i32>();
2084         drop(rx);
2085         assert!(tx.send(10).is_err());
2086     }
2087
2088     #[test]
2089     fn oneshot_single_thread_try_recv_open() {
2090         let (tx, rx) = channel::<i32>();
2091         tx.send(10).unwrap();
2092         assert!(rx.recv() == Ok(10));
2093     }
2094
2095     #[test]
2096     fn oneshot_single_thread_try_recv_closed() {
2097         let (tx, rx) = channel::<i32>();
2098         drop(tx);
2099         assert!(rx.recv().is_err());
2100     }
2101
2102     #[test]
2103     fn oneshot_single_thread_peek_data() {
2104         let (tx, rx) = channel::<i32>();
2105         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2106         tx.send(10).unwrap();
2107         assert_eq!(rx.try_recv(), Ok(10));
2108     }
2109
2110     #[test]
2111     fn oneshot_single_thread_peek_close() {
2112         let (tx, rx) = channel::<i32>();
2113         drop(tx);
2114         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2115         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2116     }
2117
2118     #[test]
2119     fn oneshot_single_thread_peek_open() {
2120         let (_tx, rx) = channel::<i32>();
2121         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2122     }
2123
2124     #[test]
2125     fn oneshot_multi_task_recv_then_send() {
2126         let (tx, rx) = channel::<Box<i32>>();
2127         let _t = thread::spawn(move|| {
2128             assert!(*rx.recv().unwrap() == 10);
2129         });
2130
2131         tx.send(box 10).unwrap();
2132     }
2133
2134     #[test]
2135     fn oneshot_multi_task_recv_then_close() {
2136         let (tx, rx) = channel::<Box<i32>>();
2137         let _t = thread::spawn(move|| {
2138             drop(tx);
2139         });
2140         let res = thread::spawn(move|| {
2141             assert!(*rx.recv().unwrap() == 10);
2142         }).join();
2143         assert!(res.is_err());
2144     }
2145
2146     #[test]
2147     fn oneshot_multi_thread_close_stress() {
2148         for _ in 0..stress_factor() {
2149             let (tx, rx) = channel::<i32>();
2150             let _t = thread::spawn(move|| {
2151                 drop(rx);
2152             });
2153             drop(tx);
2154         }
2155     }
2156
2157     #[test]
2158     fn oneshot_multi_thread_send_close_stress() {
2159         for _ in 0..stress_factor() {
2160             let (tx, rx) = channel::<i32>();
2161             let _t = thread::spawn(move|| {
2162                 drop(rx);
2163             });
2164             let _ = thread::spawn(move|| {
2165                 tx.send(1).unwrap();
2166             }).join();
2167         }
2168     }
2169
2170     #[test]
2171     fn oneshot_multi_thread_recv_close_stress() {
2172         for _ in 0..stress_factor() {
2173             let (tx, rx) = channel::<i32>();
2174             thread::spawn(move|| {
2175                 let res = thread::spawn(move|| {
2176                     rx.recv().unwrap();
2177                 }).join();
2178                 assert!(res.is_err());
2179             });
2180             let _t = thread::spawn(move|| {
2181                 thread::spawn(move|| {
2182                     drop(tx);
2183                 });
2184             });
2185         }
2186     }
2187
2188     #[test]
2189     fn oneshot_multi_thread_send_recv_stress() {
2190         for _ in 0..stress_factor() {
2191             let (tx, rx) = channel::<Box<isize>>();
2192             let _t = thread::spawn(move|| {
2193                 tx.send(box 10).unwrap();
2194             });
2195             assert!(*rx.recv().unwrap() == 10);
2196         }
2197     }
2198
2199     #[test]
2200     fn stream_send_recv_stress() {
2201         for _ in 0..stress_factor() {
2202             let (tx, rx) = channel();
2203
2204             send(tx, 0);
2205             recv(rx, 0);
2206
2207             fn send(tx: Sender<Box<i32>>, i: i32) {
2208                 if i == 10 { return }
2209
2210                 thread::spawn(move|| {
2211                     tx.send(box i).unwrap();
2212                     send(tx, i + 1);
2213                 });
2214             }
2215
2216             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2217                 if i == 10 { return }
2218
2219                 thread::spawn(move|| {
2220                     assert!(*rx.recv().unwrap() == i);
2221                     recv(rx, i + 1);
2222                 });
2223             }
2224         }
2225     }
2226
2227     #[test]
2228     fn oneshot_single_thread_recv_timeout() {
2229         let (tx, rx) = channel();
2230         tx.send(()).unwrap();
2231         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2232         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2233         tx.send(()).unwrap();
2234         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2235     }
2236
2237     #[test]
2238     fn stress_recv_timeout_two_threads() {
2239         let (tx, rx) = channel();
2240         let stress = stress_factor() + 100;
2241         let timeout = Duration::from_millis(100);
2242
2243         thread::spawn(move || {
2244             for i in 0..stress {
2245                 if i % 2 == 0 {
2246                     thread::sleep(timeout * 2);
2247                 }
2248                 tx.send(1usize).unwrap();
2249             }
2250         });
2251
2252         let mut recv_count = 0;
2253         loop {
2254             match rx.recv_timeout(timeout) {
2255                 Ok(n) => {
2256                     assert_eq!(n, 1usize);
2257                     recv_count += 1;
2258                 }
2259                 Err(RecvTimeoutError::Timeout) => continue,
2260                 Err(RecvTimeoutError::Disconnected) => break,
2261             }
2262         }
2263
2264         assert_eq!(recv_count, stress);
2265     }
2266
2267     #[test]
2268     fn recv_timeout_upgrade() {
2269         let (tx, rx) = channel::<()>();
2270         let timeout = Duration::from_millis(1);
2271         let _tx_clone = tx.clone();
2272
2273         let start = Instant::now();
2274         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2275         assert!(Instant::now() >= start + timeout);
2276     }
2277
2278     #[test]
2279     fn stress_recv_timeout_shared() {
2280         let (tx, rx) = channel();
2281         let stress = stress_factor() + 100;
2282
2283         for i in 0..stress {
2284             let tx = tx.clone();
2285             thread::spawn(move || {
2286                 thread::sleep(Duration::from_millis(i as u64 * 10));
2287                 tx.send(1usize).unwrap();
2288             });
2289         }
2290
2291         drop(tx);
2292
2293         let mut recv_count = 0;
2294         loop {
2295             match rx.recv_timeout(Duration::from_millis(10)) {
2296                 Ok(n) => {
2297                     assert_eq!(n, 1usize);
2298                     recv_count += 1;
2299                 }
2300                 Err(RecvTimeoutError::Timeout) => continue,
2301                 Err(RecvTimeoutError::Disconnected) => break,
2302             }
2303         }
2304
2305         assert_eq!(recv_count, stress);
2306     }
2307
2308     #[test]
2309     fn very_long_recv_timeout_wont_panic() {
2310         let (tx, rx) = channel::<()>();
2311         let join_handle = thread::spawn(move || {
2312             rx.recv_timeout(Duration::from_secs(u64::max_value()))
2313         });
2314         thread::sleep(Duration::from_secs(1));
2315         assert!(tx.send(()).is_ok());
2316         assert_eq!(join_handle.join().unwrap(), Ok(()));
2317     }
2318
2319     #[test]
2320     fn recv_a_lot() {
2321         // Regression test that we don't run out of stack in scheduler context
2322         let (tx, rx) = channel();
2323         for _ in 0..10000 { tx.send(()).unwrap(); }
2324         for _ in 0..10000 { rx.recv().unwrap(); }
2325     }
2326
2327     #[test]
2328     fn shared_recv_timeout() {
2329         let (tx, rx) = channel();
2330         let total = 5;
2331         for _ in 0..total {
2332             let tx = tx.clone();
2333             thread::spawn(move|| {
2334                 tx.send(()).unwrap();
2335             });
2336         }
2337
2338         for _ in 0..total { rx.recv().unwrap(); }
2339
2340         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2341         tx.send(()).unwrap();
2342         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2343     }
2344
2345     #[test]
2346     fn shared_chan_stress() {
2347         let (tx, rx) = channel();
2348         let total = stress_factor() + 100;
2349         for _ in 0..total {
2350             let tx = tx.clone();
2351             thread::spawn(move|| {
2352                 tx.send(()).unwrap();
2353             });
2354         }
2355
2356         for _ in 0..total {
2357             rx.recv().unwrap();
2358         }
2359     }
2360
2361     #[test]
2362     fn test_nested_recv_iter() {
2363         let (tx, rx) = channel::<i32>();
2364         let (total_tx, total_rx) = channel::<i32>();
2365
2366         let _t = thread::spawn(move|| {
2367             let mut acc = 0;
2368             for x in rx.iter() {
2369                 acc += x;
2370             }
2371             total_tx.send(acc).unwrap();
2372         });
2373
2374         tx.send(3).unwrap();
2375         tx.send(1).unwrap();
2376         tx.send(2).unwrap();
2377         drop(tx);
2378         assert_eq!(total_rx.recv().unwrap(), 6);
2379     }
2380
2381     #[test]
2382     fn test_recv_iter_break() {
2383         let (tx, rx) = channel::<i32>();
2384         let (count_tx, count_rx) = channel();
2385
2386         let _t = thread::spawn(move|| {
2387             let mut count = 0;
2388             for x in rx.iter() {
2389                 if count >= 3 {
2390                     break;
2391                 } else {
2392                     count += x;
2393                 }
2394             }
2395             count_tx.send(count).unwrap();
2396         });
2397
2398         tx.send(2).unwrap();
2399         tx.send(2).unwrap();
2400         tx.send(2).unwrap();
2401         let _ = tx.send(2);
2402         drop(tx);
2403         assert_eq!(count_rx.recv().unwrap(), 4);
2404     }
2405
2406     #[test]
2407     fn test_recv_try_iter() {
2408         let (request_tx, request_rx) = channel();
2409         let (response_tx, response_rx) = channel();
2410
2411         // Request `x`s until we have `6`.
2412         let t = thread::spawn(move|| {
2413             let mut count = 0;
2414             loop {
2415                 for x in response_rx.try_iter() {
2416                     count += x;
2417                     if count == 6 {
2418                         return count;
2419                     }
2420                 }
2421                 request_tx.send(()).unwrap();
2422             }
2423         });
2424
2425         for _ in request_rx.iter() {
2426             if response_tx.send(2).is_err() {
2427                 break;
2428             }
2429         }
2430
2431         assert_eq!(t.join().unwrap(), 6);
2432     }
2433
2434     #[test]
2435     fn test_recv_into_iter_owned() {
2436         let mut iter = {
2437           let (tx, rx) = channel::<i32>();
2438           tx.send(1).unwrap();
2439           tx.send(2).unwrap();
2440
2441           rx.into_iter()
2442         };
2443         assert_eq!(iter.next().unwrap(), 1);
2444         assert_eq!(iter.next().unwrap(), 2);
2445         assert_eq!(iter.next().is_none(), true);
2446     }
2447
2448     #[test]
2449     fn test_recv_into_iter_borrowed() {
2450         let (tx, rx) = channel::<i32>();
2451         tx.send(1).unwrap();
2452         tx.send(2).unwrap();
2453         drop(tx);
2454         let mut iter = (&rx).into_iter();
2455         assert_eq!(iter.next().unwrap(), 1);
2456         assert_eq!(iter.next().unwrap(), 2);
2457         assert_eq!(iter.next().is_none(), true);
2458     }
2459
2460     #[test]
2461     fn try_recv_states() {
2462         let (tx1, rx1) = channel::<i32>();
2463         let (tx2, rx2) = channel::<()>();
2464         let (tx3, rx3) = channel::<()>();
2465         let _t = thread::spawn(move|| {
2466             rx2.recv().unwrap();
2467             tx1.send(1).unwrap();
2468             tx3.send(()).unwrap();
2469             rx2.recv().unwrap();
2470             drop(tx1);
2471             tx3.send(()).unwrap();
2472         });
2473
2474         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2475         tx2.send(()).unwrap();
2476         rx3.recv().unwrap();
2477         assert_eq!(rx1.try_recv(), Ok(1));
2478         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2479         tx2.send(()).unwrap();
2480         rx3.recv().unwrap();
2481         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2482     }
2483
2484     // This bug used to end up in a livelock inside of the Receiver destructor
2485     // because the internal state of the Shared packet was corrupted
2486     #[test]
2487     fn destroy_upgraded_shared_port_when_sender_still_active() {
2488         let (tx, rx) = channel();
2489         let (tx2, rx2) = channel();
2490         let _t = thread::spawn(move|| {
2491             rx.recv().unwrap(); // wait on a oneshot
2492             drop(rx);  // destroy a shared
2493             tx2.send(()).unwrap();
2494         });
2495         // make sure the other thread has gone to sleep
2496         for _ in 0..5000 { thread::yield_now(); }
2497
2498         // upgrade to a shared chan and send a message
2499         let t = tx.clone();
2500         drop(tx);
2501         t.send(()).unwrap();
2502
2503         // wait for the child thread to exit before we exit
2504         rx2.recv().unwrap();
2505     }
2506
2507     #[test]
2508     fn issue_32114() {
2509         let (tx, _) = channel();
2510         let _ = tx.send(123);
2511         assert_eq!(tx.send(123), Err(SendError(123)));
2512     }
2513 }
2514
2515 #[cfg(all(test, not(target_os = "emscripten")))]
2516 mod sync_tests {
2517     use super::*;
2518     use crate::env;
2519     use crate::thread;
2520     use crate::time::Duration;
2521
2522     pub fn stress_factor() -> usize {
2523         match env::var("RUST_TEST_STRESS") {
2524             Ok(val) => val.parse().unwrap(),
2525             Err(..) => 1,
2526         }
2527     }
2528
2529     #[test]
2530     fn smoke() {
2531         let (tx, rx) = sync_channel::<i32>(1);
2532         tx.send(1).unwrap();
2533         assert_eq!(rx.recv().unwrap(), 1);
2534     }
2535
2536     #[test]
2537     fn drop_full() {
2538         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2539         tx.send(box 1).unwrap();
2540     }
2541
2542     #[test]
2543     fn smoke_shared() {
2544         let (tx, rx) = sync_channel::<i32>(1);
2545         tx.send(1).unwrap();
2546         assert_eq!(rx.recv().unwrap(), 1);
2547         let tx = tx.clone();
2548         tx.send(1).unwrap();
2549         assert_eq!(rx.recv().unwrap(), 1);
2550     }
2551
2552     #[test]
2553     fn recv_timeout() {
2554         let (tx, rx) = sync_channel::<i32>(1);
2555         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2556         tx.send(1).unwrap();
2557         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2558     }
2559
2560     #[test]
2561     fn smoke_threads() {
2562         let (tx, rx) = sync_channel::<i32>(0);
2563         let _t = thread::spawn(move|| {
2564             tx.send(1).unwrap();
2565         });
2566         assert_eq!(rx.recv().unwrap(), 1);
2567     }
2568
2569     #[test]
2570     fn smoke_port_gone() {
2571         let (tx, rx) = sync_channel::<i32>(0);
2572         drop(rx);
2573         assert!(tx.send(1).is_err());
2574     }
2575
2576     #[test]
2577     fn smoke_shared_port_gone2() {
2578         let (tx, rx) = sync_channel::<i32>(0);
2579         drop(rx);
2580         let tx2 = tx.clone();
2581         drop(tx);
2582         assert!(tx2.send(1).is_err());
2583     }
2584
2585     #[test]
2586     fn port_gone_concurrent() {
2587         let (tx, rx) = sync_channel::<i32>(0);
2588         let _t = thread::spawn(move|| {
2589             rx.recv().unwrap();
2590         });
2591         while tx.send(1).is_ok() {}
2592     }
2593
2594     #[test]
2595     fn port_gone_concurrent_shared() {
2596         let (tx, rx) = sync_channel::<i32>(0);
2597         let tx2 = tx.clone();
2598         let _t = thread::spawn(move|| {
2599             rx.recv().unwrap();
2600         });
2601         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2602     }
2603
2604     #[test]
2605     fn smoke_chan_gone() {
2606         let (tx, rx) = sync_channel::<i32>(0);
2607         drop(tx);
2608         assert!(rx.recv().is_err());
2609     }
2610
2611     #[test]
2612     fn smoke_chan_gone_shared() {
2613         let (tx, rx) = sync_channel::<()>(0);
2614         let tx2 = tx.clone();
2615         drop(tx);
2616         drop(tx2);
2617         assert!(rx.recv().is_err());
2618     }
2619
2620     #[test]
2621     fn chan_gone_concurrent() {
2622         let (tx, rx) = sync_channel::<i32>(0);
2623         thread::spawn(move|| {
2624             tx.send(1).unwrap();
2625             tx.send(1).unwrap();
2626         });
2627         while rx.recv().is_ok() {}
2628     }
2629
2630     #[test]
2631     fn stress() {
2632         let (tx, rx) = sync_channel::<i32>(0);
2633         thread::spawn(move|| {
2634             for _ in 0..10000 { tx.send(1).unwrap(); }
2635         });
2636         for _ in 0..10000 {
2637             assert_eq!(rx.recv().unwrap(), 1);
2638         }
2639     }
2640
2641     #[test]
2642     fn stress_recv_timeout_two_threads() {
2643         let (tx, rx) = sync_channel::<i32>(0);
2644
2645         thread::spawn(move|| {
2646             for _ in 0..10000 { tx.send(1).unwrap(); }
2647         });
2648
2649         let mut recv_count = 0;
2650         loop {
2651             match rx.recv_timeout(Duration::from_millis(1)) {
2652                 Ok(v) => {
2653                     assert_eq!(v, 1);
2654                     recv_count += 1;
2655                 },
2656                 Err(RecvTimeoutError::Timeout) => continue,
2657                 Err(RecvTimeoutError::Disconnected) => break,
2658             }
2659         }
2660
2661         assert_eq!(recv_count, 10000);
2662     }
2663
2664     #[test]
2665     fn stress_recv_timeout_shared() {
2666         const AMT: u32 = 1000;
2667         const NTHREADS: u32 = 8;
2668         let (tx, rx) = sync_channel::<i32>(0);
2669         let (dtx, drx) = sync_channel::<()>(0);
2670
2671         thread::spawn(move|| {
2672             let mut recv_count = 0;
2673             loop {
2674                 match rx.recv_timeout(Duration::from_millis(10)) {
2675                     Ok(v) => {
2676                         assert_eq!(v, 1);
2677                         recv_count += 1;
2678                     },
2679                     Err(RecvTimeoutError::Timeout) => continue,
2680                     Err(RecvTimeoutError::Disconnected) => break,
2681                 }
2682             }
2683
2684             assert_eq!(recv_count, AMT * NTHREADS);
2685             assert!(rx.try_recv().is_err());
2686
2687             dtx.send(()).unwrap();
2688         });
2689
2690         for _ in 0..NTHREADS {
2691             let tx = tx.clone();
2692             thread::spawn(move|| {
2693                 for _ in 0..AMT { tx.send(1).unwrap(); }
2694             });
2695         }
2696
2697         drop(tx);
2698
2699         drx.recv().unwrap();
2700     }
2701
2702     #[test]
2703     fn stress_shared() {
2704         const AMT: u32 = 1000;
2705         const NTHREADS: u32 = 8;
2706         let (tx, rx) = sync_channel::<i32>(0);
2707         let (dtx, drx) = sync_channel::<()>(0);
2708
2709         thread::spawn(move|| {
2710             for _ in 0..AMT * NTHREADS {
2711                 assert_eq!(rx.recv().unwrap(), 1);
2712             }
2713             match rx.try_recv() {
2714                 Ok(..) => panic!(),
2715                 _ => {}
2716             }
2717             dtx.send(()).unwrap();
2718         });
2719
2720         for _ in 0..NTHREADS {
2721             let tx = tx.clone();
2722             thread::spawn(move|| {
2723                 for _ in 0..AMT { tx.send(1).unwrap(); }
2724             });
2725         }
2726         drop(tx);
2727         drx.recv().unwrap();
2728     }
2729
2730     #[test]
2731     fn oneshot_single_thread_close_port_first() {
2732         // Simple test of closing without sending
2733         let (_tx, rx) = sync_channel::<i32>(0);
2734         drop(rx);
2735     }
2736
2737     #[test]
2738     fn oneshot_single_thread_close_chan_first() {
2739         // Simple test of closing without sending
2740         let (tx, _rx) = sync_channel::<i32>(0);
2741         drop(tx);
2742     }
2743
2744     #[test]
2745     fn oneshot_single_thread_send_port_close() {
2746         // Testing that the sender cleans up the payload if receiver is closed
2747         let (tx, rx) = sync_channel::<Box<i32>>(0);
2748         drop(rx);
2749         assert!(tx.send(box 0).is_err());
2750     }
2751
2752     #[test]
2753     fn oneshot_single_thread_recv_chan_close() {
2754         // Receiving on a closed chan will panic
2755         let res = thread::spawn(move|| {
2756             let (tx, rx) = sync_channel::<i32>(0);
2757             drop(tx);
2758             rx.recv().unwrap();
2759         }).join();
2760         // What is our res?
2761         assert!(res.is_err());
2762     }
2763
2764     #[test]
2765     fn oneshot_single_thread_send_then_recv() {
2766         let (tx, rx) = sync_channel::<Box<i32>>(1);
2767         tx.send(box 10).unwrap();
2768         assert!(*rx.recv().unwrap() == 10);
2769     }
2770
2771     #[test]
2772     fn oneshot_single_thread_try_send_open() {
2773         let (tx, rx) = sync_channel::<i32>(1);
2774         assert_eq!(tx.try_send(10), Ok(()));
2775         assert!(rx.recv().unwrap() == 10);
2776     }
2777
2778     #[test]
2779     fn oneshot_single_thread_try_send_closed() {
2780         let (tx, rx) = sync_channel::<i32>(0);
2781         drop(rx);
2782         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2783     }
2784
2785     #[test]
2786     fn oneshot_single_thread_try_send_closed2() {
2787         let (tx, _rx) = sync_channel::<i32>(0);
2788         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2789     }
2790
2791     #[test]
2792     fn oneshot_single_thread_try_recv_open() {
2793         let (tx, rx) = sync_channel::<i32>(1);
2794         tx.send(10).unwrap();
2795         assert!(rx.recv() == Ok(10));
2796     }
2797
2798     #[test]
2799     fn oneshot_single_thread_try_recv_closed() {
2800         let (tx, rx) = sync_channel::<i32>(0);
2801         drop(tx);
2802         assert!(rx.recv().is_err());
2803     }
2804
2805     #[test]
2806     fn oneshot_single_thread_try_recv_closed_with_data() {
2807         let (tx, rx) = sync_channel::<i32>(1);
2808         tx.send(10).unwrap();
2809         drop(tx);
2810         assert_eq!(rx.try_recv(), Ok(10));
2811         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2812     }
2813
2814     #[test]
2815     fn oneshot_single_thread_peek_data() {
2816         let (tx, rx) = sync_channel::<i32>(1);
2817         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2818         tx.send(10).unwrap();
2819         assert_eq!(rx.try_recv(), Ok(10));
2820     }
2821
2822     #[test]
2823     fn oneshot_single_thread_peek_close() {
2824         let (tx, rx) = sync_channel::<i32>(0);
2825         drop(tx);
2826         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2827         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2828     }
2829
2830     #[test]
2831     fn oneshot_single_thread_peek_open() {
2832         let (_tx, rx) = sync_channel::<i32>(0);
2833         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2834     }
2835
2836     #[test]
2837     fn oneshot_multi_task_recv_then_send() {
2838         let (tx, rx) = sync_channel::<Box<i32>>(0);
2839         let _t = thread::spawn(move|| {
2840             assert!(*rx.recv().unwrap() == 10);
2841         });
2842
2843         tx.send(box 10).unwrap();
2844     }
2845
2846     #[test]
2847     fn oneshot_multi_task_recv_then_close() {
2848         let (tx, rx) = sync_channel::<Box<i32>>(0);
2849         let _t = thread::spawn(move|| {
2850             drop(tx);
2851         });
2852         let res = thread::spawn(move|| {
2853             assert!(*rx.recv().unwrap() == 10);
2854         }).join();
2855         assert!(res.is_err());
2856     }
2857
2858     #[test]
2859     fn oneshot_multi_thread_close_stress() {
2860         for _ in 0..stress_factor() {
2861             let (tx, rx) = sync_channel::<i32>(0);
2862             let _t = thread::spawn(move|| {
2863                 drop(rx);
2864             });
2865             drop(tx);
2866         }
2867     }
2868
2869     #[test]
2870     fn oneshot_multi_thread_send_close_stress() {
2871         for _ in 0..stress_factor() {
2872             let (tx, rx) = sync_channel::<i32>(0);
2873             let _t = thread::spawn(move|| {
2874                 drop(rx);
2875             });
2876             let _ = thread::spawn(move || {
2877                 tx.send(1).unwrap();
2878             }).join();
2879         }
2880     }
2881
2882     #[test]
2883     fn oneshot_multi_thread_recv_close_stress() {
2884         for _ in 0..stress_factor() {
2885             let (tx, rx) = sync_channel::<i32>(0);
2886             let _t = thread::spawn(move|| {
2887                 let res = thread::spawn(move|| {
2888                     rx.recv().unwrap();
2889                 }).join();
2890                 assert!(res.is_err());
2891             });
2892             let _t = thread::spawn(move|| {
2893                 thread::spawn(move|| {
2894                     drop(tx);
2895                 });
2896             });
2897         }
2898     }
2899
2900     #[test]
2901     fn oneshot_multi_thread_send_recv_stress() {
2902         for _ in 0..stress_factor() {
2903             let (tx, rx) = sync_channel::<Box<i32>>(0);
2904             let _t = thread::spawn(move|| {
2905                 tx.send(box 10).unwrap();
2906             });
2907             assert!(*rx.recv().unwrap() == 10);
2908         }
2909     }
2910
2911     #[test]
2912     fn stream_send_recv_stress() {
2913         for _ in 0..stress_factor() {
2914             let (tx, rx) = sync_channel::<Box<i32>>(0);
2915
2916             send(tx, 0);
2917             recv(rx, 0);
2918
2919             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2920                 if i == 10 { return }
2921
2922                 thread::spawn(move|| {
2923                     tx.send(box i).unwrap();
2924                     send(tx, i + 1);
2925                 });
2926             }
2927
2928             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2929                 if i == 10 { return }
2930
2931                 thread::spawn(move|| {
2932                     assert!(*rx.recv().unwrap() == i);
2933                     recv(rx, i + 1);
2934                 });
2935             }
2936         }
2937     }
2938
2939     #[test]
2940     fn recv_a_lot() {
2941         // Regression test that we don't run out of stack in scheduler context
2942         let (tx, rx) = sync_channel(10000);
2943         for _ in 0..10000 { tx.send(()).unwrap(); }
2944         for _ in 0..10000 { rx.recv().unwrap(); }
2945     }
2946
2947     #[test]
2948     fn shared_chan_stress() {
2949         let (tx, rx) = sync_channel(0);
2950         let total = stress_factor() + 100;
2951         for _ in 0..total {
2952             let tx = tx.clone();
2953             thread::spawn(move|| {
2954                 tx.send(()).unwrap();
2955             });
2956         }
2957
2958         for _ in 0..total {
2959             rx.recv().unwrap();
2960         }
2961     }
2962
2963     #[test]
2964     fn test_nested_recv_iter() {
2965         let (tx, rx) = sync_channel::<i32>(0);
2966         let (total_tx, total_rx) = sync_channel::<i32>(0);
2967
2968         let _t = thread::spawn(move|| {
2969             let mut acc = 0;
2970             for x in rx.iter() {
2971                 acc += x;
2972             }
2973             total_tx.send(acc).unwrap();
2974         });
2975
2976         tx.send(3).unwrap();
2977         tx.send(1).unwrap();
2978         tx.send(2).unwrap();
2979         drop(tx);
2980         assert_eq!(total_rx.recv().unwrap(), 6);
2981     }
2982
2983     #[test]
2984     fn test_recv_iter_break() {
2985         let (tx, rx) = sync_channel::<i32>(0);
2986         let (count_tx, count_rx) = sync_channel(0);
2987
2988         let _t = thread::spawn(move|| {
2989             let mut count = 0;
2990             for x in rx.iter() {
2991                 if count >= 3 {
2992                     break;
2993                 } else {
2994                     count += x;
2995                 }
2996             }
2997             count_tx.send(count).unwrap();
2998         });
2999
3000         tx.send(2).unwrap();
3001         tx.send(2).unwrap();
3002         tx.send(2).unwrap();
3003         let _ = tx.try_send(2);
3004         drop(tx);
3005         assert_eq!(count_rx.recv().unwrap(), 4);
3006     }
3007
3008     #[test]
3009     fn try_recv_states() {
3010         let (tx1, rx1) = sync_channel::<i32>(1);
3011         let (tx2, rx2) = sync_channel::<()>(1);
3012         let (tx3, rx3) = sync_channel::<()>(1);
3013         let _t = thread::spawn(move|| {
3014             rx2.recv().unwrap();
3015             tx1.send(1).unwrap();
3016             tx3.send(()).unwrap();
3017             rx2.recv().unwrap();
3018             drop(tx1);
3019             tx3.send(()).unwrap();
3020         });
3021
3022         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3023         tx2.send(()).unwrap();
3024         rx3.recv().unwrap();
3025         assert_eq!(rx1.try_recv(), Ok(1));
3026         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3027         tx2.send(()).unwrap();
3028         rx3.recv().unwrap();
3029         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
3030     }
3031
3032     // This bug used to end up in a livelock inside of the Receiver destructor
3033     // because the internal state of the Shared packet was corrupted
3034     #[test]
3035     fn destroy_upgraded_shared_port_when_sender_still_active() {
3036         let (tx, rx) = sync_channel::<()>(0);
3037         let (tx2, rx2) = sync_channel::<()>(0);
3038         let _t = thread::spawn(move|| {
3039             rx.recv().unwrap(); // wait on a oneshot
3040             drop(rx);  // destroy a shared
3041             tx2.send(()).unwrap();
3042         });
3043         // make sure the other thread has gone to sleep
3044         for _ in 0..5000 { thread::yield_now(); }
3045
3046         // upgrade to a shared chan and send a message
3047         let t = tx.clone();
3048         drop(tx);
3049         t.send(()).unwrap();
3050
3051         // wait for the child thread to exit before we exit
3052         rx2.recv().unwrap();
3053     }
3054
3055     #[test]
3056     fn send1() {
3057         let (tx, rx) = sync_channel::<i32>(0);
3058         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3059         assert_eq!(tx.send(1), Ok(()));
3060     }
3061
3062     #[test]
3063     fn send2() {
3064         let (tx, rx) = sync_channel::<i32>(0);
3065         let _t = thread::spawn(move|| { drop(rx); });
3066         assert!(tx.send(1).is_err());
3067     }
3068
3069     #[test]
3070     fn send3() {
3071         let (tx, rx) = sync_channel::<i32>(1);
3072         assert_eq!(tx.send(1), Ok(()));
3073         let _t =thread::spawn(move|| { drop(rx); });
3074         assert!(tx.send(1).is_err());
3075     }
3076
3077     #[test]
3078     fn send4() {
3079         let (tx, rx) = sync_channel::<i32>(0);
3080         let tx2 = tx.clone();
3081         let (done, donerx) = channel();
3082         let done2 = done.clone();
3083         let _t = thread::spawn(move|| {
3084             assert!(tx.send(1).is_err());
3085             done.send(()).unwrap();
3086         });
3087         let _t = thread::spawn(move|| {
3088             assert!(tx2.send(2).is_err());
3089             done2.send(()).unwrap();
3090         });
3091         drop(rx);
3092         donerx.recv().unwrap();
3093         donerx.recv().unwrap();
3094     }
3095
3096     #[test]
3097     fn try_send1() {
3098         let (tx, _rx) = sync_channel::<i32>(0);
3099         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3100     }
3101
3102     #[test]
3103     fn try_send2() {
3104         let (tx, _rx) = sync_channel::<i32>(1);
3105         assert_eq!(tx.try_send(1), Ok(()));
3106         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3107     }
3108
3109     #[test]
3110     fn try_send3() {
3111         let (tx, rx) = sync_channel::<i32>(1);
3112         assert_eq!(tx.try_send(1), Ok(()));
3113         drop(rx);
3114         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3115     }
3116
3117     #[test]
3118     fn issue_15761() {
3119         fn repro() {
3120             let (tx1, rx1) = sync_channel::<()>(3);
3121             let (tx2, rx2) = sync_channel::<()>(3);
3122
3123             let _t = thread::spawn(move|| {
3124                 rx1.recv().unwrap();
3125                 tx2.try_send(()).unwrap();
3126             });
3127
3128             tx1.try_send(()).unwrap();
3129             rx2.recv().unwrap();
3130         }
3131
3132         for _ in 0..100 {
3133             repro()
3134         }
3135     }
3136 }