]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Deprecate Error::description for real
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // ignore-tidy-filelength
2
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
4 //!
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
7 //!
8 //! * [`Sender`]
9 //! * [`SyncSender`]
10 //! * [`Receiver`]
11 //!
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
15 //!
16 //! These channels come in two flavors:
17 //!
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //!    will return a `(Sender, Receiver)` tuple where all sends will be
20 //!    **asynchronous** (they never block). The channel conceptually has an
21 //!    infinite buffer.
22 //!
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
26 //!    **synchronous** by blocking until there is buffer space available. Note
27 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //!    channel where each sender atomically hands off a message to a receiver.
29 //!
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
36 //!
37 //! ## Disconnection
38 //!
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
43 //!
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
48 //!
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
52 //!
53 //! # Examples
54 //!
55 //! Simple usage:
56 //!
57 //! ```
58 //! use std::thread;
59 //! use std::sync::mpsc::channel;
60 //!
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //!     tx.send(10).unwrap();
65 //! });
66 //! assert_eq!(rx.recv().unwrap(), 10);
67 //! ```
68 //!
69 //! Shared usage:
70 //!
71 //! ```
72 //! use std::thread;
73 //! use std::sync::mpsc::channel;
74 //!
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
79 //! for i in 0..10 {
80 //!     let tx = tx.clone();
81 //!     thread::spawn(move|| {
82 //!         tx.send(i).unwrap();
83 //!     });
84 //! }
85 //!
86 //! for _ in 0..10 {
87 //!     let j = rx.recv().unwrap();
88 //!     assert!(0 <= j && j < 10);
89 //! }
90 //! ```
91 //!
92 //! Propagating panics:
93 //!
94 //! ```
95 //! use std::sync::mpsc::channel;
96 //!
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
100 //! drop(tx);
101 //! assert!(rx.recv().is_err());
102 //! ```
103 //!
104 //! Synchronous channels:
105 //!
106 //! ```
107 //! use std::thread;
108 //! use std::sync::mpsc::sync_channel;
109 //!
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //!     // This will wait for the parent thread to start receiving
113 //!     tx.send(53).unwrap();
114 //! });
115 //! rx.recv().unwrap();
116 //! ```
117
118 #![stable(feature = "rust1", since = "1.0.0")]
119
120 // A description of how Rust's channel implementation works
121 //
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
125 //
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
131 //
132 // ## Flavors of channels
133 //
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
137 // channels in play.
138 //
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 //                      case. They contain as few atomics as possible and
141 //                      involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 //             use a different concurrent queue that is more tailored for this
144 //             use case. The initial allocation of this flavor of channel is not
145 //             optimized.
146 // * Shared - this is the most general form of channel that this module offers,
147 //            a channel with multiple senders. This type is as optimized as it
148 //            can be, but the previous two types mentioned are much faster for
149 //            their use-cases.
150 //
151 // ## Concurrent queues
152 //
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
156 //
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // shared channels.
162 //
163 // ### SPSC optimizations
164 //
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
169 // pop().
170 //
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
176 //
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
180 // coexist at once.
181 //
182 // ### MPSC optimizations
183 //
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
187 //
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
191 //
192 // ## Overview of the Implementation
193 //
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
197 //
198 //
199 //      send(t)                             recv()
200 //        queue.push(t)                       return if queue.pop()
201 //        if increment() == -1                deschedule {
202 //          wakeup()                            if decrement() > 0
203 //                                                cancel_deschedule()
204 //                                            }
205 //                                            queue.pop()
206 //
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
209 //
210 // ### The internal atomic counter
211 //
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
215 //
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
220 //
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
227 //
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
231 //
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
234 //
235 // ## Native Implementation
236 //
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
240 //
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
246 //
247 // ## Select
248 //
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
252 //
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
259 //
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
264 //
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
266 //
267 // # Conclusion
268 //
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
271
272 use crate::cell::UnsafeCell;
273 use crate::error;
274 use crate::fmt;
275 use crate::mem;
276 use crate::sync::Arc;
277 use crate::time::{Duration, Instant};
278
279 mod blocking;
280 mod mpsc_queue;
281 mod oneshot;
282 mod shared;
283 mod spsc_queue;
284 mod stream;
285 mod sync;
286
287 mod cache_aligned;
288
289 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
291 ///
292 /// Messages sent to the channel can be retrieved using [`recv`].
293 ///
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
297 ///
298 /// # Examples
299 ///
300 /// ```rust
301 /// use std::sync::mpsc::channel;
302 /// use std::thread;
303 /// use std::time::Duration;
304 ///
305 /// let (send, recv) = channel();
306 ///
307 /// thread::spawn(move || {
308 ///     send.send("Hello world!").unwrap();
309 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
310 ///     send.send("Delayed for 2 seconds").unwrap();
311 /// });
312 ///
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
316 /// ```
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319     inner: UnsafeCell<Flavor<T>>,
320 }
321
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> {}
326
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> {}
329
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
331 ///
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
335 ///
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use std::sync::mpsc::channel;
345 /// use std::thread;
346 ///
347 /// let (send, recv) = channel();
348 ///
349 /// thread::spawn(move || {
350 ///     send.send(1u8).unwrap();
351 ///     send.send(2u8).unwrap();
352 ///     send.send(3u8).unwrap();
353 /// });
354 ///
355 /// for x in recv.iter() {
356 ///     println!("Got: {}", x);
357 /// }
358 /// ```
359 #[stable(feature = "rust1", since = "1.0.0")]
360 #[derive(Debug)]
361 pub struct Iter<'a, T: 'a> {
362     rx: &'a Receiver<T>,
363 }
364
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
367 ///
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
370 ///
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
373 ///
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
377 ///
378 /// # Examples
379 ///
380 /// ```rust
381 /// use std::sync::mpsc::channel;
382 /// use std::thread;
383 /// use std::time::Duration;
384 ///
385 /// let (sender, receiver) = channel();
386 ///
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
390 ///
391 /// thread::spawn(move || {
392 ///     sender.send(1).unwrap();
393 ///     sender.send(2).unwrap();
394 ///     sender.send(3).unwrap();
395 /// });
396 ///
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
399 ///
400 /// for x in receiver.try_iter() {
401 ///     println!("Got: {}", x);
402 /// }
403 /// ```
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
405 #[derive(Debug)]
406 pub struct TryIter<'a, T: 'a> {
407     rx: &'a Receiver<T>,
408 }
409
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
412 ///
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
416 ///
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
420 ///
421 /// # Examples
422 ///
423 /// ```rust
424 /// use std::sync::mpsc::channel;
425 /// use std::thread;
426 ///
427 /// let (send, recv) = channel();
428 ///
429 /// thread::spawn(move || {
430 ///     send.send(1u8).unwrap();
431 ///     send.send(2u8).unwrap();
432 ///     send.send(3u8).unwrap();
433 /// });
434 ///
435 /// for x in recv.into_iter() {
436 ///     println!("Got: {}", x);
437 /// }
438 /// ```
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
440 #[derive(Debug)]
441 pub struct IntoIter<T> {
442     rx: Receiver<T>,
443 }
444
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
447 ///
448 /// Messages can be sent through this channel with [`send`].
449 ///
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
452 ///
453 /// # Examples
454 ///
455 /// ```rust
456 /// use std::sync::mpsc::channel;
457 /// use std::thread;
458 ///
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
461 ///
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 ///     sender.send(1).unwrap();
465 /// });
466 ///
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 ///     sender2.send(2).unwrap();
470 /// });
471 ///
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
474 ///
475 /// assert_eq!(3, msg + msg2);
476 /// ```
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479     inner: UnsafeCell<Flavor<T>>,
480 }
481
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> {}
486
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> {}
489
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
491 ///
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
493 ///
494 /// [`send`] will block if there is no space in the internal buffer.
495 ///
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
499 ///
500 /// # Examples
501 ///
502 /// ```rust
503 /// use std::sync::mpsc::sync_channel;
504 /// use std::thread;
505 ///
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
509 ///
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 ///     sync_sender.send(1).unwrap();
513 ///     sync_sender.send(2).unwrap();
514 /// });
515 ///
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 ///     sync_sender2.send(3).unwrap();
519 ///     // thread will now block since the buffer is full
520 ///     println!("Thread unblocked!");
521 /// });
522 ///
523 /// let mut msg;
524 ///
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
527 ///
528 /// // "Thread unblocked!" will be printed now
529 ///
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
532 ///
533 /// msg = receiver.recv().unwrap();
534 ///
535 /// println!("message {} received", msg);
536 /// ```
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539     inner: Arc<sync::Packet<T>>,
540 }
541
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
544
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
547 ///
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
551 ///
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
557
558 /// An error returned from the [`recv`] function on a [`Receiver`].
559 ///
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
563 ///
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
571
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
575 ///
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582     /// This **channel** is currently empty, but the **Sender**(s) have not yet
583     /// disconnected, so data may yet become available.
584     #[stable(feature = "rust1", since = "1.0.0")]
585     Empty,
586
587     /// The **channel**'s sending half has become disconnected, and there will
588     /// never be any more data received on it.
589     #[stable(feature = "rust1", since = "1.0.0")]
590     Disconnected,
591 }
592
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
596 ///
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603     /// This **channel** is currently empty, but the **Sender**(s) have not yet
604     /// disconnected, so data may yet become available.
605     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
606     Timeout,
607     /// The **channel**'s sending half has become disconnected, and there will
608     /// never be any more data received on it.
609     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
610     Disconnected,
611 }
612
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
615 ///
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620     /// The data could not be sent on the [`sync_channel`] because it would require that
621     /// the callee block to send the data.
622     ///
623     /// If this is a buffered channel, then the buffer is full at this time. If
624     /// this is not a buffered channel, then there is no [`Receiver`] available to
625     /// acquire the data.
626     ///
627     /// [`sync_channel`]: fn.sync_channel.html
628     /// [`Receiver`]: struct.Receiver.html
629     #[stable(feature = "rust1", since = "1.0.0")]
630     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
631
632     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633     /// sent. The data is returned back to the callee in this case.
634     ///
635     /// [`sync_channel`]: fn.sync_channel.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
638 }
639
640 enum Flavor<T> {
641     Oneshot(Arc<oneshot::Packet<T>>),
642     Stream(Arc<stream::Packet<T>>),
643     Shared(Arc<shared::Packet<T>>),
644     Sync(Arc<sync::Packet<T>>),
645 }
646
647 #[doc(hidden)]
648 trait UnsafeFlavor<T> {
649     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651         &mut *self.inner_unsafe().get()
652     }
653     unsafe fn inner(&self) -> &Flavor<T> {
654         &*self.inner_unsafe().get()
655     }
656 }
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
659         &self.inner
660     }
661 }
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
664         &self.inner
665     }
666 }
667
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
673 /// is available.
674 ///
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
677 ///
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
682 ///
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
690 ///
691 /// # Examples
692 ///
693 /// ```
694 /// use std::sync::mpsc::channel;
695 /// use std::thread;
696 ///
697 /// let (sender, receiver) = channel();
698 ///
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// #   fn expensive_computation() {}
702 ///     sender.send(expensive_computation()).unwrap();
703 /// });
704 ///
705 /// // Do some useful work for awhile
706 ///
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
709 /// ```
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712     let a = Arc::new(oneshot::Packet::new());
713     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
714 }
715
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
721 ///
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
727 ///
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
730 ///
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
735 ///
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
743 ///
744 /// # Examples
745 ///
746 /// ```
747 /// use std::sync::mpsc::sync_channel;
748 /// use std::thread;
749 ///
750 /// let (sender, receiver) = sync_channel(1);
751 ///
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
754 ///
755 /// thread::spawn(move|| {
756 ///     // this will block until the previous message has been received
757 ///     sender.send(2).unwrap();
758 /// });
759 ///
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
762 /// ```
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765     let a = Arc::new(sync::Packet::new(bound));
766     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
767 }
768
769 ////////////////////////////////////////////////////////////////////////////////
770 // Sender
771 ////////////////////////////////////////////////////////////////////////////////
772
773 impl<T> Sender<T> {
774     fn new(inner: Flavor<T>) -> Sender<T> {
775         Sender { inner: UnsafeCell::new(inner) }
776     }
777
778     /// Attempts to send a value on this channel, returning it back if it could
779     /// not be sent.
780     ///
781     /// A successful send occurs when it is determined that the other end of
782     /// the channel has not hung up already. An unsuccessful send would be one
783     /// where the corresponding receiver has already been deallocated. Note
784     /// that a return value of [`Err`] means that the data will never be
785     /// received, but a return value of [`Ok`] does *not* mean that the data
786     /// will be received. It is possible for the corresponding receiver to
787     /// hang up immediately after this function returns [`Ok`].
788     ///
789     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
790     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
791     ///
792     /// This method will never block the current thread.
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// use std::sync::mpsc::channel;
798     ///
799     /// let (tx, rx) = channel();
800     ///
801     /// // This send is always successful
802     /// tx.send(1).unwrap();
803     ///
804     /// // This send will fail because the receiver is gone
805     /// drop(rx);
806     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
807     /// ```
808     #[stable(feature = "rust1", since = "1.0.0")]
809     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
810         let (new_inner, ret) = match *unsafe { self.inner() } {
811             Flavor::Oneshot(ref p) => {
812                 if !p.sent() {
813                     return p.send(t).map_err(SendError);
814                 } else {
815                     let a = Arc::new(stream::Packet::new());
816                     let rx = Receiver::new(Flavor::Stream(a.clone()));
817                     match p.upgrade(rx) {
818                         oneshot::UpSuccess => {
819                             let ret = a.send(t);
820                             (a, ret)
821                         }
822                         oneshot::UpDisconnected => (a, Err(t)),
823                         oneshot::UpWoke(token) => {
824                             // This send cannot panic because the thread is
825                             // asleep (we're looking at it), so the receiver
826                             // can't go away.
827                             a.send(t).ok().unwrap();
828                             token.signal();
829                             (a, Ok(()))
830                         }
831                     }
832                 }
833             }
834             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
835             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
836             Flavor::Sync(..) => unreachable!(),
837         };
838
839         unsafe {
840             let tmp = Sender::new(Flavor::Stream(new_inner));
841             mem::swap(self.inner_mut(), tmp.inner_mut());
842         }
843         ret.map_err(SendError)
844     }
845 }
846
847 #[stable(feature = "rust1", since = "1.0.0")]
848 impl<T> Clone for Sender<T> {
849     fn clone(&self) -> Sender<T> {
850         let packet = match *unsafe { self.inner() } {
851             Flavor::Oneshot(ref p) => {
852                 let a = Arc::new(shared::Packet::new());
853                 {
854                     let guard = a.postinit_lock();
855                     let rx = Receiver::new(Flavor::Shared(a.clone()));
856                     let sleeper = match p.upgrade(rx) {
857                         oneshot::UpSuccess | oneshot::UpDisconnected => None,
858                         oneshot::UpWoke(task) => Some(task),
859                     };
860                     a.inherit_blocker(sleeper, guard);
861                 }
862                 a
863             }
864             Flavor::Stream(ref p) => {
865                 let a = Arc::new(shared::Packet::new());
866                 {
867                     let guard = a.postinit_lock();
868                     let rx = Receiver::new(Flavor::Shared(a.clone()));
869                     let sleeper = match p.upgrade(rx) {
870                         stream::UpSuccess | stream::UpDisconnected => None,
871                         stream::UpWoke(task) => Some(task),
872                     };
873                     a.inherit_blocker(sleeper, guard);
874                 }
875                 a
876             }
877             Flavor::Shared(ref p) => {
878                 p.clone_chan();
879                 return Sender::new(Flavor::Shared(p.clone()));
880             }
881             Flavor::Sync(..) => unreachable!(),
882         };
883
884         unsafe {
885             let tmp = Sender::new(Flavor::Shared(packet.clone()));
886             mem::swap(self.inner_mut(), tmp.inner_mut());
887         }
888         Sender::new(Flavor::Shared(packet))
889     }
890 }
891
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
894     fn drop(&mut self) {
895         match *unsafe { self.inner() } {
896             Flavor::Oneshot(ref p) => p.drop_chan(),
897             Flavor::Stream(ref p) => p.drop_chan(),
898             Flavor::Shared(ref p) => p.drop_chan(),
899             Flavor::Sync(..) => unreachable!(),
900         }
901     }
902 }
903
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907         f.debug_struct("Sender").finish()
908     }
909 }
910
911 ////////////////////////////////////////////////////////////////////////////////
912 // SyncSender
913 ////////////////////////////////////////////////////////////////////////////////
914
915 impl<T> SyncSender<T> {
916     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
917         SyncSender { inner }
918     }
919
920     /// Sends a value on this synchronous channel.
921     ///
922     /// This function will *block* until space in the internal buffer becomes
923     /// available or a receiver is available to hand off the message to.
924     ///
925     /// Note that a successful send does *not* guarantee that the receiver will
926     /// ever see the data if there is a buffer on this channel. Items may be
927     /// enqueued in the internal buffer for the receiver to receive at a later
928     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929     /// channel and it guarantees that the receiver has indeed received
930     /// the data if this function returns success.
931     ///
932     /// This function will never panic, but it may return [`Err`] if the
933     /// [`Receiver`] has disconnected and is no longer able to receive
934     /// information.
935     ///
936     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
937     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
938     ///
939     /// # Examples
940     ///
941     /// ```rust
942     /// use std::sync::mpsc::sync_channel;
943     /// use std::thread;
944     ///
945     /// // Create a rendezvous sync_channel with buffer size 0
946     /// let (sync_sender, receiver) = sync_channel(0);
947     ///
948     /// thread::spawn(move || {
949     ///    println!("sending message...");
950     ///    sync_sender.send(1).unwrap();
951     ///    // Thread is now blocked until the message is received
952     ///
953     ///    println!("...message received!");
954     /// });
955     ///
956     /// let msg = receiver.recv().unwrap();
957     /// assert_eq!(1, msg);
958     /// ```
959     #[stable(feature = "rust1", since = "1.0.0")]
960     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
961         self.inner.send(t).map_err(SendError)
962     }
963
964     /// Attempts to send a value on this channel without blocking.
965     ///
966     /// This method differs from [`send`] by returning immediately if the
967     /// channel's buffer is full or no receiver is waiting to acquire some
968     /// data. Compared with [`send`], this function has two failure cases
969     /// instead of one (one for disconnection, one for a full buffer).
970     ///
971     /// See [`send`] for notes about guarantees of whether the
972     /// receiver has received the data or not if this function is successful.
973     ///
974     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
975     ///
976     /// # Examples
977     ///
978     /// ```rust
979     /// use std::sync::mpsc::sync_channel;
980     /// use std::thread;
981     ///
982     /// // Create a sync_channel with buffer size 1
983     /// let (sync_sender, receiver) = sync_channel(1);
984     /// let sync_sender2 = sync_sender.clone();
985     ///
986     /// // First thread owns sync_sender
987     /// thread::spawn(move || {
988     ///     sync_sender.send(1).unwrap();
989     ///     sync_sender.send(2).unwrap();
990     ///     // Thread blocked
991     /// });
992     ///
993     /// // Second thread owns sync_sender2
994     /// thread::spawn(move || {
995     ///     // This will return an error and send
996     ///     // no message if the buffer is full
997     ///     let _ = sync_sender2.try_send(3);
998     /// });
999     ///
1000     /// let mut msg;
1001     /// msg = receiver.recv().unwrap();
1002     /// println!("message {} received", msg);
1003     ///
1004     /// msg = receiver.recv().unwrap();
1005     /// println!("message {} received", msg);
1006     ///
1007     /// // Third message may have never been sent
1008     /// match receiver.try_recv() {
1009     ///     Ok(msg) => println!("message {} received", msg),
1010     ///     Err(_) => println!("the third message was never sent"),
1011     /// }
1012     /// ```
1013     #[stable(feature = "rust1", since = "1.0.0")]
1014     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1015         self.inner.try_send(t)
1016     }
1017 }
1018
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> Clone for SyncSender<T> {
1021     fn clone(&self) -> SyncSender<T> {
1022         self.inner.clone_chan();
1023         SyncSender::new(self.inner.clone())
1024     }
1025 }
1026
1027 #[stable(feature = "rust1", since = "1.0.0")]
1028 impl<T> Drop for SyncSender<T> {
1029     fn drop(&mut self) {
1030         self.inner.drop_chan();
1031     }
1032 }
1033
1034 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1035 impl<T> fmt::Debug for SyncSender<T> {
1036     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037         f.debug_struct("SyncSender").finish()
1038     }
1039 }
1040
1041 ////////////////////////////////////////////////////////////////////////////////
1042 // Receiver
1043 ////////////////////////////////////////////////////////////////////////////////
1044
1045 impl<T> Receiver<T> {
1046     fn new(inner: Flavor<T>) -> Receiver<T> {
1047         Receiver { inner: UnsafeCell::new(inner) }
1048     }
1049
1050     /// Attempts to return a pending value on this receiver without blocking.
1051     ///
1052     /// This method will never block the caller in order to wait for data to
1053     /// become available. Instead, this will always return immediately with a
1054     /// possible option of pending data on the channel.
1055     ///
1056     /// This is useful for a flavor of "optimistic check" before deciding to
1057     /// block on a receiver.
1058     ///
1059     /// Compared with [`recv`], this function has two failure cases instead of one
1060     /// (one for disconnection, one for an empty buffer).
1061     ///
1062     /// [`recv`]: struct.Receiver.html#method.recv
1063     ///
1064     /// # Examples
1065     ///
1066     /// ```rust
1067     /// use std::sync::mpsc::{Receiver, channel};
1068     ///
1069     /// let (_, receiver): (_, Receiver<i32>) = channel();
1070     ///
1071     /// assert!(receiver.try_recv().is_err());
1072     /// ```
1073     #[stable(feature = "rust1", since = "1.0.0")]
1074     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1075         loop {
1076             let new_port = match *unsafe { self.inner() } {
1077                 Flavor::Oneshot(ref p) => match p.try_recv() {
1078                     Ok(t) => return Ok(t),
1079                     Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1080                     Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1081                     Err(oneshot::Upgraded(rx)) => rx,
1082                 },
1083                 Flavor::Stream(ref p) => match p.try_recv() {
1084                     Ok(t) => return Ok(t),
1085                     Err(stream::Empty) => return Err(TryRecvError::Empty),
1086                     Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1087                     Err(stream::Upgraded(rx)) => rx,
1088                 },
1089                 Flavor::Shared(ref p) => match p.try_recv() {
1090                     Ok(t) => return Ok(t),
1091                     Err(shared::Empty) => return Err(TryRecvError::Empty),
1092                     Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1093                 },
1094                 Flavor::Sync(ref p) => match p.try_recv() {
1095                     Ok(t) => return Ok(t),
1096                     Err(sync::Empty) => return Err(TryRecvError::Empty),
1097                     Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1098                 },
1099             };
1100             unsafe {
1101                 mem::swap(self.inner_mut(), new_port.inner_mut());
1102             }
1103         }
1104     }
1105
1106     /// Attempts to wait for a value on this receiver, returning an error if the
1107     /// corresponding channel has hung up.
1108     ///
1109     /// This function will always block the current thread if there is no data
1110     /// available and it's possible for more data to be sent. Once a message is
1111     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1112     /// receiver will wake up and return that message.
1113     ///
1114     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1115     /// this call is blocking, this call will wake up and return [`Err`] to
1116     /// indicate that no more messages can ever be received on this channel.
1117     /// However, since channels are buffered, messages sent before the disconnect
1118     /// will still be properly received.
1119     ///
1120     /// [`Sender`]: struct.Sender.html
1121     /// [`SyncSender`]: struct.SyncSender.html
1122     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1123     ///
1124     /// # Examples
1125     ///
1126     /// ```
1127     /// use std::sync::mpsc;
1128     /// use std::thread;
1129     ///
1130     /// let (send, recv) = mpsc::channel();
1131     /// let handle = thread::spawn(move || {
1132     ///     send.send(1u8).unwrap();
1133     /// });
1134     ///
1135     /// handle.join().unwrap();
1136     ///
1137     /// assert_eq!(Ok(1), recv.recv());
1138     /// ```
1139     ///
1140     /// Buffering behavior:
1141     ///
1142     /// ```
1143     /// use std::sync::mpsc;
1144     /// use std::thread;
1145     /// use std::sync::mpsc::RecvError;
1146     ///
1147     /// let (send, recv) = mpsc::channel();
1148     /// let handle = thread::spawn(move || {
1149     ///     send.send(1u8).unwrap();
1150     ///     send.send(2).unwrap();
1151     ///     send.send(3).unwrap();
1152     ///     drop(send);
1153     /// });
1154     ///
1155     /// // wait for the thread to join so we ensure the sender is dropped
1156     /// handle.join().unwrap();
1157     ///
1158     /// assert_eq!(Ok(1), recv.recv());
1159     /// assert_eq!(Ok(2), recv.recv());
1160     /// assert_eq!(Ok(3), recv.recv());
1161     /// assert_eq!(Err(RecvError), recv.recv());
1162     /// ```
1163     #[stable(feature = "rust1", since = "1.0.0")]
1164     pub fn recv(&self) -> Result<T, RecvError> {
1165         loop {
1166             let new_port = match *unsafe { self.inner() } {
1167                 Flavor::Oneshot(ref p) => match p.recv(None) {
1168                     Ok(t) => return Ok(t),
1169                     Err(oneshot::Disconnected) => return Err(RecvError),
1170                     Err(oneshot::Upgraded(rx)) => rx,
1171                     Err(oneshot::Empty) => unreachable!(),
1172                 },
1173                 Flavor::Stream(ref p) => match p.recv(None) {
1174                     Ok(t) => return Ok(t),
1175                     Err(stream::Disconnected) => return Err(RecvError),
1176                     Err(stream::Upgraded(rx)) => rx,
1177                     Err(stream::Empty) => unreachable!(),
1178                 },
1179                 Flavor::Shared(ref p) => match p.recv(None) {
1180                     Ok(t) => return Ok(t),
1181                     Err(shared::Disconnected) => return Err(RecvError),
1182                     Err(shared::Empty) => unreachable!(),
1183                 },
1184                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1185             };
1186             unsafe {
1187                 mem::swap(self.inner_mut(), new_port.inner_mut());
1188             }
1189         }
1190     }
1191
1192     /// Attempts to wait for a value on this receiver, returning an error if the
1193     /// corresponding channel has hung up, or if it waits more than `timeout`.
1194     ///
1195     /// This function will always block the current thread if there is no data
1196     /// available and it's possible for more data to be sent. Once a message is
1197     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1198     /// receiver will wake up and return that message.
1199     ///
1200     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1201     /// this call is blocking, this call will wake up and return [`Err`] to
1202     /// indicate that no more messages can ever be received on this channel.
1203     /// However, since channels are buffered, messages sent before the disconnect
1204     /// will still be properly received.
1205     ///
1206     /// [`Sender`]: struct.Sender.html
1207     /// [`SyncSender`]: struct.SyncSender.html
1208     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1209     ///
1210     /// # Known Issues
1211     ///
1212     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1213     /// to panic unexpectedly with the following example:
1214     ///
1215     /// ```no_run
1216     /// use std::sync::mpsc::channel;
1217     /// use std::thread;
1218     /// use std::time::Duration;
1219     ///
1220     /// let (tx, rx) = channel::<String>();
1221     ///
1222     /// thread::spawn(move || {
1223     ///     let d = Duration::from_millis(10);
1224     ///     loop {
1225     ///         println!("recv");
1226     ///         let _r = rx.recv_timeout(d);
1227     ///     }
1228     /// });
1229     ///
1230     /// thread::sleep(Duration::from_millis(100));
1231     /// let _c1 = tx.clone();
1232     ///
1233     /// thread::sleep(Duration::from_secs(1));
1234     /// ```
1235     ///
1236     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1237     ///
1238     /// # Examples
1239     ///
1240     /// Successfully receiving value before encountering timeout:
1241     ///
1242     /// ```no_run
1243     /// use std::thread;
1244     /// use std::time::Duration;
1245     /// use std::sync::mpsc;
1246     ///
1247     /// let (send, recv) = mpsc::channel();
1248     ///
1249     /// thread::spawn(move || {
1250     ///     send.send('a').unwrap();
1251     /// });
1252     ///
1253     /// assert_eq!(
1254     ///     recv.recv_timeout(Duration::from_millis(400)),
1255     ///     Ok('a')
1256     /// );
1257     /// ```
1258     ///
1259     /// Receiving an error upon reaching timeout:
1260     ///
1261     /// ```no_run
1262     /// use std::thread;
1263     /// use std::time::Duration;
1264     /// use std::sync::mpsc;
1265     ///
1266     /// let (send, recv) = mpsc::channel();
1267     ///
1268     /// thread::spawn(move || {
1269     ///     thread::sleep(Duration::from_millis(800));
1270     ///     send.send('a').unwrap();
1271     /// });
1272     ///
1273     /// assert_eq!(
1274     ///     recv.recv_timeout(Duration::from_millis(400)),
1275     ///     Err(mpsc::RecvTimeoutError::Timeout)
1276     /// );
1277     /// ```
1278     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1279     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1280         // Do an optimistic try_recv to avoid the performance impact of
1281         // Instant::now() in the full-channel case.
1282         match self.try_recv() {
1283             Ok(result) => Ok(result),
1284             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1285             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1286                 Some(deadline) => self.recv_deadline(deadline),
1287                 // So far in the future that it's practically the same as waiting indefinitely.
1288                 None => self.recv().map_err(RecvTimeoutError::from),
1289             },
1290         }
1291     }
1292
1293     /// Attempts to wait for a value on this receiver, returning an error if the
1294     /// corresponding channel has hung up, or if `deadline` is reached.
1295     ///
1296     /// This function will always block the current thread if there is no data
1297     /// available and it's possible for more data to be sent. Once a message is
1298     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1299     /// receiver will wake up and return that message.
1300     ///
1301     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1302     /// this call is blocking, this call will wake up and return [`Err`] to
1303     /// indicate that no more messages can ever be received on this channel.
1304     /// However, since channels are buffered, messages sent before the disconnect
1305     /// will still be properly received.
1306     ///
1307     /// [`Sender`]: struct.Sender.html
1308     /// [`SyncSender`]: struct.SyncSender.html
1309     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1310     ///
1311     /// # Examples
1312     ///
1313     /// Successfully receiving value before reaching deadline:
1314     ///
1315     /// ```no_run
1316     /// #![feature(deadline_api)]
1317     /// use std::thread;
1318     /// use std::time::{Duration, Instant};
1319     /// use std::sync::mpsc;
1320     ///
1321     /// let (send, recv) = mpsc::channel();
1322     ///
1323     /// thread::spawn(move || {
1324     ///     send.send('a').unwrap();
1325     /// });
1326     ///
1327     /// assert_eq!(
1328     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1329     ///     Ok('a')
1330     /// );
1331     /// ```
1332     ///
1333     /// Receiving an error upon reaching deadline:
1334     ///
1335     /// ```no_run
1336     /// #![feature(deadline_api)]
1337     /// use std::thread;
1338     /// use std::time::{Duration, Instant};
1339     /// use std::sync::mpsc;
1340     ///
1341     /// let (send, recv) = mpsc::channel();
1342     ///
1343     /// thread::spawn(move || {
1344     ///     thread::sleep(Duration::from_millis(800));
1345     ///     send.send('a').unwrap();
1346     /// });
1347     ///
1348     /// assert_eq!(
1349     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1350     ///     Err(mpsc::RecvTimeoutError::Timeout)
1351     /// );
1352     /// ```
1353     #[unstable(feature = "deadline_api", issue = "46316")]
1354     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1355         use self::RecvTimeoutError::*;
1356
1357         loop {
1358             let port_or_empty = match *unsafe { self.inner() } {
1359                 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1360                     Ok(t) => return Ok(t),
1361                     Err(oneshot::Disconnected) => return Err(Disconnected),
1362                     Err(oneshot::Upgraded(rx)) => Some(rx),
1363                     Err(oneshot::Empty) => None,
1364                 },
1365                 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1366                     Ok(t) => return Ok(t),
1367                     Err(stream::Disconnected) => return Err(Disconnected),
1368                     Err(stream::Upgraded(rx)) => Some(rx),
1369                     Err(stream::Empty) => None,
1370                 },
1371                 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1372                     Ok(t) => return Ok(t),
1373                     Err(shared::Disconnected) => return Err(Disconnected),
1374                     Err(shared::Empty) => None,
1375                 },
1376                 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1377                     Ok(t) => return Ok(t),
1378                     Err(sync::Disconnected) => return Err(Disconnected),
1379                     Err(sync::Empty) => None,
1380                 },
1381             };
1382
1383             if let Some(new_port) = port_or_empty {
1384                 unsafe {
1385                     mem::swap(self.inner_mut(), new_port.inner_mut());
1386                 }
1387             }
1388
1389             // If we're already passed the deadline, and we're here without
1390             // data, return a timeout, else try again.
1391             if Instant::now() >= deadline {
1392                 return Err(Timeout);
1393             }
1394         }
1395     }
1396
1397     /// Returns an iterator that will block waiting for messages, but never
1398     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1399     ///
1400     /// [`panic!`]: ../../../std/macro.panic.html
1401     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1402     ///
1403     /// # Examples
1404     ///
1405     /// ```rust
1406     /// use std::sync::mpsc::channel;
1407     /// use std::thread;
1408     ///
1409     /// let (send, recv) = channel();
1410     ///
1411     /// thread::spawn(move || {
1412     ///     send.send(1).unwrap();
1413     ///     send.send(2).unwrap();
1414     ///     send.send(3).unwrap();
1415     /// });
1416     ///
1417     /// let mut iter = recv.iter();
1418     /// assert_eq!(iter.next(), Some(1));
1419     /// assert_eq!(iter.next(), Some(2));
1420     /// assert_eq!(iter.next(), Some(3));
1421     /// assert_eq!(iter.next(), None);
1422     /// ```
1423     #[stable(feature = "rust1", since = "1.0.0")]
1424     pub fn iter(&self) -> Iter<'_, T> {
1425         Iter { rx: self }
1426     }
1427
1428     /// Returns an iterator that will attempt to yield all pending values.
1429     /// It will return `None` if there are no more pending values or if the
1430     /// channel has hung up. The iterator will never [`panic!`] or block the
1431     /// user by waiting for values.
1432     ///
1433     /// [`panic!`]: ../../../std/macro.panic.html
1434     ///
1435     /// # Examples
1436     ///
1437     /// ```no_run
1438     /// use std::sync::mpsc::channel;
1439     /// use std::thread;
1440     /// use std::time::Duration;
1441     ///
1442     /// let (sender, receiver) = channel();
1443     ///
1444     /// // nothing is in the buffer yet
1445     /// assert!(receiver.try_iter().next().is_none());
1446     ///
1447     /// thread::spawn(move || {
1448     ///     thread::sleep(Duration::from_secs(1));
1449     ///     sender.send(1).unwrap();
1450     ///     sender.send(2).unwrap();
1451     ///     sender.send(3).unwrap();
1452     /// });
1453     ///
1454     /// // nothing is in the buffer yet
1455     /// assert!(receiver.try_iter().next().is_none());
1456     ///
1457     /// // block for two seconds
1458     /// thread::sleep(Duration::from_secs(2));
1459     ///
1460     /// let mut iter = receiver.try_iter();
1461     /// assert_eq!(iter.next(), Some(1));
1462     /// assert_eq!(iter.next(), Some(2));
1463     /// assert_eq!(iter.next(), Some(3));
1464     /// assert_eq!(iter.next(), None);
1465     /// ```
1466     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1467     pub fn try_iter(&self) -> TryIter<'_, T> {
1468         TryIter { rx: self }
1469     }
1470 }
1471
1472 #[stable(feature = "rust1", since = "1.0.0")]
1473 impl<'a, T> Iterator for Iter<'a, T> {
1474     type Item = T;
1475
1476     fn next(&mut self) -> Option<T> {
1477         self.rx.recv().ok()
1478     }
1479 }
1480
1481 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1482 impl<'a, T> Iterator for TryIter<'a, T> {
1483     type Item = T;
1484
1485     fn next(&mut self) -> Option<T> {
1486         self.rx.try_recv().ok()
1487     }
1488 }
1489
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<'a, T> IntoIterator for &'a Receiver<T> {
1492     type Item = T;
1493     type IntoIter = Iter<'a, T>;
1494
1495     fn into_iter(self) -> Iter<'a, T> {
1496         self.iter()
1497     }
1498 }
1499
1500 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1501 impl<T> Iterator for IntoIter<T> {
1502     type Item = T;
1503     fn next(&mut self) -> Option<T> {
1504         self.rx.recv().ok()
1505     }
1506 }
1507
1508 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1509 impl<T> IntoIterator for Receiver<T> {
1510     type Item = T;
1511     type IntoIter = IntoIter<T>;
1512
1513     fn into_iter(self) -> IntoIter<T> {
1514         IntoIter { rx: self }
1515     }
1516 }
1517
1518 #[stable(feature = "rust1", since = "1.0.0")]
1519 impl<T> Drop for Receiver<T> {
1520     fn drop(&mut self) {
1521         match *unsafe { self.inner() } {
1522             Flavor::Oneshot(ref p) => p.drop_port(),
1523             Flavor::Stream(ref p) => p.drop_port(),
1524             Flavor::Shared(ref p) => p.drop_port(),
1525             Flavor::Sync(ref p) => p.drop_port(),
1526         }
1527     }
1528 }
1529
1530 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1531 impl<T> fmt::Debug for Receiver<T> {
1532     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1533         f.debug_struct("Receiver").finish()
1534     }
1535 }
1536
1537 #[stable(feature = "rust1", since = "1.0.0")]
1538 impl<T> fmt::Debug for SendError<T> {
1539     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540         "SendError(..)".fmt(f)
1541     }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> fmt::Display for SendError<T> {
1546     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547         "sending on a closed channel".fmt(f)
1548     }
1549 }
1550
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T: Send> error::Error for SendError<T> {
1553     #[allow(deprecated)]
1554     fn description(&self) -> &str {
1555         "sending on a closed channel"
1556     }
1557 }
1558
1559 #[stable(feature = "rust1", since = "1.0.0")]
1560 impl<T> fmt::Debug for TrySendError<T> {
1561     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562         match *self {
1563             TrySendError::Full(..) => "Full(..)".fmt(f),
1564             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1565         }
1566     }
1567 }
1568
1569 #[stable(feature = "rust1", since = "1.0.0")]
1570 impl<T> fmt::Display for TrySendError<T> {
1571     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1572         match *self {
1573             TrySendError::Full(..) => "sending on a full channel".fmt(f),
1574             TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1575         }
1576     }
1577 }
1578
1579 #[stable(feature = "rust1", since = "1.0.0")]
1580 impl<T: Send> error::Error for TrySendError<T> {
1581     #[allow(deprecated)]
1582     fn description(&self) -> &str {
1583         match *self {
1584             TrySendError::Full(..) => "sending on a full channel",
1585             TrySendError::Disconnected(..) => "sending on a closed channel",
1586         }
1587     }
1588 }
1589
1590 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1591 impl<T> From<SendError<T>> for TrySendError<T> {
1592     fn from(err: SendError<T>) -> TrySendError<T> {
1593         match err {
1594             SendError(t) => TrySendError::Disconnected(t),
1595         }
1596     }
1597 }
1598
1599 #[stable(feature = "rust1", since = "1.0.0")]
1600 impl fmt::Display for RecvError {
1601     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1602         "receiving on a closed channel".fmt(f)
1603     }
1604 }
1605
1606 #[stable(feature = "rust1", since = "1.0.0")]
1607 impl error::Error for RecvError {
1608     #[allow(deprecated)]
1609     fn description(&self) -> &str {
1610         "receiving on a closed channel"
1611     }
1612 }
1613
1614 #[stable(feature = "rust1", since = "1.0.0")]
1615 impl fmt::Display for TryRecvError {
1616     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1617         match *self {
1618             TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1619             TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1620         }
1621     }
1622 }
1623
1624 #[stable(feature = "rust1", since = "1.0.0")]
1625 impl error::Error for TryRecvError {
1626     #[allow(deprecated)]
1627     fn description(&self) -> &str {
1628         match *self {
1629             TryRecvError::Empty => "receiving on an empty channel",
1630             TryRecvError::Disconnected => "receiving on a closed channel",
1631         }
1632     }
1633 }
1634
1635 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1636 impl From<RecvError> for TryRecvError {
1637     fn from(err: RecvError) -> TryRecvError {
1638         match err {
1639             RecvError => TryRecvError::Disconnected,
1640         }
1641     }
1642 }
1643
1644 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1645 impl fmt::Display for RecvTimeoutError {
1646     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1647         match *self {
1648             RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1649             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1650         }
1651     }
1652 }
1653
1654 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1655 impl error::Error for RecvTimeoutError {
1656     #[allow(deprecated)]
1657     fn description(&self) -> &str {
1658         match *self {
1659             RecvTimeoutError::Timeout => "timed out waiting on channel",
1660             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1661         }
1662     }
1663 }
1664
1665 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1666 impl From<RecvError> for RecvTimeoutError {
1667     fn from(err: RecvError) -> RecvTimeoutError {
1668         match err {
1669             RecvError => RecvTimeoutError::Disconnected,
1670         }
1671     }
1672 }
1673
1674 #[cfg(all(test, not(target_os = "emscripten")))]
1675 mod tests {
1676     use super::*;
1677     use crate::env;
1678     use crate::thread;
1679     use crate::time::{Duration, Instant};
1680
1681     pub fn stress_factor() -> usize {
1682         match env::var("RUST_TEST_STRESS") {
1683             Ok(val) => val.parse().unwrap(),
1684             Err(..) => 1,
1685         }
1686     }
1687
1688     #[test]
1689     fn smoke() {
1690         let (tx, rx) = channel::<i32>();
1691         tx.send(1).unwrap();
1692         assert_eq!(rx.recv().unwrap(), 1);
1693     }
1694
1695     #[test]
1696     fn drop_full() {
1697         let (tx, _rx) = channel::<Box<isize>>();
1698         tx.send(box 1).unwrap();
1699     }
1700
1701     #[test]
1702     fn drop_full_shared() {
1703         let (tx, _rx) = channel::<Box<isize>>();
1704         drop(tx.clone());
1705         drop(tx.clone());
1706         tx.send(box 1).unwrap();
1707     }
1708
1709     #[test]
1710     fn smoke_shared() {
1711         let (tx, rx) = channel::<i32>();
1712         tx.send(1).unwrap();
1713         assert_eq!(rx.recv().unwrap(), 1);
1714         let tx = tx.clone();
1715         tx.send(1).unwrap();
1716         assert_eq!(rx.recv().unwrap(), 1);
1717     }
1718
1719     #[test]
1720     fn smoke_threads() {
1721         let (tx, rx) = channel::<i32>();
1722         let _t = thread::spawn(move || {
1723             tx.send(1).unwrap();
1724         });
1725         assert_eq!(rx.recv().unwrap(), 1);
1726     }
1727
1728     #[test]
1729     fn smoke_port_gone() {
1730         let (tx, rx) = channel::<i32>();
1731         drop(rx);
1732         assert!(tx.send(1).is_err());
1733     }
1734
1735     #[test]
1736     fn smoke_shared_port_gone() {
1737         let (tx, rx) = channel::<i32>();
1738         drop(rx);
1739         assert!(tx.send(1).is_err())
1740     }
1741
1742     #[test]
1743     fn smoke_shared_port_gone2() {
1744         let (tx, rx) = channel::<i32>();
1745         drop(rx);
1746         let tx2 = tx.clone();
1747         drop(tx);
1748         assert!(tx2.send(1).is_err());
1749     }
1750
1751     #[test]
1752     fn port_gone_concurrent() {
1753         let (tx, rx) = channel::<i32>();
1754         let _t = thread::spawn(move || {
1755             rx.recv().unwrap();
1756         });
1757         while tx.send(1).is_ok() {}
1758     }
1759
1760     #[test]
1761     fn port_gone_concurrent_shared() {
1762         let (tx, rx) = channel::<i32>();
1763         let tx2 = tx.clone();
1764         let _t = thread::spawn(move || {
1765             rx.recv().unwrap();
1766         });
1767         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1768     }
1769
1770     #[test]
1771     fn smoke_chan_gone() {
1772         let (tx, rx) = channel::<i32>();
1773         drop(tx);
1774         assert!(rx.recv().is_err());
1775     }
1776
1777     #[test]
1778     fn smoke_chan_gone_shared() {
1779         let (tx, rx) = channel::<()>();
1780         let tx2 = tx.clone();
1781         drop(tx);
1782         drop(tx2);
1783         assert!(rx.recv().is_err());
1784     }
1785
1786     #[test]
1787     fn chan_gone_concurrent() {
1788         let (tx, rx) = channel::<i32>();
1789         let _t = thread::spawn(move || {
1790             tx.send(1).unwrap();
1791             tx.send(1).unwrap();
1792         });
1793         while rx.recv().is_ok() {}
1794     }
1795
1796     #[test]
1797     fn stress() {
1798         let (tx, rx) = channel::<i32>();
1799         let t = thread::spawn(move || {
1800             for _ in 0..10000 {
1801                 tx.send(1).unwrap();
1802             }
1803         });
1804         for _ in 0..10000 {
1805             assert_eq!(rx.recv().unwrap(), 1);
1806         }
1807         t.join().ok().expect("thread panicked");
1808     }
1809
1810     #[test]
1811     fn stress_shared() {
1812         const AMT: u32 = 10000;
1813         const NTHREADS: u32 = 8;
1814         let (tx, rx) = channel::<i32>();
1815
1816         let t = thread::spawn(move || {
1817             for _ in 0..AMT * NTHREADS {
1818                 assert_eq!(rx.recv().unwrap(), 1);
1819             }
1820             match rx.try_recv() {
1821                 Ok(..) => panic!(),
1822                 _ => {}
1823             }
1824         });
1825
1826         for _ in 0..NTHREADS {
1827             let tx = tx.clone();
1828             thread::spawn(move || {
1829                 for _ in 0..AMT {
1830                     tx.send(1).unwrap();
1831                 }
1832             });
1833         }
1834         drop(tx);
1835         t.join().ok().expect("thread panicked");
1836     }
1837
1838     #[test]
1839     fn send_from_outside_runtime() {
1840         let (tx1, rx1) = channel::<()>();
1841         let (tx2, rx2) = channel::<i32>();
1842         let t1 = thread::spawn(move || {
1843             tx1.send(()).unwrap();
1844             for _ in 0..40 {
1845                 assert_eq!(rx2.recv().unwrap(), 1);
1846             }
1847         });
1848         rx1.recv().unwrap();
1849         let t2 = thread::spawn(move || {
1850             for _ in 0..40 {
1851                 tx2.send(1).unwrap();
1852             }
1853         });
1854         t1.join().ok().expect("thread panicked");
1855         t2.join().ok().expect("thread panicked");
1856     }
1857
1858     #[test]
1859     fn recv_from_outside_runtime() {
1860         let (tx, rx) = channel::<i32>();
1861         let t = thread::spawn(move || {
1862             for _ in 0..40 {
1863                 assert_eq!(rx.recv().unwrap(), 1);
1864             }
1865         });
1866         for _ in 0..40 {
1867             tx.send(1).unwrap();
1868         }
1869         t.join().ok().expect("thread panicked");
1870     }
1871
1872     #[test]
1873     fn no_runtime() {
1874         let (tx1, rx1) = channel::<i32>();
1875         let (tx2, rx2) = channel::<i32>();
1876         let t1 = thread::spawn(move || {
1877             assert_eq!(rx1.recv().unwrap(), 1);
1878             tx2.send(2).unwrap();
1879         });
1880         let t2 = thread::spawn(move || {
1881             tx1.send(1).unwrap();
1882             assert_eq!(rx2.recv().unwrap(), 2);
1883         });
1884         t1.join().ok().expect("thread panicked");
1885         t2.join().ok().expect("thread panicked");
1886     }
1887
1888     #[test]
1889     fn oneshot_single_thread_close_port_first() {
1890         // Simple test of closing without sending
1891         let (_tx, rx) = channel::<i32>();
1892         drop(rx);
1893     }
1894
1895     #[test]
1896     fn oneshot_single_thread_close_chan_first() {
1897         // Simple test of closing without sending
1898         let (tx, _rx) = channel::<i32>();
1899         drop(tx);
1900     }
1901
1902     #[test]
1903     fn oneshot_single_thread_send_port_close() {
1904         // Testing that the sender cleans up the payload if receiver is closed
1905         let (tx, rx) = channel::<Box<i32>>();
1906         drop(rx);
1907         assert!(tx.send(box 0).is_err());
1908     }
1909
1910     #[test]
1911     fn oneshot_single_thread_recv_chan_close() {
1912         // Receiving on a closed chan will panic
1913         let res = thread::spawn(move || {
1914             let (tx, rx) = channel::<i32>();
1915             drop(tx);
1916             rx.recv().unwrap();
1917         })
1918         .join();
1919         // What is our res?
1920         assert!(res.is_err());
1921     }
1922
1923     #[test]
1924     fn oneshot_single_thread_send_then_recv() {
1925         let (tx, rx) = channel::<Box<i32>>();
1926         tx.send(box 10).unwrap();
1927         assert!(*rx.recv().unwrap() == 10);
1928     }
1929
1930     #[test]
1931     fn oneshot_single_thread_try_send_open() {
1932         let (tx, rx) = channel::<i32>();
1933         assert!(tx.send(10).is_ok());
1934         assert!(rx.recv().unwrap() == 10);
1935     }
1936
1937     #[test]
1938     fn oneshot_single_thread_try_send_closed() {
1939         let (tx, rx) = channel::<i32>();
1940         drop(rx);
1941         assert!(tx.send(10).is_err());
1942     }
1943
1944     #[test]
1945     fn oneshot_single_thread_try_recv_open() {
1946         let (tx, rx) = channel::<i32>();
1947         tx.send(10).unwrap();
1948         assert!(rx.recv() == Ok(10));
1949     }
1950
1951     #[test]
1952     fn oneshot_single_thread_try_recv_closed() {
1953         let (tx, rx) = channel::<i32>();
1954         drop(tx);
1955         assert!(rx.recv().is_err());
1956     }
1957
1958     #[test]
1959     fn oneshot_single_thread_peek_data() {
1960         let (tx, rx) = channel::<i32>();
1961         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1962         tx.send(10).unwrap();
1963         assert_eq!(rx.try_recv(), Ok(10));
1964     }
1965
1966     #[test]
1967     fn oneshot_single_thread_peek_close() {
1968         let (tx, rx) = channel::<i32>();
1969         drop(tx);
1970         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1971         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1972     }
1973
1974     #[test]
1975     fn oneshot_single_thread_peek_open() {
1976         let (_tx, rx) = channel::<i32>();
1977         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1978     }
1979
1980     #[test]
1981     fn oneshot_multi_task_recv_then_send() {
1982         let (tx, rx) = channel::<Box<i32>>();
1983         let _t = thread::spawn(move || {
1984             assert!(*rx.recv().unwrap() == 10);
1985         });
1986
1987         tx.send(box 10).unwrap();
1988     }
1989
1990     #[test]
1991     fn oneshot_multi_task_recv_then_close() {
1992         let (tx, rx) = channel::<Box<i32>>();
1993         let _t = thread::spawn(move || {
1994             drop(tx);
1995         });
1996         let res = thread::spawn(move || {
1997             assert!(*rx.recv().unwrap() == 10);
1998         })
1999         .join();
2000         assert!(res.is_err());
2001     }
2002
2003     #[test]
2004     fn oneshot_multi_thread_close_stress() {
2005         for _ in 0..stress_factor() {
2006             let (tx, rx) = channel::<i32>();
2007             let _t = thread::spawn(move || {
2008                 drop(rx);
2009             });
2010             drop(tx);
2011         }
2012     }
2013
2014     #[test]
2015     fn oneshot_multi_thread_send_close_stress() {
2016         for _ in 0..stress_factor() {
2017             let (tx, rx) = channel::<i32>();
2018             let _t = thread::spawn(move || {
2019                 drop(rx);
2020             });
2021             let _ = thread::spawn(move || {
2022                 tx.send(1).unwrap();
2023             })
2024             .join();
2025         }
2026     }
2027
2028     #[test]
2029     fn oneshot_multi_thread_recv_close_stress() {
2030         for _ in 0..stress_factor() {
2031             let (tx, rx) = channel::<i32>();
2032             thread::spawn(move || {
2033                 let res = thread::spawn(move || {
2034                     rx.recv().unwrap();
2035                 })
2036                 .join();
2037                 assert!(res.is_err());
2038             });
2039             let _t = thread::spawn(move || {
2040                 thread::spawn(move || {
2041                     drop(tx);
2042                 });
2043             });
2044         }
2045     }
2046
2047     #[test]
2048     fn oneshot_multi_thread_send_recv_stress() {
2049         for _ in 0..stress_factor() {
2050             let (tx, rx) = channel::<Box<isize>>();
2051             let _t = thread::spawn(move || {
2052                 tx.send(box 10).unwrap();
2053             });
2054             assert!(*rx.recv().unwrap() == 10);
2055         }
2056     }
2057
2058     #[test]
2059     fn stream_send_recv_stress() {
2060         for _ in 0..stress_factor() {
2061             let (tx, rx) = channel();
2062
2063             send(tx, 0);
2064             recv(rx, 0);
2065
2066             fn send(tx: Sender<Box<i32>>, i: i32) {
2067                 if i == 10 {
2068                     return;
2069                 }
2070
2071                 thread::spawn(move || {
2072                     tx.send(box i).unwrap();
2073                     send(tx, i + 1);
2074                 });
2075             }
2076
2077             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2078                 if i == 10 {
2079                     return;
2080                 }
2081
2082                 thread::spawn(move || {
2083                     assert!(*rx.recv().unwrap() == i);
2084                     recv(rx, i + 1);
2085                 });
2086             }
2087         }
2088     }
2089
2090     #[test]
2091     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2092     fn oneshot_single_thread_recv_timeout() {
2093         let (tx, rx) = channel();
2094         tx.send(()).unwrap();
2095         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2096         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2097         tx.send(()).unwrap();
2098         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2099     }
2100
2101     #[test]
2102     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2103     fn stress_recv_timeout_two_threads() {
2104         let (tx, rx) = channel();
2105         let stress = stress_factor() + 100;
2106         let timeout = Duration::from_millis(100);
2107
2108         thread::spawn(move || {
2109             for i in 0..stress {
2110                 if i % 2 == 0 {
2111                     thread::sleep(timeout * 2);
2112                 }
2113                 tx.send(1usize).unwrap();
2114             }
2115         });
2116
2117         let mut recv_count = 0;
2118         loop {
2119             match rx.recv_timeout(timeout) {
2120                 Ok(n) => {
2121                     assert_eq!(n, 1usize);
2122                     recv_count += 1;
2123                 }
2124                 Err(RecvTimeoutError::Timeout) => continue,
2125                 Err(RecvTimeoutError::Disconnected) => break,
2126             }
2127         }
2128
2129         assert_eq!(recv_count, stress);
2130     }
2131
2132     #[test]
2133     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2134     fn recv_timeout_upgrade() {
2135         let (tx, rx) = channel::<()>();
2136         let timeout = Duration::from_millis(1);
2137         let _tx_clone = tx.clone();
2138
2139         let start = Instant::now();
2140         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2141         assert!(Instant::now() >= start + timeout);
2142     }
2143
2144     #[test]
2145     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2146     fn stress_recv_timeout_shared() {
2147         let (tx, rx) = channel();
2148         let stress = stress_factor() + 100;
2149
2150         for i in 0..stress {
2151             let tx = tx.clone();
2152             thread::spawn(move || {
2153                 thread::sleep(Duration::from_millis(i as u64 * 10));
2154                 tx.send(1usize).unwrap();
2155             });
2156         }
2157
2158         drop(tx);
2159
2160         let mut recv_count = 0;
2161         loop {
2162             match rx.recv_timeout(Duration::from_millis(10)) {
2163                 Ok(n) => {
2164                     assert_eq!(n, 1usize);
2165                     recv_count += 1;
2166                 }
2167                 Err(RecvTimeoutError::Timeout) => continue,
2168                 Err(RecvTimeoutError::Disconnected) => break,
2169             }
2170         }
2171
2172         assert_eq!(recv_count, stress);
2173     }
2174
2175     #[test]
2176     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2177     fn very_long_recv_timeout_wont_panic() {
2178         let (tx, rx) = channel::<()>();
2179         let join_handle =
2180             thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value())));
2181         thread::sleep(Duration::from_secs(1));
2182         assert!(tx.send(()).is_ok());
2183         assert_eq!(join_handle.join().unwrap(), Ok(()));
2184     }
2185
2186     #[test]
2187     fn recv_a_lot() {
2188         // Regression test that we don't run out of stack in scheduler context
2189         let (tx, rx) = channel();
2190         for _ in 0..10000 {
2191             tx.send(()).unwrap();
2192         }
2193         for _ in 0..10000 {
2194             rx.recv().unwrap();
2195         }
2196     }
2197
2198     #[test]
2199     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2200     fn shared_recv_timeout() {
2201         let (tx, rx) = channel();
2202         let total = 5;
2203         for _ in 0..total {
2204             let tx = tx.clone();
2205             thread::spawn(move || {
2206                 tx.send(()).unwrap();
2207             });
2208         }
2209
2210         for _ in 0..total {
2211             rx.recv().unwrap();
2212         }
2213
2214         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2215         tx.send(()).unwrap();
2216         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2217     }
2218
2219     #[test]
2220     fn shared_chan_stress() {
2221         let (tx, rx) = channel();
2222         let total = stress_factor() + 100;
2223         for _ in 0..total {
2224             let tx = tx.clone();
2225             thread::spawn(move || {
2226                 tx.send(()).unwrap();
2227             });
2228         }
2229
2230         for _ in 0..total {
2231             rx.recv().unwrap();
2232         }
2233     }
2234
2235     #[test]
2236     fn test_nested_recv_iter() {
2237         let (tx, rx) = channel::<i32>();
2238         let (total_tx, total_rx) = channel::<i32>();
2239
2240         let _t = thread::spawn(move || {
2241             let mut acc = 0;
2242             for x in rx.iter() {
2243                 acc += x;
2244             }
2245             total_tx.send(acc).unwrap();
2246         });
2247
2248         tx.send(3).unwrap();
2249         tx.send(1).unwrap();
2250         tx.send(2).unwrap();
2251         drop(tx);
2252         assert_eq!(total_rx.recv().unwrap(), 6);
2253     }
2254
2255     #[test]
2256     fn test_recv_iter_break() {
2257         let (tx, rx) = channel::<i32>();
2258         let (count_tx, count_rx) = channel();
2259
2260         let _t = thread::spawn(move || {
2261             let mut count = 0;
2262             for x in rx.iter() {
2263                 if count >= 3 {
2264                     break;
2265                 } else {
2266                     count += x;
2267                 }
2268             }
2269             count_tx.send(count).unwrap();
2270         });
2271
2272         tx.send(2).unwrap();
2273         tx.send(2).unwrap();
2274         tx.send(2).unwrap();
2275         let _ = tx.send(2);
2276         drop(tx);
2277         assert_eq!(count_rx.recv().unwrap(), 4);
2278     }
2279
2280     #[test]
2281     fn test_recv_try_iter() {
2282         let (request_tx, request_rx) = channel();
2283         let (response_tx, response_rx) = channel();
2284
2285         // Request `x`s until we have `6`.
2286         let t = thread::spawn(move || {
2287             let mut count = 0;
2288             loop {
2289                 for x in response_rx.try_iter() {
2290                     count += x;
2291                     if count == 6 {
2292                         return count;
2293                     }
2294                 }
2295                 request_tx.send(()).unwrap();
2296             }
2297         });
2298
2299         for _ in request_rx.iter() {
2300             if response_tx.send(2).is_err() {
2301                 break;
2302             }
2303         }
2304
2305         assert_eq!(t.join().unwrap(), 6);
2306     }
2307
2308     #[test]
2309     fn test_recv_into_iter_owned() {
2310         let mut iter = {
2311             let (tx, rx) = channel::<i32>();
2312             tx.send(1).unwrap();
2313             tx.send(2).unwrap();
2314
2315             rx.into_iter()
2316         };
2317         assert_eq!(iter.next().unwrap(), 1);
2318         assert_eq!(iter.next().unwrap(), 2);
2319         assert_eq!(iter.next().is_none(), true);
2320     }
2321
2322     #[test]
2323     fn test_recv_into_iter_borrowed() {
2324         let (tx, rx) = channel::<i32>();
2325         tx.send(1).unwrap();
2326         tx.send(2).unwrap();
2327         drop(tx);
2328         let mut iter = (&rx).into_iter();
2329         assert_eq!(iter.next().unwrap(), 1);
2330         assert_eq!(iter.next().unwrap(), 2);
2331         assert_eq!(iter.next().is_none(), true);
2332     }
2333
2334     #[test]
2335     fn try_recv_states() {
2336         let (tx1, rx1) = channel::<i32>();
2337         let (tx2, rx2) = channel::<()>();
2338         let (tx3, rx3) = channel::<()>();
2339         let _t = thread::spawn(move || {
2340             rx2.recv().unwrap();
2341             tx1.send(1).unwrap();
2342             tx3.send(()).unwrap();
2343             rx2.recv().unwrap();
2344             drop(tx1);
2345             tx3.send(()).unwrap();
2346         });
2347
2348         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2349         tx2.send(()).unwrap();
2350         rx3.recv().unwrap();
2351         assert_eq!(rx1.try_recv(), Ok(1));
2352         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2353         tx2.send(()).unwrap();
2354         rx3.recv().unwrap();
2355         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2356     }
2357
2358     // This bug used to end up in a livelock inside of the Receiver destructor
2359     // because the internal state of the Shared packet was corrupted
2360     #[test]
2361     fn destroy_upgraded_shared_port_when_sender_still_active() {
2362         let (tx, rx) = channel();
2363         let (tx2, rx2) = channel();
2364         let _t = thread::spawn(move || {
2365             rx.recv().unwrap(); // wait on a oneshot
2366             drop(rx); // destroy a shared
2367             tx2.send(()).unwrap();
2368         });
2369         // make sure the other thread has gone to sleep
2370         for _ in 0..5000 {
2371             thread::yield_now();
2372         }
2373
2374         // upgrade to a shared chan and send a message
2375         let t = tx.clone();
2376         drop(tx);
2377         t.send(()).unwrap();
2378
2379         // wait for the child thread to exit before we exit
2380         rx2.recv().unwrap();
2381     }
2382
2383     #[test]
2384     fn issue_32114() {
2385         let (tx, _) = channel();
2386         let _ = tx.send(123);
2387         assert_eq!(tx.send(123), Err(SendError(123)));
2388     }
2389 }
2390
2391 #[cfg(all(test, not(target_os = "emscripten")))]
2392 mod sync_tests {
2393     use super::*;
2394     use crate::env;
2395     use crate::thread;
2396     use crate::time::Duration;
2397
2398     pub fn stress_factor() -> usize {
2399         match env::var("RUST_TEST_STRESS") {
2400             Ok(val) => val.parse().unwrap(),
2401             Err(..) => 1,
2402         }
2403     }
2404
2405     #[test]
2406     fn smoke() {
2407         let (tx, rx) = sync_channel::<i32>(1);
2408         tx.send(1).unwrap();
2409         assert_eq!(rx.recv().unwrap(), 1);
2410     }
2411
2412     #[test]
2413     fn drop_full() {
2414         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2415         tx.send(box 1).unwrap();
2416     }
2417
2418     #[test]
2419     fn smoke_shared() {
2420         let (tx, rx) = sync_channel::<i32>(1);
2421         tx.send(1).unwrap();
2422         assert_eq!(rx.recv().unwrap(), 1);
2423         let tx = tx.clone();
2424         tx.send(1).unwrap();
2425         assert_eq!(rx.recv().unwrap(), 1);
2426     }
2427
2428     #[test]
2429     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2430     fn recv_timeout() {
2431         let (tx, rx) = sync_channel::<i32>(1);
2432         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2433         tx.send(1).unwrap();
2434         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2435     }
2436
2437     #[test]
2438     fn smoke_threads() {
2439         let (tx, rx) = sync_channel::<i32>(0);
2440         let _t = thread::spawn(move || {
2441             tx.send(1).unwrap();
2442         });
2443         assert_eq!(rx.recv().unwrap(), 1);
2444     }
2445
2446     #[test]
2447     fn smoke_port_gone() {
2448         let (tx, rx) = sync_channel::<i32>(0);
2449         drop(rx);
2450         assert!(tx.send(1).is_err());
2451     }
2452
2453     #[test]
2454     fn smoke_shared_port_gone2() {
2455         let (tx, rx) = sync_channel::<i32>(0);
2456         drop(rx);
2457         let tx2 = tx.clone();
2458         drop(tx);
2459         assert!(tx2.send(1).is_err());
2460     }
2461
2462     #[test]
2463     fn port_gone_concurrent() {
2464         let (tx, rx) = sync_channel::<i32>(0);
2465         let _t = thread::spawn(move || {
2466             rx.recv().unwrap();
2467         });
2468         while tx.send(1).is_ok() {}
2469     }
2470
2471     #[test]
2472     fn port_gone_concurrent_shared() {
2473         let (tx, rx) = sync_channel::<i32>(0);
2474         let tx2 = tx.clone();
2475         let _t = thread::spawn(move || {
2476             rx.recv().unwrap();
2477         });
2478         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2479     }
2480
2481     #[test]
2482     fn smoke_chan_gone() {
2483         let (tx, rx) = sync_channel::<i32>(0);
2484         drop(tx);
2485         assert!(rx.recv().is_err());
2486     }
2487
2488     #[test]
2489     fn smoke_chan_gone_shared() {
2490         let (tx, rx) = sync_channel::<()>(0);
2491         let tx2 = tx.clone();
2492         drop(tx);
2493         drop(tx2);
2494         assert!(rx.recv().is_err());
2495     }
2496
2497     #[test]
2498     fn chan_gone_concurrent() {
2499         let (tx, rx) = sync_channel::<i32>(0);
2500         thread::spawn(move || {
2501             tx.send(1).unwrap();
2502             tx.send(1).unwrap();
2503         });
2504         while rx.recv().is_ok() {}
2505     }
2506
2507     #[test]
2508     fn stress() {
2509         let (tx, rx) = sync_channel::<i32>(0);
2510         thread::spawn(move || {
2511             for _ in 0..10000 {
2512                 tx.send(1).unwrap();
2513             }
2514         });
2515         for _ in 0..10000 {
2516             assert_eq!(rx.recv().unwrap(), 1);
2517         }
2518     }
2519
2520     #[test]
2521     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2522     fn stress_recv_timeout_two_threads() {
2523         let (tx, rx) = sync_channel::<i32>(0);
2524
2525         thread::spawn(move || {
2526             for _ in 0..10000 {
2527                 tx.send(1).unwrap();
2528             }
2529         });
2530
2531         let mut recv_count = 0;
2532         loop {
2533             match rx.recv_timeout(Duration::from_millis(1)) {
2534                 Ok(v) => {
2535                     assert_eq!(v, 1);
2536                     recv_count += 1;
2537                 }
2538                 Err(RecvTimeoutError::Timeout) => continue,
2539                 Err(RecvTimeoutError::Disconnected) => break,
2540             }
2541         }
2542
2543         assert_eq!(recv_count, 10000);
2544     }
2545
2546     #[test]
2547     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2548     fn stress_recv_timeout_shared() {
2549         const AMT: u32 = 1000;
2550         const NTHREADS: u32 = 8;
2551         let (tx, rx) = sync_channel::<i32>(0);
2552         let (dtx, drx) = sync_channel::<()>(0);
2553
2554         thread::spawn(move || {
2555             let mut recv_count = 0;
2556             loop {
2557                 match rx.recv_timeout(Duration::from_millis(10)) {
2558                     Ok(v) => {
2559                         assert_eq!(v, 1);
2560                         recv_count += 1;
2561                     }
2562                     Err(RecvTimeoutError::Timeout) => continue,
2563                     Err(RecvTimeoutError::Disconnected) => break,
2564                 }
2565             }
2566
2567             assert_eq!(recv_count, AMT * NTHREADS);
2568             assert!(rx.try_recv().is_err());
2569
2570             dtx.send(()).unwrap();
2571         });
2572
2573         for _ in 0..NTHREADS {
2574             let tx = tx.clone();
2575             thread::spawn(move || {
2576                 for _ in 0..AMT {
2577                     tx.send(1).unwrap();
2578                 }
2579             });
2580         }
2581
2582         drop(tx);
2583
2584         drx.recv().unwrap();
2585     }
2586
2587     #[test]
2588     fn stress_shared() {
2589         const AMT: u32 = 1000;
2590         const NTHREADS: u32 = 8;
2591         let (tx, rx) = sync_channel::<i32>(0);
2592         let (dtx, drx) = sync_channel::<()>(0);
2593
2594         thread::spawn(move || {
2595             for _ in 0..AMT * NTHREADS {
2596                 assert_eq!(rx.recv().unwrap(), 1);
2597             }
2598             match rx.try_recv() {
2599                 Ok(..) => panic!(),
2600                 _ => {}
2601             }
2602             dtx.send(()).unwrap();
2603         });
2604
2605         for _ in 0..NTHREADS {
2606             let tx = tx.clone();
2607             thread::spawn(move || {
2608                 for _ in 0..AMT {
2609                     tx.send(1).unwrap();
2610                 }
2611             });
2612         }
2613         drop(tx);
2614         drx.recv().unwrap();
2615     }
2616
2617     #[test]
2618     fn oneshot_single_thread_close_port_first() {
2619         // Simple test of closing without sending
2620         let (_tx, rx) = sync_channel::<i32>(0);
2621         drop(rx);
2622     }
2623
2624     #[test]
2625     fn oneshot_single_thread_close_chan_first() {
2626         // Simple test of closing without sending
2627         let (tx, _rx) = sync_channel::<i32>(0);
2628         drop(tx);
2629     }
2630
2631     #[test]
2632     fn oneshot_single_thread_send_port_close() {
2633         // Testing that the sender cleans up the payload if receiver is closed
2634         let (tx, rx) = sync_channel::<Box<i32>>(0);
2635         drop(rx);
2636         assert!(tx.send(box 0).is_err());
2637     }
2638
2639     #[test]
2640     fn oneshot_single_thread_recv_chan_close() {
2641         // Receiving on a closed chan will panic
2642         let res = thread::spawn(move || {
2643             let (tx, rx) = sync_channel::<i32>(0);
2644             drop(tx);
2645             rx.recv().unwrap();
2646         })
2647         .join();
2648         // What is our res?
2649         assert!(res.is_err());
2650     }
2651
2652     #[test]
2653     fn oneshot_single_thread_send_then_recv() {
2654         let (tx, rx) = sync_channel::<Box<i32>>(1);
2655         tx.send(box 10).unwrap();
2656         assert!(*rx.recv().unwrap() == 10);
2657     }
2658
2659     #[test]
2660     fn oneshot_single_thread_try_send_open() {
2661         let (tx, rx) = sync_channel::<i32>(1);
2662         assert_eq!(tx.try_send(10), Ok(()));
2663         assert!(rx.recv().unwrap() == 10);
2664     }
2665
2666     #[test]
2667     fn oneshot_single_thread_try_send_closed() {
2668         let (tx, rx) = sync_channel::<i32>(0);
2669         drop(rx);
2670         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2671     }
2672
2673     #[test]
2674     fn oneshot_single_thread_try_send_closed2() {
2675         let (tx, _rx) = sync_channel::<i32>(0);
2676         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2677     }
2678
2679     #[test]
2680     fn oneshot_single_thread_try_recv_open() {
2681         let (tx, rx) = sync_channel::<i32>(1);
2682         tx.send(10).unwrap();
2683         assert!(rx.recv() == Ok(10));
2684     }
2685
2686     #[test]
2687     fn oneshot_single_thread_try_recv_closed() {
2688         let (tx, rx) = sync_channel::<i32>(0);
2689         drop(tx);
2690         assert!(rx.recv().is_err());
2691     }
2692
2693     #[test]
2694     fn oneshot_single_thread_try_recv_closed_with_data() {
2695         let (tx, rx) = sync_channel::<i32>(1);
2696         tx.send(10).unwrap();
2697         drop(tx);
2698         assert_eq!(rx.try_recv(), Ok(10));
2699         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2700     }
2701
2702     #[test]
2703     fn oneshot_single_thread_peek_data() {
2704         let (tx, rx) = sync_channel::<i32>(1);
2705         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2706         tx.send(10).unwrap();
2707         assert_eq!(rx.try_recv(), Ok(10));
2708     }
2709
2710     #[test]
2711     fn oneshot_single_thread_peek_close() {
2712         let (tx, rx) = sync_channel::<i32>(0);
2713         drop(tx);
2714         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2715         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2716     }
2717
2718     #[test]
2719     fn oneshot_single_thread_peek_open() {
2720         let (_tx, rx) = sync_channel::<i32>(0);
2721         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2722     }
2723
2724     #[test]
2725     fn oneshot_multi_task_recv_then_send() {
2726         let (tx, rx) = sync_channel::<Box<i32>>(0);
2727         let _t = thread::spawn(move || {
2728             assert!(*rx.recv().unwrap() == 10);
2729         });
2730
2731         tx.send(box 10).unwrap();
2732     }
2733
2734     #[test]
2735     fn oneshot_multi_task_recv_then_close() {
2736         let (tx, rx) = sync_channel::<Box<i32>>(0);
2737         let _t = thread::spawn(move || {
2738             drop(tx);
2739         });
2740         let res = thread::spawn(move || {
2741             assert!(*rx.recv().unwrap() == 10);
2742         })
2743         .join();
2744         assert!(res.is_err());
2745     }
2746
2747     #[test]
2748     fn oneshot_multi_thread_close_stress() {
2749         for _ in 0..stress_factor() {
2750             let (tx, rx) = sync_channel::<i32>(0);
2751             let _t = thread::spawn(move || {
2752                 drop(rx);
2753             });
2754             drop(tx);
2755         }
2756     }
2757
2758     #[test]
2759     fn oneshot_multi_thread_send_close_stress() {
2760         for _ in 0..stress_factor() {
2761             let (tx, rx) = sync_channel::<i32>(0);
2762             let _t = thread::spawn(move || {
2763                 drop(rx);
2764             });
2765             let _ = thread::spawn(move || {
2766                 tx.send(1).unwrap();
2767             })
2768             .join();
2769         }
2770     }
2771
2772     #[test]
2773     fn oneshot_multi_thread_recv_close_stress() {
2774         for _ in 0..stress_factor() {
2775             let (tx, rx) = sync_channel::<i32>(0);
2776             let _t = thread::spawn(move || {
2777                 let res = thread::spawn(move || {
2778                     rx.recv().unwrap();
2779                 })
2780                 .join();
2781                 assert!(res.is_err());
2782             });
2783             let _t = thread::spawn(move || {
2784                 thread::spawn(move || {
2785                     drop(tx);
2786                 });
2787             });
2788         }
2789     }
2790
2791     #[test]
2792     fn oneshot_multi_thread_send_recv_stress() {
2793         for _ in 0..stress_factor() {
2794             let (tx, rx) = sync_channel::<Box<i32>>(0);
2795             let _t = thread::spawn(move || {
2796                 tx.send(box 10).unwrap();
2797             });
2798             assert!(*rx.recv().unwrap() == 10);
2799         }
2800     }
2801
2802     #[test]
2803     fn stream_send_recv_stress() {
2804         for _ in 0..stress_factor() {
2805             let (tx, rx) = sync_channel::<Box<i32>>(0);
2806
2807             send(tx, 0);
2808             recv(rx, 0);
2809
2810             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2811                 if i == 10 {
2812                     return;
2813                 }
2814
2815                 thread::spawn(move || {
2816                     tx.send(box i).unwrap();
2817                     send(tx, i + 1);
2818                 });
2819             }
2820
2821             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2822                 if i == 10 {
2823                     return;
2824                 }
2825
2826                 thread::spawn(move || {
2827                     assert!(*rx.recv().unwrap() == i);
2828                     recv(rx, i + 1);
2829                 });
2830             }
2831         }
2832     }
2833
2834     #[test]
2835     fn recv_a_lot() {
2836         // Regression test that we don't run out of stack in scheduler context
2837         let (tx, rx) = sync_channel(10000);
2838         for _ in 0..10000 {
2839             tx.send(()).unwrap();
2840         }
2841         for _ in 0..10000 {
2842             rx.recv().unwrap();
2843         }
2844     }
2845
2846     #[test]
2847     fn shared_chan_stress() {
2848         let (tx, rx) = sync_channel(0);
2849         let total = stress_factor() + 100;
2850         for _ in 0..total {
2851             let tx = tx.clone();
2852             thread::spawn(move || {
2853                 tx.send(()).unwrap();
2854             });
2855         }
2856
2857         for _ in 0..total {
2858             rx.recv().unwrap();
2859         }
2860     }
2861
2862     #[test]
2863     fn test_nested_recv_iter() {
2864         let (tx, rx) = sync_channel::<i32>(0);
2865         let (total_tx, total_rx) = sync_channel::<i32>(0);
2866
2867         let _t = thread::spawn(move || {
2868             let mut acc = 0;
2869             for x in rx.iter() {
2870                 acc += x;
2871             }
2872             total_tx.send(acc).unwrap();
2873         });
2874
2875         tx.send(3).unwrap();
2876         tx.send(1).unwrap();
2877         tx.send(2).unwrap();
2878         drop(tx);
2879         assert_eq!(total_rx.recv().unwrap(), 6);
2880     }
2881
2882     #[test]
2883     fn test_recv_iter_break() {
2884         let (tx, rx) = sync_channel::<i32>(0);
2885         let (count_tx, count_rx) = sync_channel(0);
2886
2887         let _t = thread::spawn(move || {
2888             let mut count = 0;
2889             for x in rx.iter() {
2890                 if count >= 3 {
2891                     break;
2892                 } else {
2893                     count += x;
2894                 }
2895             }
2896             count_tx.send(count).unwrap();
2897         });
2898
2899         tx.send(2).unwrap();
2900         tx.send(2).unwrap();
2901         tx.send(2).unwrap();
2902         let _ = tx.try_send(2);
2903         drop(tx);
2904         assert_eq!(count_rx.recv().unwrap(), 4);
2905     }
2906
2907     #[test]
2908     fn try_recv_states() {
2909         let (tx1, rx1) = sync_channel::<i32>(1);
2910         let (tx2, rx2) = sync_channel::<()>(1);
2911         let (tx3, rx3) = sync_channel::<()>(1);
2912         let _t = thread::spawn(move || {
2913             rx2.recv().unwrap();
2914             tx1.send(1).unwrap();
2915             tx3.send(()).unwrap();
2916             rx2.recv().unwrap();
2917             drop(tx1);
2918             tx3.send(()).unwrap();
2919         });
2920
2921         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2922         tx2.send(()).unwrap();
2923         rx3.recv().unwrap();
2924         assert_eq!(rx1.try_recv(), Ok(1));
2925         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2926         tx2.send(()).unwrap();
2927         rx3.recv().unwrap();
2928         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2929     }
2930
2931     // This bug used to end up in a livelock inside of the Receiver destructor
2932     // because the internal state of the Shared packet was corrupted
2933     #[test]
2934     fn destroy_upgraded_shared_port_when_sender_still_active() {
2935         let (tx, rx) = sync_channel::<()>(0);
2936         let (tx2, rx2) = sync_channel::<()>(0);
2937         let _t = thread::spawn(move || {
2938             rx.recv().unwrap(); // wait on a oneshot
2939             drop(rx); // destroy a shared
2940             tx2.send(()).unwrap();
2941         });
2942         // make sure the other thread has gone to sleep
2943         for _ in 0..5000 {
2944             thread::yield_now();
2945         }
2946
2947         // upgrade to a shared chan and send a message
2948         let t = tx.clone();
2949         drop(tx);
2950         t.send(()).unwrap();
2951
2952         // wait for the child thread to exit before we exit
2953         rx2.recv().unwrap();
2954     }
2955
2956     #[test]
2957     fn send1() {
2958         let (tx, rx) = sync_channel::<i32>(0);
2959         let _t = thread::spawn(move || {
2960             rx.recv().unwrap();
2961         });
2962         assert_eq!(tx.send(1), Ok(()));
2963     }
2964
2965     #[test]
2966     fn send2() {
2967         let (tx, rx) = sync_channel::<i32>(0);
2968         let _t = thread::spawn(move || {
2969             drop(rx);
2970         });
2971         assert!(tx.send(1).is_err());
2972     }
2973
2974     #[test]
2975     fn send3() {
2976         let (tx, rx) = sync_channel::<i32>(1);
2977         assert_eq!(tx.send(1), Ok(()));
2978         let _t = thread::spawn(move || {
2979             drop(rx);
2980         });
2981         assert!(tx.send(1).is_err());
2982     }
2983
2984     #[test]
2985     fn send4() {
2986         let (tx, rx) = sync_channel::<i32>(0);
2987         let tx2 = tx.clone();
2988         let (done, donerx) = channel();
2989         let done2 = done.clone();
2990         let _t = thread::spawn(move || {
2991             assert!(tx.send(1).is_err());
2992             done.send(()).unwrap();
2993         });
2994         let _t = thread::spawn(move || {
2995             assert!(tx2.send(2).is_err());
2996             done2.send(()).unwrap();
2997         });
2998         drop(rx);
2999         donerx.recv().unwrap();
3000         donerx.recv().unwrap();
3001     }
3002
3003     #[test]
3004     fn try_send1() {
3005         let (tx, _rx) = sync_channel::<i32>(0);
3006         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3007     }
3008
3009     #[test]
3010     fn try_send2() {
3011         let (tx, _rx) = sync_channel::<i32>(1);
3012         assert_eq!(tx.try_send(1), Ok(()));
3013         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3014     }
3015
3016     #[test]
3017     fn try_send3() {
3018         let (tx, rx) = sync_channel::<i32>(1);
3019         assert_eq!(tx.try_send(1), Ok(()));
3020         drop(rx);
3021         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3022     }
3023
3024     #[test]
3025     fn issue_15761() {
3026         fn repro() {
3027             let (tx1, rx1) = sync_channel::<()>(3);
3028             let (tx2, rx2) = sync_channel::<()>(3);
3029
3030             let _t = thread::spawn(move || {
3031                 rx1.recv().unwrap();
3032                 tx2.try_send(()).unwrap();
3033             });
3034
3035             tx1.try_send(()).unwrap();
3036             rx2.recv().unwrap();
3037         }
3038
3039         for _ in 0..100 {
3040             repro()
3041         }
3042     }
3043 }