]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/mod.rs
Auto merge of #94243 - compiler-errors:compiler-flags-typo, r=Mark-Simulacrum
[rust.git] / library / std / src / sync / mpsc / mod.rs
1 //! Multi-producer, single-consumer FIFO queue communication primitives.
2 //!
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
5 //!
6 //! * [`Sender`]
7 //! * [`SyncSender`]
8 //! * [`Receiver`]
9 //!
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
13 //!
14 //! These channels come in two flavors:
15 //!
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //!    will return a `(Sender, Receiver)` tuple where all sends will be
18 //!    **asynchronous** (they never block). The channel conceptually has an
19 //!    infinite buffer.
20 //!
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
24 //!    **synchronous** by blocking until there is buffer space available. Note
25 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //!    channel where each sender atomically hands off a message to a receiver.
27 //!
28 //! [`send`]: Sender::send
29 //!
30 //! ## Disconnection
31 //!
32 //! The send and receive operations on channels will all return a [`Result`]
33 //! indicating whether the operation succeeded or not. An unsuccessful operation
34 //! is normally indicative of the other half of a channel having "hung up" by
35 //! being dropped in its corresponding thread.
36 //!
37 //! Once half of a channel has been deallocated, most operations can no longer
38 //! continue to make progress, so [`Err`] will be returned. Many applications
39 //! will continue to [`unwrap`] the results returned from this module,
40 //! instigating a propagation of failure among threads if one unexpectedly dies.
41 //!
42 //! [`unwrap`]: Result::unwrap
43 //!
44 //! # Examples
45 //!
46 //! Simple usage:
47 //!
48 //! ```
49 //! use std::thread;
50 //! use std::sync::mpsc::channel;
51 //!
52 //! // Create a simple streaming channel
53 //! let (tx, rx) = channel();
54 //! thread::spawn(move|| {
55 //!     tx.send(10).unwrap();
56 //! });
57 //! assert_eq!(rx.recv().unwrap(), 10);
58 //! ```
59 //!
60 //! Shared usage:
61 //!
62 //! ```
63 //! use std::thread;
64 //! use std::sync::mpsc::channel;
65 //!
66 //! // Create a shared channel that can be sent along from many threads
67 //! // where tx is the sending half (tx for transmission), and rx is the receiving
68 //! // half (rx for receiving).
69 //! let (tx, rx) = channel();
70 //! for i in 0..10 {
71 //!     let tx = tx.clone();
72 //!     thread::spawn(move|| {
73 //!         tx.send(i).unwrap();
74 //!     });
75 //! }
76 //!
77 //! for _ in 0..10 {
78 //!     let j = rx.recv().unwrap();
79 //!     assert!(0 <= j && j < 10);
80 //! }
81 //! ```
82 //!
83 //! Propagating panics:
84 //!
85 //! ```
86 //! use std::sync::mpsc::channel;
87 //!
88 //! // The call to recv() will return an error because the channel has already
89 //! // hung up (or been deallocated)
90 //! let (tx, rx) = channel::<i32>();
91 //! drop(tx);
92 //! assert!(rx.recv().is_err());
93 //! ```
94 //!
95 //! Synchronous channels:
96 //!
97 //! ```
98 //! use std::thread;
99 //! use std::sync::mpsc::sync_channel;
100 //!
101 //! let (tx, rx) = sync_channel::<i32>(0);
102 //! thread::spawn(move|| {
103 //!     // This will wait for the parent thread to start receiving
104 //!     tx.send(53).unwrap();
105 //! });
106 //! rx.recv().unwrap();
107 //! ```
108 //!
109 //! Unbounded receive loop:
110 //!
111 //! ```
112 //! use std::sync::mpsc::sync_channel;
113 //! use std::thread;
114 //!
115 //! let (tx, rx) = sync_channel(3);
116 //!
117 //! for _ in 0..3 {
118 //!     // It would be the same without thread and clone here
119 //!     // since there will still be one `tx` left.
120 //!     let tx = tx.clone();
121 //!     // cloned tx dropped within thread
122 //!     thread::spawn(move || tx.send("ok").unwrap());
123 //! }
124 //!
125 //! // Drop the last sender to stop `rx` waiting for message.
126 //! // The program will not complete if we comment this out.
127 //! // **All** `tx` needs to be dropped for `rx` to have `Err`.
128 //! drop(tx);
129 //!
130 //! // Unbounded receiver waiting for all senders to complete.
131 //! while let Ok(msg) = rx.recv() {
132 //!     println!("{msg}");
133 //! }
134 //!
135 //! println!("completed");
136 //! ```
137
138 #![stable(feature = "rust1", since = "1.0.0")]
139
140 #[cfg(all(test, not(target_os = "emscripten")))]
141 mod tests;
142
143 #[cfg(all(test, not(target_os = "emscripten")))]
144 mod sync_tests;
145
146 // A description of how Rust's channel implementation works
147 //
148 // Channels are supposed to be the basic building block for all other
149 // concurrent primitives that are used in Rust. As a result, the channel type
150 // needs to be highly optimized, flexible, and broad enough for use everywhere.
151 //
152 // The choice of implementation of all channels is to be built on lock-free data
153 // structures. The channels themselves are then consequently also lock-free data
154 // structures. As always with lock-free code, this is a very "here be dragons"
155 // territory, especially because I'm unaware of any academic papers that have
156 // gone into great length about channels of these flavors.
157 //
158 // ## Flavors of channels
159 //
160 // From the perspective of a consumer of this library, there is only one flavor
161 // of channel. This channel can be used as a stream and cloned to allow multiple
162 // senders. Under the hood, however, there are actually three flavors of
163 // channels in play.
164 //
165 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
166 //                      case. They contain as few atomics as possible and
167 //                      involve one and exactly one allocation.
168 // * Streams - these channels are optimized for the non-shared use case. They
169 //             use a different concurrent queue that is more tailored for this
170 //             use case. The initial allocation of this flavor of channel is not
171 //             optimized.
172 // * Shared - this is the most general form of channel that this module offers,
173 //            a channel with multiple senders. This type is as optimized as it
174 //            can be, but the previous two types mentioned are much faster for
175 //            their use-cases.
176 //
177 // ## Concurrent queues
178 //
179 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
180 // but recv() obviously blocks. This means that under the hood there must be
181 // some shared and concurrent queue holding all of the actual data.
182 //
183 // With two flavors of channels, two flavors of queues are also used. We have
184 // chosen to use queues from a well-known author that are abbreviated as SPSC
185 // and MPSC (single producer, single consumer and multiple producer, single
186 // consumer). SPSC queues are used for streams while MPSC queues are used for
187 // shared channels.
188 //
189 // ### SPSC optimizations
190 //
191 // The SPSC queue found online is essentially a linked list of nodes where one
192 // half of the nodes are the "queue of data" and the other half of nodes are a
193 // cache of unused nodes. The unused nodes are used such that an allocation is
194 // not required on every push() and a free doesn't need to happen on every
195 // pop().
196 //
197 // As found online, however, the cache of nodes is of an infinite size. This
198 // means that if a channel at one point in its life had 50k items in the queue,
199 // then the queue will always have the capacity for 50k items. I believed that
200 // this was an unnecessary limitation of the implementation, so I have altered
201 // the queue to optionally have a bound on the cache size.
202 //
203 // By default, streams will have an unbounded SPSC queue with a small-ish cache
204 // size. The hope is that the cache is still large enough to have very fast
205 // send() operations while not too large such that millions of channels can
206 // coexist at once.
207 //
208 // ### MPSC optimizations
209 //
210 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
211 // a linked list under the hood to earn its unboundedness, but I have not put
212 // forth much effort into having a cache of nodes similar to the SPSC queue.
213 //
214 // For now, I believe that this is "ok" because shared channels are not the most
215 // common type, but soon we may wish to revisit this queue choice and determine
216 // another candidate for backend storage of shared channels.
217 //
218 // ## Overview of the Implementation
219 //
220 // Now that there's a little background on the concurrent queues used, it's
221 // worth going into much more detail about the channels themselves. The basic
222 // pseudocode for a send/recv are:
223 //
224 //
225 //      send(t)                             recv()
226 //        queue.push(t)                       return if queue.pop()
227 //        if increment() == -1                deschedule {
228 //          wakeup()                            if decrement() > 0
229 //                                                cancel_deschedule()
230 //                                            }
231 //                                            queue.pop()
232 //
233 // As mentioned before, there are no locks in this implementation, only atomic
234 // instructions are used.
235 //
236 // ### The internal atomic counter
237 //
238 // Every channel has a shared counter with each half to keep track of the size
239 // of the queue. This counter is used to abort descheduling by the receiver and
240 // to know when to wake up on the sending side.
241 //
242 // As seen in the pseudocode, senders will increment this count and receivers
243 // will decrement the count. The theory behind this is that if a sender sees a
244 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
245 // then it doesn't need to block.
246 //
247 // The recv() method has a beginning call to pop(), and if successful, it needs
248 // to decrement the count. It is a crucial implementation detail that this
249 // decrement does *not* happen to the shared counter. If this were the case,
250 // then it would be possible for the counter to be very negative when there were
251 // no receivers waiting, in which case the senders would have to determine when
252 // it was actually appropriate to wake up a receiver.
253 //
254 // Instead, the "steal count" is kept track of separately (not atomically
255 // because it's only used by receivers), and then the decrement() call when
256 // descheduling will lump in all of the recent steals into one large decrement.
257 //
258 // The implication of this is that if a sender sees a -1 count, then there's
259 // guaranteed to be a waiter waiting!
260 //
261 // ## Native Implementation
262 //
263 // A major goal of these channels is to work seamlessly on and off the runtime.
264 // All of the previous race conditions have been worded in terms of
265 // scheduler-isms (which is obviously not available without the runtime).
266 //
267 // For now, native usage of channels (off the runtime) will fall back onto
268 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
269 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
270 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
271 // condition variable.
272 //
273 // ## Select
274 //
275 // Being able to support selection over channels has greatly influenced this
276 // design, and not only does selection need to work inside the runtime, but also
277 // outside the runtime.
278 //
279 // The implementation is fairly straightforward. The goal of select() is not to
280 // return some data, but only to return which channel can receive data without
281 // blocking. The implementation is essentially the entire blocking procedure
282 // followed by an increment as soon as its woken up. The cancellation procedure
283 // involves an increment and swapping out of to_wake to acquire ownership of the
284 // thread to unblock.
285 //
286 // Sadly this current implementation requires multiple allocations, so I have
287 // seen the throughput of select() be much worse than it should be. I do not
288 // believe that there is anything fundamental that needs to change about these
289 // channels, however, in order to support a more efficient select().
290 //
291 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
292 //
293 // # Conclusion
294 //
295 // And now that you've seen all the races that I found and attempted to fix,
296 // here's the code for you to find some more!
297
298 use crate::cell::UnsafeCell;
299 use crate::error;
300 use crate::fmt;
301 use crate::mem;
302 use crate::sync::Arc;
303 use crate::time::{Duration, Instant};
304
305 mod blocking;
306 mod mpsc_queue;
307 mod oneshot;
308 mod shared;
309 mod spsc_queue;
310 mod stream;
311 mod sync;
312
313 mod cache_aligned;
314
315 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
316 /// This half can only be owned by one thread.
317 ///
318 /// Messages sent to the channel can be retrieved using [`recv`].
319 ///
320 /// [`recv`]: Receiver::recv
321 ///
322 /// # Examples
323 ///
324 /// ```rust
325 /// use std::sync::mpsc::channel;
326 /// use std::thread;
327 /// use std::time::Duration;
328 ///
329 /// let (send, recv) = channel();
330 ///
331 /// thread::spawn(move || {
332 ///     send.send("Hello world!").unwrap();
333 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
334 ///     send.send("Delayed for 2 seconds").unwrap();
335 /// });
336 ///
337 /// println!("{}", recv.recv().unwrap()); // Received immediately
338 /// println!("Waiting...");
339 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
340 /// ```
341 #[stable(feature = "rust1", since = "1.0.0")]
342 #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
343 pub struct Receiver<T> {
344     inner: UnsafeCell<Flavor<T>>,
345 }
346
347 // The receiver port can be sent from place to place, so long as it
348 // is not used to receive non-sendable things.
349 #[stable(feature = "rust1", since = "1.0.0")]
350 unsafe impl<T: Send> Send for Receiver<T> {}
351
352 #[stable(feature = "rust1", since = "1.0.0")]
353 impl<T> !Sync for Receiver<T> {}
354
355 /// An iterator over messages on a [`Receiver`], created by [`iter`].
356 ///
357 /// This iterator will block whenever [`next`] is called,
358 /// waiting for a new message, and [`None`] will be returned
359 /// when the corresponding channel has hung up.
360 ///
361 /// [`iter`]: Receiver::iter
362 /// [`next`]: Iterator::next
363 ///
364 /// # Examples
365 ///
366 /// ```rust
367 /// use std::sync::mpsc::channel;
368 /// use std::thread;
369 ///
370 /// let (send, recv) = channel();
371 ///
372 /// thread::spawn(move || {
373 ///     send.send(1u8).unwrap();
374 ///     send.send(2u8).unwrap();
375 ///     send.send(3u8).unwrap();
376 /// });
377 ///
378 /// for x in recv.iter() {
379 ///     println!("Got: {x}");
380 /// }
381 /// ```
382 #[stable(feature = "rust1", since = "1.0.0")]
383 #[derive(Debug)]
384 pub struct Iter<'a, T: 'a> {
385     rx: &'a Receiver<T>,
386 }
387
388 /// An iterator that attempts to yield all pending values for a [`Receiver`],
389 /// created by [`try_iter`].
390 ///
391 /// [`None`] will be returned when there are no pending values remaining or
392 /// if the corresponding channel has hung up.
393 ///
394 /// This iterator will never block the caller in order to wait for data to
395 /// become available. Instead, it will return [`None`].
396 ///
397 /// [`try_iter`]: Receiver::try_iter
398 ///
399 /// # Examples
400 ///
401 /// ```rust
402 /// use std::sync::mpsc::channel;
403 /// use std::thread;
404 /// use std::time::Duration;
405 ///
406 /// let (sender, receiver) = channel();
407 ///
408 /// // Nothing is in the buffer yet
409 /// assert!(receiver.try_iter().next().is_none());
410 /// println!("Nothing in the buffer...");
411 ///
412 /// thread::spawn(move || {
413 ///     sender.send(1).unwrap();
414 ///     sender.send(2).unwrap();
415 ///     sender.send(3).unwrap();
416 /// });
417 ///
418 /// println!("Going to sleep...");
419 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
420 ///
421 /// for x in receiver.try_iter() {
422 ///     println!("Got: {x}");
423 /// }
424 /// ```
425 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
426 #[derive(Debug)]
427 pub struct TryIter<'a, T: 'a> {
428     rx: &'a Receiver<T>,
429 }
430
431 /// An owning iterator over messages on a [`Receiver`],
432 /// created by [`into_iter`].
433 ///
434 /// This iterator will block whenever [`next`]
435 /// is called, waiting for a new message, and [`None`] will be
436 /// returned if the corresponding channel has hung up.
437 ///
438 /// [`into_iter`]: Receiver::into_iter
439 /// [`next`]: Iterator::next
440 ///
441 /// # Examples
442 ///
443 /// ```rust
444 /// use std::sync::mpsc::channel;
445 /// use std::thread;
446 ///
447 /// let (send, recv) = channel();
448 ///
449 /// thread::spawn(move || {
450 ///     send.send(1u8).unwrap();
451 ///     send.send(2u8).unwrap();
452 ///     send.send(3u8).unwrap();
453 /// });
454 ///
455 /// for x in recv.into_iter() {
456 ///     println!("Got: {x}");
457 /// }
458 /// ```
459 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
460 #[derive(Debug)]
461 pub struct IntoIter<T> {
462     rx: Receiver<T>,
463 }
464
465 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
466 /// owned by one thread, but it can be cloned to send to other threads.
467 ///
468 /// Messages can be sent through this channel with [`send`].
469 ///
470 /// Note: all senders (the original and the clones) need to be dropped for the receiver
471 /// to stop blocking to receive messages with [`Receiver::recv`].
472 ///
473 /// [`send`]: Sender::send
474 ///
475 /// # Examples
476 ///
477 /// ```rust
478 /// use std::sync::mpsc::channel;
479 /// use std::thread;
480 ///
481 /// let (sender, receiver) = channel();
482 /// let sender2 = sender.clone();
483 ///
484 /// // First thread owns sender
485 /// thread::spawn(move || {
486 ///     sender.send(1).unwrap();
487 /// });
488 ///
489 /// // Second thread owns sender2
490 /// thread::spawn(move || {
491 ///     sender2.send(2).unwrap();
492 /// });
493 ///
494 /// let msg = receiver.recv().unwrap();
495 /// let msg2 = receiver.recv().unwrap();
496 ///
497 /// assert_eq!(3, msg + msg2);
498 /// ```
499 #[stable(feature = "rust1", since = "1.0.0")]
500 pub struct Sender<T> {
501     inner: UnsafeCell<Flavor<T>>,
502 }
503
504 // The send port can be sent from place to place, so long as it
505 // is not used to send non-sendable things.
506 #[stable(feature = "rust1", since = "1.0.0")]
507 unsafe impl<T: Send> Send for Sender<T> {}
508
509 #[stable(feature = "rust1", since = "1.0.0")]
510 impl<T> !Sync for Sender<T> {}
511
512 /// The sending-half of Rust's synchronous [`sync_channel`] type.
513 ///
514 /// Messages can be sent through this channel with [`send`] or [`try_send`].
515 ///
516 /// [`send`] will block if there is no space in the internal buffer.
517 ///
518 /// [`send`]: SyncSender::send
519 /// [`try_send`]: SyncSender::try_send
520 ///
521 /// # Examples
522 ///
523 /// ```rust
524 /// use std::sync::mpsc::sync_channel;
525 /// use std::thread;
526 ///
527 /// // Create a sync_channel with buffer size 2
528 /// let (sync_sender, receiver) = sync_channel(2);
529 /// let sync_sender2 = sync_sender.clone();
530 ///
531 /// // First thread owns sync_sender
532 /// thread::spawn(move || {
533 ///     sync_sender.send(1).unwrap();
534 ///     sync_sender.send(2).unwrap();
535 /// });
536 ///
537 /// // Second thread owns sync_sender2
538 /// thread::spawn(move || {
539 ///     sync_sender2.send(3).unwrap();
540 ///     // thread will now block since the buffer is full
541 ///     println!("Thread unblocked!");
542 /// });
543 ///
544 /// let mut msg;
545 ///
546 /// msg = receiver.recv().unwrap();
547 /// println!("message {msg} received");
548 ///
549 /// // "Thread unblocked!" will be printed now
550 ///
551 /// msg = receiver.recv().unwrap();
552 /// println!("message {msg} received");
553 ///
554 /// msg = receiver.recv().unwrap();
555 ///
556 /// println!("message {msg} received");
557 /// ```
558 #[stable(feature = "rust1", since = "1.0.0")]
559 pub struct SyncSender<T> {
560     inner: Arc<sync::Packet<T>>,
561 }
562
563 #[stable(feature = "rust1", since = "1.0.0")]
564 unsafe impl<T: Send> Send for SyncSender<T> {}
565
566 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
567 /// function on **channel**s.
568 ///
569 /// A **send** operation can only fail if the receiving end of a channel is
570 /// disconnected, implying that the data could never be received. The error
571 /// contains the data being sent as a payload so it can be recovered.
572 #[stable(feature = "rust1", since = "1.0.0")]
573 #[derive(PartialEq, Eq, Clone, Copy)]
574 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
575
576 /// An error returned from the [`recv`] function on a [`Receiver`].
577 ///
578 /// The [`recv`] operation can only fail if the sending half of a
579 /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
580 /// messages will ever be received.
581 ///
582 /// [`recv`]: Receiver::recv
583 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
584 #[stable(feature = "rust1", since = "1.0.0")]
585 pub struct RecvError;
586
587 /// This enumeration is the list of the possible reasons that [`try_recv`] could
588 /// not return data when called. This can occur with both a [`channel`] and
589 /// a [`sync_channel`].
590 ///
591 /// [`try_recv`]: Receiver::try_recv
592 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
593 #[stable(feature = "rust1", since = "1.0.0")]
594 pub enum TryRecvError {
595     /// This **channel** is currently empty, but the **Sender**(s) have not yet
596     /// disconnected, so data may yet become available.
597     #[stable(feature = "rust1", since = "1.0.0")]
598     Empty,
599
600     /// The **channel**'s sending half has become disconnected, and there will
601     /// never be any more data received on it.
602     #[stable(feature = "rust1", since = "1.0.0")]
603     Disconnected,
604 }
605
606 /// This enumeration is the list of possible errors that made [`recv_timeout`]
607 /// unable to return data when called. This can occur with both a [`channel`] and
608 /// a [`sync_channel`].
609 ///
610 /// [`recv_timeout`]: Receiver::recv_timeout
611 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
612 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613 pub enum RecvTimeoutError {
614     /// This **channel** is currently empty, but the **Sender**(s) have not yet
615     /// disconnected, so data may yet become available.
616     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
617     Timeout,
618     /// The **channel**'s sending half has become disconnected, and there will
619     /// never be any more data received on it.
620     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
621     Disconnected,
622 }
623
624 /// This enumeration is the list of the possible error outcomes for the
625 /// [`try_send`] method.
626 ///
627 /// [`try_send`]: SyncSender::try_send
628 #[stable(feature = "rust1", since = "1.0.0")]
629 #[derive(PartialEq, Eq, Clone, Copy)]
630 pub enum TrySendError<T> {
631     /// The data could not be sent on the [`sync_channel`] because it would require that
632     /// the callee block to send the data.
633     ///
634     /// If this is a buffered channel, then the buffer is full at this time. If
635     /// this is not a buffered channel, then there is no [`Receiver`] available to
636     /// acquire the data.
637     #[stable(feature = "rust1", since = "1.0.0")]
638     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
639
640     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
641     /// sent. The data is returned back to the callee in this case.
642     #[stable(feature = "rust1", since = "1.0.0")]
643     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
644 }
645
646 enum Flavor<T> {
647     Oneshot(Arc<oneshot::Packet<T>>),
648     Stream(Arc<stream::Packet<T>>),
649     Shared(Arc<shared::Packet<T>>),
650     Sync(Arc<sync::Packet<T>>),
651 }
652
653 #[doc(hidden)]
654 trait UnsafeFlavor<T> {
655     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
656     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
657         &mut *self.inner_unsafe().get()
658     }
659     unsafe fn inner(&self) -> &Flavor<T> {
660         &*self.inner_unsafe().get()
661     }
662 }
663 impl<T> UnsafeFlavor<T> for Sender<T> {
664     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
665         &self.inner
666     }
667 }
668 impl<T> UnsafeFlavor<T> for Receiver<T> {
669     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
670         &self.inner
671     }
672 }
673
674 /// Creates a new asynchronous channel, returning the sender/receiver halves.
675 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
676 /// the same order as it was sent, and no [`send`] will block the calling thread
677 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
678 /// block after its buffer limit is reached). [`recv`] will block until a message
679 /// is available while there is at least one [`Sender`] alive (including clones).
680 ///
681 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
682 /// only one [`Receiver`] is supported.
683 ///
684 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
685 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
686 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
687 /// return a [`RecvError`].
688 ///
689 /// [`send`]: Sender::send
690 /// [`recv`]: Receiver::recv
691 ///
692 /// # Examples
693 ///
694 /// ```
695 /// use std::sync::mpsc::channel;
696 /// use std::thread;
697 ///
698 /// let (sender, receiver) = channel();
699 ///
700 /// // Spawn off an expensive computation
701 /// thread::spawn(move|| {
702 /// #   fn expensive_computation() {}
703 ///     sender.send(expensive_computation()).unwrap();
704 /// });
705 ///
706 /// // Do some useful work for awhile
707 ///
708 /// // Let's see what that answer was
709 /// println!("{:?}", receiver.recv().unwrap());
710 /// ```
711 #[must_use]
712 #[stable(feature = "rust1", since = "1.0.0")]
713 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
714     let a = Arc::new(oneshot::Packet::new());
715     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
716 }
717
718 /// Creates a new synchronous, bounded channel.
719 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
720 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
721 /// [`Receiver`] will block until a message becomes available. `sync_channel`
722 /// differs greatly in the semantics of the sender, however.
723 ///
724 /// This channel has an internal buffer on which messages will be queued.
725 /// `bound` specifies the buffer size. When the internal buffer becomes full,
726 /// future sends will *block* waiting for the buffer to open up. Note that a
727 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
728 /// where each [`send`] will not return until a [`recv`] is paired with it.
729 ///
730 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
731 /// times, but only one [`Receiver`] is supported.
732 ///
733 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
734 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
735 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
736 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
737 ///
738 /// [`send`]: SyncSender::send
739 /// [`recv`]: Receiver::recv
740 ///
741 /// # Examples
742 ///
743 /// ```
744 /// use std::sync::mpsc::sync_channel;
745 /// use std::thread;
746 ///
747 /// let (sender, receiver) = sync_channel(1);
748 ///
749 /// // this returns immediately
750 /// sender.send(1).unwrap();
751 ///
752 /// thread::spawn(move|| {
753 ///     // this will block until the previous message has been received
754 ///     sender.send(2).unwrap();
755 /// });
756 ///
757 /// assert_eq!(receiver.recv().unwrap(), 1);
758 /// assert_eq!(receiver.recv().unwrap(), 2);
759 /// ```
760 #[must_use]
761 #[stable(feature = "rust1", since = "1.0.0")]
762 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
763     let a = Arc::new(sync::Packet::new(bound));
764     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
765 }
766
767 ////////////////////////////////////////////////////////////////////////////////
768 // Sender
769 ////////////////////////////////////////////////////////////////////////////////
770
771 impl<T> Sender<T> {
772     fn new(inner: Flavor<T>) -> Sender<T> {
773         Sender { inner: UnsafeCell::new(inner) }
774     }
775
776     /// Attempts to send a value on this channel, returning it back if it could
777     /// not be sent.
778     ///
779     /// A successful send occurs when it is determined that the other end of
780     /// the channel has not hung up already. An unsuccessful send would be one
781     /// where the corresponding receiver has already been deallocated. Note
782     /// that a return value of [`Err`] means that the data will never be
783     /// received, but a return value of [`Ok`] does *not* mean that the data
784     /// will be received. It is possible for the corresponding receiver to
785     /// hang up immediately after this function returns [`Ok`].
786     ///
787     /// This method will never block the current thread.
788     ///
789     /// # Examples
790     ///
791     /// ```
792     /// use std::sync::mpsc::channel;
793     ///
794     /// let (tx, rx) = channel();
795     ///
796     /// // This send is always successful
797     /// tx.send(1).unwrap();
798     ///
799     /// // This send will fail because the receiver is gone
800     /// drop(rx);
801     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
802     /// ```
803     #[stable(feature = "rust1", since = "1.0.0")]
804     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
805         let (new_inner, ret) = match *unsafe { self.inner() } {
806             Flavor::Oneshot(ref p) => {
807                 if !p.sent() {
808                     return p.send(t).map_err(SendError);
809                 } else {
810                     let a = Arc::new(stream::Packet::new());
811                     let rx = Receiver::new(Flavor::Stream(a.clone()));
812                     match p.upgrade(rx) {
813                         oneshot::UpSuccess => {
814                             let ret = a.send(t);
815                             (a, ret)
816                         }
817                         oneshot::UpDisconnected => (a, Err(t)),
818                         oneshot::UpWoke(token) => {
819                             // This send cannot panic because the thread is
820                             // asleep (we're looking at it), so the receiver
821                             // can't go away.
822                             a.send(t).ok().unwrap();
823                             token.signal();
824                             (a, Ok(()))
825                         }
826                     }
827                 }
828             }
829             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
830             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
831             Flavor::Sync(..) => unreachable!(),
832         };
833
834         unsafe {
835             let tmp = Sender::new(Flavor::Stream(new_inner));
836             mem::swap(self.inner_mut(), tmp.inner_mut());
837         }
838         ret.map_err(SendError)
839     }
840 }
841
842 #[stable(feature = "rust1", since = "1.0.0")]
843 impl<T> Clone for Sender<T> {
844     /// Clone a sender to send to other threads.
845     ///
846     /// Note, be aware of the lifetime of the sender because all senders
847     /// (including the original) need to be dropped in order for
848     /// [`Receiver::recv`] to stop blocking.
849     fn clone(&self) -> Sender<T> {
850         let packet = match *unsafe { self.inner() } {
851             Flavor::Oneshot(ref p) => {
852                 let a = Arc::new(shared::Packet::new());
853                 {
854                     let guard = a.postinit_lock();
855                     let rx = Receiver::new(Flavor::Shared(a.clone()));
856                     let sleeper = match p.upgrade(rx) {
857                         oneshot::UpSuccess | oneshot::UpDisconnected => None,
858                         oneshot::UpWoke(task) => Some(task),
859                     };
860                     a.inherit_blocker(sleeper, guard);
861                 }
862                 a
863             }
864             Flavor::Stream(ref p) => {
865                 let a = Arc::new(shared::Packet::new());
866                 {
867                     let guard = a.postinit_lock();
868                     let rx = Receiver::new(Flavor::Shared(a.clone()));
869                     let sleeper = match p.upgrade(rx) {
870                         stream::UpSuccess | stream::UpDisconnected => None,
871                         stream::UpWoke(task) => Some(task),
872                     };
873                     a.inherit_blocker(sleeper, guard);
874                 }
875                 a
876             }
877             Flavor::Shared(ref p) => {
878                 p.clone_chan();
879                 return Sender::new(Flavor::Shared(p.clone()));
880             }
881             Flavor::Sync(..) => unreachable!(),
882         };
883
884         unsafe {
885             let tmp = Sender::new(Flavor::Shared(packet.clone()));
886             mem::swap(self.inner_mut(), tmp.inner_mut());
887         }
888         Sender::new(Flavor::Shared(packet))
889     }
890 }
891
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
894     fn drop(&mut self) {
895         match *unsafe { self.inner() } {
896             Flavor::Oneshot(ref p) => p.drop_chan(),
897             Flavor::Stream(ref p) => p.drop_chan(),
898             Flavor::Shared(ref p) => p.drop_chan(),
899             Flavor::Sync(..) => unreachable!(),
900         }
901     }
902 }
903
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907         f.debug_struct("Sender").finish_non_exhaustive()
908     }
909 }
910
911 ////////////////////////////////////////////////////////////////////////////////
912 // SyncSender
913 ////////////////////////////////////////////////////////////////////////////////
914
915 impl<T> SyncSender<T> {
916     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
917         SyncSender { inner }
918     }
919
920     /// Sends a value on this synchronous channel.
921     ///
922     /// This function will *block* until space in the internal buffer becomes
923     /// available or a receiver is available to hand off the message to.
924     ///
925     /// Note that a successful send does *not* guarantee that the receiver will
926     /// ever see the data if there is a buffer on this channel. Items may be
927     /// enqueued in the internal buffer for the receiver to receive at a later
928     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929     /// channel and it guarantees that the receiver has indeed received
930     /// the data if this function returns success.
931     ///
932     /// This function will never panic, but it may return [`Err`] if the
933     /// [`Receiver`] has disconnected and is no longer able to receive
934     /// information.
935     ///
936     /// # Examples
937     ///
938     /// ```rust
939     /// use std::sync::mpsc::sync_channel;
940     /// use std::thread;
941     ///
942     /// // Create a rendezvous sync_channel with buffer size 0
943     /// let (sync_sender, receiver) = sync_channel(0);
944     ///
945     /// thread::spawn(move || {
946     ///    println!("sending message...");
947     ///    sync_sender.send(1).unwrap();
948     ///    // Thread is now blocked until the message is received
949     ///
950     ///    println!("...message received!");
951     /// });
952     ///
953     /// let msg = receiver.recv().unwrap();
954     /// assert_eq!(1, msg);
955     /// ```
956     #[stable(feature = "rust1", since = "1.0.0")]
957     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
958         self.inner.send(t).map_err(SendError)
959     }
960
961     /// Attempts to send a value on this channel without blocking.
962     ///
963     /// This method differs from [`send`] by returning immediately if the
964     /// channel's buffer is full or no receiver is waiting to acquire some
965     /// data. Compared with [`send`], this function has two failure cases
966     /// instead of one (one for disconnection, one for a full buffer).
967     ///
968     /// See [`send`] for notes about guarantees of whether the
969     /// receiver has received the data or not if this function is successful.
970     ///
971     /// [`send`]: Self::send
972     ///
973     /// # Examples
974     ///
975     /// ```rust
976     /// use std::sync::mpsc::sync_channel;
977     /// use std::thread;
978     ///
979     /// // Create a sync_channel with buffer size 1
980     /// let (sync_sender, receiver) = sync_channel(1);
981     /// let sync_sender2 = sync_sender.clone();
982     ///
983     /// // First thread owns sync_sender
984     /// thread::spawn(move || {
985     ///     sync_sender.send(1).unwrap();
986     ///     sync_sender.send(2).unwrap();
987     ///     // Thread blocked
988     /// });
989     ///
990     /// // Second thread owns sync_sender2
991     /// thread::spawn(move || {
992     ///     // This will return an error and send
993     ///     // no message if the buffer is full
994     ///     let _ = sync_sender2.try_send(3);
995     /// });
996     ///
997     /// let mut msg;
998     /// msg = receiver.recv().unwrap();
999     /// println!("message {msg} received");
1000     ///
1001     /// msg = receiver.recv().unwrap();
1002     /// println!("message {msg} received");
1003     ///
1004     /// // Third message may have never been sent
1005     /// match receiver.try_recv() {
1006     ///     Ok(msg) => println!("message {msg} received"),
1007     ///     Err(_) => println!("the third message was never sent"),
1008     /// }
1009     /// ```
1010     #[stable(feature = "rust1", since = "1.0.0")]
1011     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1012         self.inner.try_send(t)
1013     }
1014 }
1015
1016 #[stable(feature = "rust1", since = "1.0.0")]
1017 impl<T> Clone for SyncSender<T> {
1018     fn clone(&self) -> SyncSender<T> {
1019         self.inner.clone_chan();
1020         SyncSender::new(self.inner.clone())
1021     }
1022 }
1023
1024 #[stable(feature = "rust1", since = "1.0.0")]
1025 impl<T> Drop for SyncSender<T> {
1026     fn drop(&mut self) {
1027         self.inner.drop_chan();
1028     }
1029 }
1030
1031 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1032 impl<T> fmt::Debug for SyncSender<T> {
1033     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1034         f.debug_struct("SyncSender").finish_non_exhaustive()
1035     }
1036 }
1037
1038 ////////////////////////////////////////////////////////////////////////////////
1039 // Receiver
1040 ////////////////////////////////////////////////////////////////////////////////
1041
1042 impl<T> Receiver<T> {
1043     fn new(inner: Flavor<T>) -> Receiver<T> {
1044         Receiver { inner: UnsafeCell::new(inner) }
1045     }
1046
1047     /// Attempts to return a pending value on this receiver without blocking.
1048     ///
1049     /// This method will never block the caller in order to wait for data to
1050     /// become available. Instead, this will always return immediately with a
1051     /// possible option of pending data on the channel.
1052     ///
1053     /// This is useful for a flavor of "optimistic check" before deciding to
1054     /// block on a receiver.
1055     ///
1056     /// Compared with [`recv`], this function has two failure cases instead of one
1057     /// (one for disconnection, one for an empty buffer).
1058     ///
1059     /// [`recv`]: Self::recv
1060     ///
1061     /// # Examples
1062     ///
1063     /// ```rust
1064     /// use std::sync::mpsc::{Receiver, channel};
1065     ///
1066     /// let (_, receiver): (_, Receiver<i32>) = channel();
1067     ///
1068     /// assert!(receiver.try_recv().is_err());
1069     /// ```
1070     #[stable(feature = "rust1", since = "1.0.0")]
1071     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1072         loop {
1073             let new_port = match *unsafe { self.inner() } {
1074                 Flavor::Oneshot(ref p) => match p.try_recv() {
1075                     Ok(t) => return Ok(t),
1076                     Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1077                     Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1078                     Err(oneshot::Upgraded(rx)) => rx,
1079                 },
1080                 Flavor::Stream(ref p) => match p.try_recv() {
1081                     Ok(t) => return Ok(t),
1082                     Err(stream::Empty) => return Err(TryRecvError::Empty),
1083                     Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1084                     Err(stream::Upgraded(rx)) => rx,
1085                 },
1086                 Flavor::Shared(ref p) => match p.try_recv() {
1087                     Ok(t) => return Ok(t),
1088                     Err(shared::Empty) => return Err(TryRecvError::Empty),
1089                     Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1090                 },
1091                 Flavor::Sync(ref p) => match p.try_recv() {
1092                     Ok(t) => return Ok(t),
1093                     Err(sync::Empty) => return Err(TryRecvError::Empty),
1094                     Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1095                 },
1096             };
1097             unsafe {
1098                 mem::swap(self.inner_mut(), new_port.inner_mut());
1099             }
1100         }
1101     }
1102
1103     /// Attempts to wait for a value on this receiver, returning an error if the
1104     /// corresponding channel has hung up.
1105     ///
1106     /// This function will always block the current thread if there is no data
1107     /// available and it's possible for more data to be sent (at least one sender
1108     /// still exists). Once a message is sent to the corresponding [`Sender`]
1109     /// (or [`SyncSender`]), this receiver will wake up and return that
1110     /// message.
1111     ///
1112     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1113     /// this call is blocking, this call will wake up and return [`Err`] to
1114     /// indicate that no more messages can ever be received on this channel.
1115     /// However, since channels are buffered, messages sent before the disconnect
1116     /// will still be properly received.
1117     ///
1118     /// # Examples
1119     ///
1120     /// ```
1121     /// use std::sync::mpsc;
1122     /// use std::thread;
1123     ///
1124     /// let (send, recv) = mpsc::channel();
1125     /// let handle = thread::spawn(move || {
1126     ///     send.send(1u8).unwrap();
1127     /// });
1128     ///
1129     /// handle.join().unwrap();
1130     ///
1131     /// assert_eq!(Ok(1), recv.recv());
1132     /// ```
1133     ///
1134     /// Buffering behavior:
1135     ///
1136     /// ```
1137     /// use std::sync::mpsc;
1138     /// use std::thread;
1139     /// use std::sync::mpsc::RecvError;
1140     ///
1141     /// let (send, recv) = mpsc::channel();
1142     /// let handle = thread::spawn(move || {
1143     ///     send.send(1u8).unwrap();
1144     ///     send.send(2).unwrap();
1145     ///     send.send(3).unwrap();
1146     ///     drop(send);
1147     /// });
1148     ///
1149     /// // wait for the thread to join so we ensure the sender is dropped
1150     /// handle.join().unwrap();
1151     ///
1152     /// assert_eq!(Ok(1), recv.recv());
1153     /// assert_eq!(Ok(2), recv.recv());
1154     /// assert_eq!(Ok(3), recv.recv());
1155     /// assert_eq!(Err(RecvError), recv.recv());
1156     /// ```
1157     #[stable(feature = "rust1", since = "1.0.0")]
1158     pub fn recv(&self) -> Result<T, RecvError> {
1159         loop {
1160             let new_port = match *unsafe { self.inner() } {
1161                 Flavor::Oneshot(ref p) => match p.recv(None) {
1162                     Ok(t) => return Ok(t),
1163                     Err(oneshot::Disconnected) => return Err(RecvError),
1164                     Err(oneshot::Upgraded(rx)) => rx,
1165                     Err(oneshot::Empty) => unreachable!(),
1166                 },
1167                 Flavor::Stream(ref p) => match p.recv(None) {
1168                     Ok(t) => return Ok(t),
1169                     Err(stream::Disconnected) => return Err(RecvError),
1170                     Err(stream::Upgraded(rx)) => rx,
1171                     Err(stream::Empty) => unreachable!(),
1172                 },
1173                 Flavor::Shared(ref p) => match p.recv(None) {
1174                     Ok(t) => return Ok(t),
1175                     Err(shared::Disconnected) => return Err(RecvError),
1176                     Err(shared::Empty) => unreachable!(),
1177                 },
1178                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1179             };
1180             unsafe {
1181                 mem::swap(self.inner_mut(), new_port.inner_mut());
1182             }
1183         }
1184     }
1185
1186     /// Attempts to wait for a value on this receiver, returning an error if the
1187     /// corresponding channel has hung up, or if it waits more than `timeout`.
1188     ///
1189     /// This function will always block the current thread if there is no data
1190     /// available and it's possible for more data to be sent (at least one sender
1191     /// still exists). Once a message is sent to the corresponding [`Sender`]
1192     /// (or [`SyncSender`]), this receiver will wake up and return that
1193     /// message.
1194     ///
1195     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1196     /// this call is blocking, this call will wake up and return [`Err`] to
1197     /// indicate that no more messages can ever be received on this channel.
1198     /// However, since channels are buffered, messages sent before the disconnect
1199     /// will still be properly received.
1200     ///
1201     /// # Known Issues
1202     ///
1203     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1204     /// to panic unexpectedly with the following example:
1205     ///
1206     /// ```no_run
1207     /// use std::sync::mpsc::channel;
1208     /// use std::thread;
1209     /// use std::time::Duration;
1210     ///
1211     /// let (tx, rx) = channel::<String>();
1212     ///
1213     /// thread::spawn(move || {
1214     ///     let d = Duration::from_millis(10);
1215     ///     loop {
1216     ///         println!("recv");
1217     ///         let _r = rx.recv_timeout(d);
1218     ///     }
1219     /// });
1220     ///
1221     /// thread::sleep(Duration::from_millis(100));
1222     /// let _c1 = tx.clone();
1223     ///
1224     /// thread::sleep(Duration::from_secs(1));
1225     /// ```
1226     ///
1227     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1228     ///
1229     /// # Examples
1230     ///
1231     /// Successfully receiving value before encountering timeout:
1232     ///
1233     /// ```no_run
1234     /// use std::thread;
1235     /// use std::time::Duration;
1236     /// use std::sync::mpsc;
1237     ///
1238     /// let (send, recv) = mpsc::channel();
1239     ///
1240     /// thread::spawn(move || {
1241     ///     send.send('a').unwrap();
1242     /// });
1243     ///
1244     /// assert_eq!(
1245     ///     recv.recv_timeout(Duration::from_millis(400)),
1246     ///     Ok('a')
1247     /// );
1248     /// ```
1249     ///
1250     /// Receiving an error upon reaching timeout:
1251     ///
1252     /// ```no_run
1253     /// use std::thread;
1254     /// use std::time::Duration;
1255     /// use std::sync::mpsc;
1256     ///
1257     /// let (send, recv) = mpsc::channel();
1258     ///
1259     /// thread::spawn(move || {
1260     ///     thread::sleep(Duration::from_millis(800));
1261     ///     send.send('a').unwrap();
1262     /// });
1263     ///
1264     /// assert_eq!(
1265     ///     recv.recv_timeout(Duration::from_millis(400)),
1266     ///     Err(mpsc::RecvTimeoutError::Timeout)
1267     /// );
1268     /// ```
1269     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1270     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1271         // Do an optimistic try_recv to avoid the performance impact of
1272         // Instant::now() in the full-channel case.
1273         match self.try_recv() {
1274             Ok(result) => Ok(result),
1275             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1276             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1277                 Some(deadline) => self.recv_deadline(deadline),
1278                 // So far in the future that it's practically the same as waiting indefinitely.
1279                 None => self.recv().map_err(RecvTimeoutError::from),
1280             },
1281         }
1282     }
1283
1284     /// Attempts to wait for a value on this receiver, returning an error if the
1285     /// corresponding channel has hung up, or if `deadline` is reached.
1286     ///
1287     /// This function will always block the current thread if there is no data
1288     /// available and it's possible for more data to be sent. Once a message is
1289     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1290     /// receiver will wake up and return that message.
1291     ///
1292     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1293     /// this call is blocking, this call will wake up and return [`Err`] to
1294     /// indicate that no more messages can ever be received on this channel.
1295     /// However, since channels are buffered, messages sent before the disconnect
1296     /// will still be properly received.
1297     ///
1298     /// # Examples
1299     ///
1300     /// Successfully receiving value before reaching deadline:
1301     ///
1302     /// ```no_run
1303     /// #![feature(deadline_api)]
1304     /// use std::thread;
1305     /// use std::time::{Duration, Instant};
1306     /// use std::sync::mpsc;
1307     ///
1308     /// let (send, recv) = mpsc::channel();
1309     ///
1310     /// thread::spawn(move || {
1311     ///     send.send('a').unwrap();
1312     /// });
1313     ///
1314     /// assert_eq!(
1315     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1316     ///     Ok('a')
1317     /// );
1318     /// ```
1319     ///
1320     /// Receiving an error upon reaching deadline:
1321     ///
1322     /// ```no_run
1323     /// #![feature(deadline_api)]
1324     /// use std::thread;
1325     /// use std::time::{Duration, Instant};
1326     /// use std::sync::mpsc;
1327     ///
1328     /// let (send, recv) = mpsc::channel();
1329     ///
1330     /// thread::spawn(move || {
1331     ///     thread::sleep(Duration::from_millis(800));
1332     ///     send.send('a').unwrap();
1333     /// });
1334     ///
1335     /// assert_eq!(
1336     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1337     ///     Err(mpsc::RecvTimeoutError::Timeout)
1338     /// );
1339     /// ```
1340     #[unstable(feature = "deadline_api", issue = "46316")]
1341     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1342         use self::RecvTimeoutError::*;
1343
1344         loop {
1345             let port_or_empty = match *unsafe { self.inner() } {
1346                 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1347                     Ok(t) => return Ok(t),
1348                     Err(oneshot::Disconnected) => return Err(Disconnected),
1349                     Err(oneshot::Upgraded(rx)) => Some(rx),
1350                     Err(oneshot::Empty) => None,
1351                 },
1352                 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1353                     Ok(t) => return Ok(t),
1354                     Err(stream::Disconnected) => return Err(Disconnected),
1355                     Err(stream::Upgraded(rx)) => Some(rx),
1356                     Err(stream::Empty) => None,
1357                 },
1358                 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1359                     Ok(t) => return Ok(t),
1360                     Err(shared::Disconnected) => return Err(Disconnected),
1361                     Err(shared::Empty) => None,
1362                 },
1363                 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1364                     Ok(t) => return Ok(t),
1365                     Err(sync::Disconnected) => return Err(Disconnected),
1366                     Err(sync::Empty) => None,
1367                 },
1368             };
1369
1370             if let Some(new_port) = port_or_empty {
1371                 unsafe {
1372                     mem::swap(self.inner_mut(), new_port.inner_mut());
1373                 }
1374             }
1375
1376             // If we're already passed the deadline, and we're here without
1377             // data, return a timeout, else try again.
1378             if Instant::now() >= deadline {
1379                 return Err(Timeout);
1380             }
1381         }
1382     }
1383
1384     /// Returns an iterator that will block waiting for messages, but never
1385     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1386     ///
1387     /// # Examples
1388     ///
1389     /// ```rust
1390     /// use std::sync::mpsc::channel;
1391     /// use std::thread;
1392     ///
1393     /// let (send, recv) = channel();
1394     ///
1395     /// thread::spawn(move || {
1396     ///     send.send(1).unwrap();
1397     ///     send.send(2).unwrap();
1398     ///     send.send(3).unwrap();
1399     /// });
1400     ///
1401     /// let mut iter = recv.iter();
1402     /// assert_eq!(iter.next(), Some(1));
1403     /// assert_eq!(iter.next(), Some(2));
1404     /// assert_eq!(iter.next(), Some(3));
1405     /// assert_eq!(iter.next(), None);
1406     /// ```
1407     #[stable(feature = "rust1", since = "1.0.0")]
1408     pub fn iter(&self) -> Iter<'_, T> {
1409         Iter { rx: self }
1410     }
1411
1412     /// Returns an iterator that will attempt to yield all pending values.
1413     /// It will return `None` if there are no more pending values or if the
1414     /// channel has hung up. The iterator will never [`panic!`] or block the
1415     /// user by waiting for values.
1416     ///
1417     /// # Examples
1418     ///
1419     /// ```no_run
1420     /// use std::sync::mpsc::channel;
1421     /// use std::thread;
1422     /// use std::time::Duration;
1423     ///
1424     /// let (sender, receiver) = channel();
1425     ///
1426     /// // nothing is in the buffer yet
1427     /// assert!(receiver.try_iter().next().is_none());
1428     ///
1429     /// thread::spawn(move || {
1430     ///     thread::sleep(Duration::from_secs(1));
1431     ///     sender.send(1).unwrap();
1432     ///     sender.send(2).unwrap();
1433     ///     sender.send(3).unwrap();
1434     /// });
1435     ///
1436     /// // nothing is in the buffer yet
1437     /// assert!(receiver.try_iter().next().is_none());
1438     ///
1439     /// // block for two seconds
1440     /// thread::sleep(Duration::from_secs(2));
1441     ///
1442     /// let mut iter = receiver.try_iter();
1443     /// assert_eq!(iter.next(), Some(1));
1444     /// assert_eq!(iter.next(), Some(2));
1445     /// assert_eq!(iter.next(), Some(3));
1446     /// assert_eq!(iter.next(), None);
1447     /// ```
1448     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1449     pub fn try_iter(&self) -> TryIter<'_, T> {
1450         TryIter { rx: self }
1451     }
1452 }
1453
1454 #[stable(feature = "rust1", since = "1.0.0")]
1455 impl<'a, T> Iterator for Iter<'a, T> {
1456     type Item = T;
1457
1458     fn next(&mut self) -> Option<T> {
1459         self.rx.recv().ok()
1460     }
1461 }
1462
1463 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1464 impl<'a, T> Iterator for TryIter<'a, T> {
1465     type Item = T;
1466
1467     fn next(&mut self) -> Option<T> {
1468         self.rx.try_recv().ok()
1469     }
1470 }
1471
1472 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1473 impl<'a, T> IntoIterator for &'a Receiver<T> {
1474     type Item = T;
1475     type IntoIter = Iter<'a, T>;
1476
1477     fn into_iter(self) -> Iter<'a, T> {
1478         self.iter()
1479     }
1480 }
1481
1482 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1483 impl<T> Iterator for IntoIter<T> {
1484     type Item = T;
1485     fn next(&mut self) -> Option<T> {
1486         self.rx.recv().ok()
1487     }
1488 }
1489
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<T> IntoIterator for Receiver<T> {
1492     type Item = T;
1493     type IntoIter = IntoIter<T>;
1494
1495     fn into_iter(self) -> IntoIter<T> {
1496         IntoIter { rx: self }
1497     }
1498 }
1499
1500 #[stable(feature = "rust1", since = "1.0.0")]
1501 impl<T> Drop for Receiver<T> {
1502     fn drop(&mut self) {
1503         match *unsafe { self.inner() } {
1504             Flavor::Oneshot(ref p) => p.drop_port(),
1505             Flavor::Stream(ref p) => p.drop_port(),
1506             Flavor::Shared(ref p) => p.drop_port(),
1507             Flavor::Sync(ref p) => p.drop_port(),
1508         }
1509     }
1510 }
1511
1512 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1513 impl<T> fmt::Debug for Receiver<T> {
1514     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1515         f.debug_struct("Receiver").finish_non_exhaustive()
1516     }
1517 }
1518
1519 #[stable(feature = "rust1", since = "1.0.0")]
1520 impl<T> fmt::Debug for SendError<T> {
1521     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1522         f.debug_struct("SendError").finish_non_exhaustive()
1523     }
1524 }
1525
1526 #[stable(feature = "rust1", since = "1.0.0")]
1527 impl<T> fmt::Display for SendError<T> {
1528     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1529         "sending on a closed channel".fmt(f)
1530     }
1531 }
1532
1533 #[stable(feature = "rust1", since = "1.0.0")]
1534 impl<T: Send> error::Error for SendError<T> {
1535     #[allow(deprecated)]
1536     fn description(&self) -> &str {
1537         "sending on a closed channel"
1538     }
1539 }
1540
1541 #[stable(feature = "rust1", since = "1.0.0")]
1542 impl<T> fmt::Debug for TrySendError<T> {
1543     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1544         match *self {
1545             TrySendError::Full(..) => "Full(..)".fmt(f),
1546             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1547         }
1548     }
1549 }
1550
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T> fmt::Display for TrySendError<T> {
1553     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1554         match *self {
1555             TrySendError::Full(..) => "sending on a full channel".fmt(f),
1556             TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1557         }
1558     }
1559 }
1560
1561 #[stable(feature = "rust1", since = "1.0.0")]
1562 impl<T: Send> error::Error for TrySendError<T> {
1563     #[allow(deprecated)]
1564     fn description(&self) -> &str {
1565         match *self {
1566             TrySendError::Full(..) => "sending on a full channel",
1567             TrySendError::Disconnected(..) => "sending on a closed channel",
1568         }
1569     }
1570 }
1571
1572 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1573 impl<T> From<SendError<T>> for TrySendError<T> {
1574     /// Converts a `SendError<T>` into a `TrySendError<T>`.
1575     ///
1576     /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1577     ///
1578     /// No data is allocated on the heap.
1579     fn from(err: SendError<T>) -> TrySendError<T> {
1580         match err {
1581             SendError(t) => TrySendError::Disconnected(t),
1582         }
1583     }
1584 }
1585
1586 #[stable(feature = "rust1", since = "1.0.0")]
1587 impl fmt::Display for RecvError {
1588     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1589         "receiving on a closed channel".fmt(f)
1590     }
1591 }
1592
1593 #[stable(feature = "rust1", since = "1.0.0")]
1594 impl error::Error for RecvError {
1595     #[allow(deprecated)]
1596     fn description(&self) -> &str {
1597         "receiving on a closed channel"
1598     }
1599 }
1600
1601 #[stable(feature = "rust1", since = "1.0.0")]
1602 impl fmt::Display for TryRecvError {
1603     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1604         match *self {
1605             TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1606             TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1607         }
1608     }
1609 }
1610
1611 #[stable(feature = "rust1", since = "1.0.0")]
1612 impl error::Error for TryRecvError {
1613     #[allow(deprecated)]
1614     fn description(&self) -> &str {
1615         match *self {
1616             TryRecvError::Empty => "receiving on an empty channel",
1617             TryRecvError::Disconnected => "receiving on a closed channel",
1618         }
1619     }
1620 }
1621
1622 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1623 impl From<RecvError> for TryRecvError {
1624     /// Converts a `RecvError` into a `TryRecvError`.
1625     ///
1626     /// This conversion always returns `TryRecvError::Disconnected`.
1627     ///
1628     /// No data is allocated on the heap.
1629     fn from(err: RecvError) -> TryRecvError {
1630         match err {
1631             RecvError => TryRecvError::Disconnected,
1632         }
1633     }
1634 }
1635
1636 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1637 impl fmt::Display for RecvTimeoutError {
1638     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1639         match *self {
1640             RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1641             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1642         }
1643     }
1644 }
1645
1646 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1647 impl error::Error for RecvTimeoutError {
1648     #[allow(deprecated)]
1649     fn description(&self) -> &str {
1650         match *self {
1651             RecvTimeoutError::Timeout => "timed out waiting on channel",
1652             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1653         }
1654     }
1655 }
1656
1657 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1658 impl From<RecvError> for RecvTimeoutError {
1659     /// Converts a `RecvError` into a `RecvTimeoutError`.
1660     ///
1661     /// This conversion always returns `RecvTimeoutError::Disconnected`.
1662     ///
1663     /// No data is allocated on the heap.
1664     fn from(err: RecvError) -> RecvTimeoutError {
1665         match err {
1666             RecvError => RecvTimeoutError::Disconnected,
1667         }
1668     }
1669 }