1 //! Multi-producer, single-consumer FIFO queue communication primitives.
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
14 //! These channels come in two flavors:
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //! will return a `(Sender, Receiver)` tuple where all sends will be
18 //! **asynchronous** (they never block). The channel conceptually has an
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //! messages is a pre-allocated buffer of a fixed size. All sends will be
24 //! **synchronous** by blocking until there is buffer space available. Note
25 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //! channel where each sender atomically hands off a message to a receiver.
28 //! [`send`]: Sender::send
32 //! The send and receive operations on channels will all return a [`Result`]
33 //! indicating whether the operation succeeded or not. An unsuccessful operation
34 //! is normally indicative of the other half of a channel having "hung up" by
35 //! being dropped in its corresponding thread.
37 //! Once half of a channel has been deallocated, most operations can no longer
38 //! continue to make progress, so [`Err`] will be returned. Many applications
39 //! will continue to [`unwrap`] the results returned from this module,
40 //! instigating a propagation of failure among threads if one unexpectedly dies.
42 //! [`unwrap`]: Result::unwrap
50 //! use std::sync::mpsc::channel;
52 //! // Create a simple streaming channel
53 //! let (tx, rx) = channel();
54 //! thread::spawn(move|| {
55 //! tx.send(10).unwrap();
57 //! assert_eq!(rx.recv().unwrap(), 10);
64 //! use std::sync::mpsc::channel;
66 //! // Create a shared channel that can be sent along from many threads
67 //! // where tx is the sending half (tx for transmission), and rx is the receiving
68 //! // half (rx for receiving).
69 //! let (tx, rx) = channel();
71 //! let tx = tx.clone();
72 //! thread::spawn(move|| {
73 //! tx.send(i).unwrap();
78 //! let j = rx.recv().unwrap();
79 //! assert!(0 <= j && j < 10);
83 //! Propagating panics:
86 //! use std::sync::mpsc::channel;
88 //! // The call to recv() will return an error because the channel has already
89 //! // hung up (or been deallocated)
90 //! let (tx, rx) = channel::<i32>();
92 //! assert!(rx.recv().is_err());
95 //! Synchronous channels:
99 //! use std::sync::mpsc::sync_channel;
101 //! let (tx, rx) = sync_channel::<i32>(0);
102 //! thread::spawn(move|| {
103 //! // This will wait for the parent thread to start receiving
104 //! tx.send(53).unwrap();
106 //! rx.recv().unwrap();
109 #![stable(feature = "rust1", since = "1.0.0")]
111 #[cfg(all(test, not(target_os = "emscripten")))]
114 #[cfg(all(test, not(target_os = "emscripten")))]
117 // A description of how Rust's channel implementation works
119 // Channels are supposed to be the basic building block for all other
120 // concurrent primitives that are used in Rust. As a result, the channel type
121 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 // The choice of implementation of all channels is to be built on lock-free data
124 // structures. The channels themselves are then consequently also lock-free data
125 // structures. As always with lock-free code, this is a very "here be dragons"
126 // territory, especially because I'm unaware of any academic papers that have
127 // gone into great length about channels of these flavors.
129 // ## Flavors of channels
131 // From the perspective of a consumer of this library, there is only one flavor
132 // of channel. This channel can be used as a stream and cloned to allow multiple
133 // senders. Under the hood, however, there are actually three flavors of
136 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
137 // case. They contain as few atomics as possible and
138 // involve one and exactly one allocation.
139 // * Streams - these channels are optimized for the non-shared use case. They
140 // use a different concurrent queue that is more tailored for this
141 // use case. The initial allocation of this flavor of channel is not
143 // * Shared - this is the most general form of channel that this module offers,
144 // a channel with multiple senders. This type is as optimized as it
145 // can be, but the previous two types mentioned are much faster for
148 // ## Concurrent queues
150 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
151 // but recv() obviously blocks. This means that under the hood there must be
152 // some shared and concurrent queue holding all of the actual data.
154 // With two flavors of channels, two flavors of queues are also used. We have
155 // chosen to use queues from a well-known author that are abbreviated as SPSC
156 // and MPSC (single producer, single consumer and multiple producer, single
157 // consumer). SPSC queues are used for streams while MPSC queues are used for
160 // ### SPSC optimizations
162 // The SPSC queue found online is essentially a linked list of nodes where one
163 // half of the nodes are the "queue of data" and the other half of nodes are a
164 // cache of unused nodes. The unused nodes are used such that an allocation is
165 // not required on every push() and a free doesn't need to happen on every
168 // As found online, however, the cache of nodes is of an infinite size. This
169 // means that if a channel at one point in its life had 50k items in the queue,
170 // then the queue will always have the capacity for 50k items. I believed that
171 // this was an unnecessary limitation of the implementation, so I have altered
172 // the queue to optionally have a bound on the cache size.
174 // By default, streams will have an unbounded SPSC queue with a small-ish cache
175 // size. The hope is that the cache is still large enough to have very fast
176 // send() operations while not too large such that millions of channels can
179 // ### MPSC optimizations
181 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
182 // a linked list under the hood to earn its unboundedness, but I have not put
183 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 // For now, I believe that this is "ok" because shared channels are not the most
186 // common type, but soon we may wish to revisit this queue choice and determine
187 // another candidate for backend storage of shared channels.
189 // ## Overview of the Implementation
191 // Now that there's a little background on the concurrent queues used, it's
192 // worth going into much more detail about the channels themselves. The basic
193 // pseudocode for a send/recv are:
197 // queue.push(t) return if queue.pop()
198 // if increment() == -1 deschedule {
199 // wakeup() if decrement() > 0
200 // cancel_deschedule()
204 // As mentioned before, there are no locks in this implementation, only atomic
205 // instructions are used.
207 // ### The internal atomic counter
209 // Every channel has a shared counter with each half to keep track of the size
210 // of the queue. This counter is used to abort descheduling by the receiver and
211 // to know when to wake up on the sending side.
213 // As seen in the pseudocode, senders will increment this count and receivers
214 // will decrement the count. The theory behind this is that if a sender sees a
215 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
216 // then it doesn't need to block.
218 // The recv() method has a beginning call to pop(), and if successful, it needs
219 // to decrement the count. It is a crucial implementation detail that this
220 // decrement does *not* happen to the shared counter. If this were the case,
221 // then it would be possible for the counter to be very negative when there were
222 // no receivers waiting, in which case the senders would have to determine when
223 // it was actually appropriate to wake up a receiver.
225 // Instead, the "steal count" is kept track of separately (not atomically
226 // because it's only used by receivers), and then the decrement() call when
227 // descheduling will lump in all of the recent steals into one large decrement.
229 // The implication of this is that if a sender sees a -1 count, then there's
230 // guaranteed to be a waiter waiting!
232 // ## Native Implementation
234 // A major goal of these channels is to work seamlessly on and off the runtime.
235 // All of the previous race conditions have been worded in terms of
236 // scheduler-isms (which is obviously not available without the runtime).
238 // For now, native usage of channels (off the runtime) will fall back onto
239 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
240 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
241 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
242 // condition variable.
246 // Being able to support selection over channels has greatly influenced this
247 // design, and not only does selection need to work inside the runtime, but also
248 // outside the runtime.
250 // The implementation is fairly straightforward. The goal of select() is not to
251 // return some data, but only to return which channel can receive data without
252 // blocking. The implementation is essentially the entire blocking procedure
253 // followed by an increment as soon as its woken up. The cancellation procedure
254 // involves an increment and swapping out of to_wake to acquire ownership of the
255 // thread to unblock.
257 // Sadly this current implementation requires multiple allocations, so I have
258 // seen the throughput of select() be much worse than it should be. I do not
259 // believe that there is anything fundamental that needs to change about these
260 // channels, however, in order to support a more efficient select().
262 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
266 // And now that you've seen all the races that I found and attempted to fix,
267 // here's the code for you to find some more!
269 use crate::cell::UnsafeCell;
273 use crate::sync::Arc;
274 use crate::time::{Duration, Instant};
286 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
287 /// This half can only be owned by one thread.
289 /// Messages sent to the channel can be retrieved using [`recv`].
291 /// [`recv`]: Receiver::recv
296 /// use std::sync::mpsc::channel;
298 /// use std::time::Duration;
300 /// let (send, recv) = channel();
302 /// thread::spawn(move || {
303 /// send.send("Hello world!").unwrap();
304 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
305 /// send.send("Delayed for 2 seconds").unwrap();
308 /// println!("{}", recv.recv().unwrap()); // Received immediately
309 /// println!("Waiting...");
310 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
312 #[stable(feature = "rust1", since = "1.0.0")]
313 #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
314 pub struct Receiver<T> {
315 inner: UnsafeCell<Flavor<T>>,
318 // The receiver port can be sent from place to place, so long as it
319 // is not used to receive non-sendable things.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 unsafe impl<T: Send> Send for Receiver<T> {}
323 #[stable(feature = "rust1", since = "1.0.0")]
324 impl<T> !Sync for Receiver<T> {}
326 /// An iterator over messages on a [`Receiver`], created by [`iter`].
328 /// This iterator will block whenever [`next`] is called,
329 /// waiting for a new message, and [`None`] will be returned
330 /// when the corresponding channel has hung up.
332 /// [`iter`]: Receiver::iter
333 /// [`next`]: Iterator::next
338 /// use std::sync::mpsc::channel;
341 /// let (send, recv) = channel();
343 /// thread::spawn(move || {
344 /// send.send(1u8).unwrap();
345 /// send.send(2u8).unwrap();
346 /// send.send(3u8).unwrap();
349 /// for x in recv.iter() {
350 /// println!("Got: {}", x);
353 #[stable(feature = "rust1", since = "1.0.0")]
355 pub struct Iter<'a, T: 'a> {
359 /// An iterator that attempts to yield all pending values for a [`Receiver`],
360 /// created by [`try_iter`].
362 /// [`None`] will be returned when there are no pending values remaining or
363 /// if the corresponding channel has hung up.
365 /// This iterator will never block the caller in order to wait for data to
366 /// become available. Instead, it will return [`None`].
368 /// [`try_iter`]: Receiver::try_iter
373 /// use std::sync::mpsc::channel;
375 /// use std::time::Duration;
377 /// let (sender, receiver) = channel();
379 /// // Nothing is in the buffer yet
380 /// assert!(receiver.try_iter().next().is_none());
381 /// println!("Nothing in the buffer...");
383 /// thread::spawn(move || {
384 /// sender.send(1).unwrap();
385 /// sender.send(2).unwrap();
386 /// sender.send(3).unwrap();
389 /// println!("Going to sleep...");
390 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
392 /// for x in receiver.try_iter() {
393 /// println!("Got: {}", x);
396 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
398 pub struct TryIter<'a, T: 'a> {
402 /// An owning iterator over messages on a [`Receiver`],
403 /// created by **Receiver::into_iter**.
405 /// This iterator will block whenever [`next`]
406 /// is called, waiting for a new message, and [`None`] will be
407 /// returned if the corresponding channel has hung up.
409 /// [`next`]: Iterator::next
414 /// use std::sync::mpsc::channel;
417 /// let (send, recv) = channel();
419 /// thread::spawn(move || {
420 /// send.send(1u8).unwrap();
421 /// send.send(2u8).unwrap();
422 /// send.send(3u8).unwrap();
425 /// for x in recv.into_iter() {
426 /// println!("Got: {}", x);
429 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
431 pub struct IntoIter<T> {
435 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
436 /// owned by one thread, but it can be cloned to send to other threads.
438 /// Messages can be sent through this channel with [`send`].
440 /// [`send`]: Sender::send
445 /// use std::sync::mpsc::channel;
448 /// let (sender, receiver) = channel();
449 /// let sender2 = sender.clone();
451 /// // First thread owns sender
452 /// thread::spawn(move || {
453 /// sender.send(1).unwrap();
456 /// // Second thread owns sender2
457 /// thread::spawn(move || {
458 /// sender2.send(2).unwrap();
461 /// let msg = receiver.recv().unwrap();
462 /// let msg2 = receiver.recv().unwrap();
464 /// assert_eq!(3, msg + msg2);
466 #[stable(feature = "rust1", since = "1.0.0")]
467 pub struct Sender<T> {
468 inner: UnsafeCell<Flavor<T>>,
471 // The send port can be sent from place to place, so long as it
472 // is not used to send non-sendable things.
473 #[stable(feature = "rust1", since = "1.0.0")]
474 unsafe impl<T: Send> Send for Sender<T> {}
476 #[stable(feature = "rust1", since = "1.0.0")]
477 impl<T> !Sync for Sender<T> {}
479 /// The sending-half of Rust's synchronous [`sync_channel`] type.
481 /// Messages can be sent through this channel with [`send`] or [`try_send`].
483 /// [`send`] will block if there is no space in the internal buffer.
485 /// [`send`]: SyncSender::send
486 /// [`try_send`]: SyncSender::try_send
491 /// use std::sync::mpsc::sync_channel;
494 /// // Create a sync_channel with buffer size 2
495 /// let (sync_sender, receiver) = sync_channel(2);
496 /// let sync_sender2 = sync_sender.clone();
498 /// // First thread owns sync_sender
499 /// thread::spawn(move || {
500 /// sync_sender.send(1).unwrap();
501 /// sync_sender.send(2).unwrap();
504 /// // Second thread owns sync_sender2
505 /// thread::spawn(move || {
506 /// sync_sender2.send(3).unwrap();
507 /// // thread will now block since the buffer is full
508 /// println!("Thread unblocked!");
513 /// msg = receiver.recv().unwrap();
514 /// println!("message {} received", msg);
516 /// // "Thread unblocked!" will be printed now
518 /// msg = receiver.recv().unwrap();
519 /// println!("message {} received", msg);
521 /// msg = receiver.recv().unwrap();
523 /// println!("message {} received", msg);
525 #[stable(feature = "rust1", since = "1.0.0")]
526 pub struct SyncSender<T> {
527 inner: Arc<sync::Packet<T>>,
530 #[stable(feature = "rust1", since = "1.0.0")]
531 unsafe impl<T: Send> Send for SyncSender<T> {}
533 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
534 /// function on **channel**s.
536 /// A **send** operation can only fail if the receiving end of a channel is
537 /// disconnected, implying that the data could never be received. The error
538 /// contains the data being sent as a payload so it can be recovered.
539 #[stable(feature = "rust1", since = "1.0.0")]
540 #[derive(PartialEq, Eq, Clone, Copy)]
541 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
543 /// An error returned from the [`recv`] function on a [`Receiver`].
545 /// The [`recv`] operation can only fail if the sending half of a
546 /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
547 /// messages will ever be received.
549 /// [`recv`]: Receiver::recv
550 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
551 #[stable(feature = "rust1", since = "1.0.0")]
552 pub struct RecvError;
554 /// This enumeration is the list of the possible reasons that [`try_recv`] could
555 /// not return data when called. This can occur with both a [`channel`] and
556 /// a [`sync_channel`].
558 /// [`try_recv`]: Receiver::try_recv
559 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
560 #[stable(feature = "rust1", since = "1.0.0")]
561 pub enum TryRecvError {
562 /// This **channel** is currently empty, but the **Sender**(s) have not yet
563 /// disconnected, so data may yet become available.
564 #[stable(feature = "rust1", since = "1.0.0")]
567 /// The **channel**'s sending half has become disconnected, and there will
568 /// never be any more data received on it.
569 #[stable(feature = "rust1", since = "1.0.0")]
573 /// This enumeration is the list of possible errors that made [`recv_timeout`]
574 /// unable to return data when called. This can occur with both a [`channel`] and
575 /// a [`sync_channel`].
577 /// [`recv_timeout`]: Receiver::recv_timeout
578 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
579 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
580 pub enum RecvTimeoutError {
581 /// This **channel** is currently empty, but the **Sender**(s) have not yet
582 /// disconnected, so data may yet become available.
583 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
585 /// The **channel**'s sending half has become disconnected, and there will
586 /// never be any more data received on it.
587 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
591 /// This enumeration is the list of the possible error outcomes for the
592 /// [`try_send`] method.
594 /// [`try_send`]: SyncSender::try_send
595 #[stable(feature = "rust1", since = "1.0.0")]
596 #[derive(PartialEq, Eq, Clone, Copy)]
597 pub enum TrySendError<T> {
598 /// The data could not be sent on the [`sync_channel`] because it would require that
599 /// the callee block to send the data.
601 /// If this is a buffered channel, then the buffer is full at this time. If
602 /// this is not a buffered channel, then there is no [`Receiver`] available to
603 /// acquire the data.
604 #[stable(feature = "rust1", since = "1.0.0")]
605 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
607 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
608 /// sent. The data is returned back to the callee in this case.
609 #[stable(feature = "rust1", since = "1.0.0")]
610 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
614 Oneshot(Arc<oneshot::Packet<T>>),
615 Stream(Arc<stream::Packet<T>>),
616 Shared(Arc<shared::Packet<T>>),
617 Sync(Arc<sync::Packet<T>>),
621 trait UnsafeFlavor<T> {
622 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
623 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
624 &mut *self.inner_unsafe().get()
626 unsafe fn inner(&self) -> &Flavor<T> {
627 &*self.inner_unsafe().get()
630 impl<T> UnsafeFlavor<T> for Sender<T> {
631 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
635 impl<T> UnsafeFlavor<T> for Receiver<T> {
636 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
641 /// Creates a new asynchronous channel, returning the sender/receiver halves.
642 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
643 /// the same order as it was sent, and no [`send`] will block the calling thread
644 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
645 /// block after its buffer limit is reached). [`recv`] will block until a message
648 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
649 /// only one [`Receiver`] is supported.
651 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
652 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
653 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
654 /// return a [`RecvError`].
656 /// [`send`]: Sender::send
657 /// [`recv`]: Receiver::recv
662 /// use std::sync::mpsc::channel;
665 /// let (sender, receiver) = channel();
667 /// // Spawn off an expensive computation
668 /// thread::spawn(move|| {
669 /// # fn expensive_computation() {}
670 /// sender.send(expensive_computation()).unwrap();
673 /// // Do some useful work for awhile
675 /// // Let's see what that answer was
676 /// println!("{:?}", receiver.recv().unwrap());
678 #[stable(feature = "rust1", since = "1.0.0")]
679 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
680 let a = Arc::new(oneshot::Packet::new());
681 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
684 /// Creates a new synchronous, bounded channel.
685 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
686 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
687 /// [`Receiver`] will block until a message becomes available. `sync_channel`
688 /// differs greatly in the semantics of the sender, however.
690 /// This channel has an internal buffer on which messages will be queued.
691 /// `bound` specifies the buffer size. When the internal buffer becomes full,
692 /// future sends will *block* waiting for the buffer to open up. Note that a
693 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
694 /// where each [`send`] will not return until a [`recv`] is paired with it.
696 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
697 /// times, but only one [`Receiver`] is supported.
699 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
700 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
701 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
702 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
704 /// [`send`]: SyncSender::send
705 /// [`recv`]: Receiver::recv
710 /// use std::sync::mpsc::sync_channel;
713 /// let (sender, receiver) = sync_channel(1);
715 /// // this returns immediately
716 /// sender.send(1).unwrap();
718 /// thread::spawn(move|| {
719 /// // this will block until the previous message has been received
720 /// sender.send(2).unwrap();
723 /// assert_eq!(receiver.recv().unwrap(), 1);
724 /// assert_eq!(receiver.recv().unwrap(), 2);
726 #[stable(feature = "rust1", since = "1.0.0")]
727 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
728 let a = Arc::new(sync::Packet::new(bound));
729 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
732 ////////////////////////////////////////////////////////////////////////////////
734 ////////////////////////////////////////////////////////////////////////////////
737 fn new(inner: Flavor<T>) -> Sender<T> {
738 Sender { inner: UnsafeCell::new(inner) }
741 /// Attempts to send a value on this channel, returning it back if it could
744 /// A successful send occurs when it is determined that the other end of
745 /// the channel has not hung up already. An unsuccessful send would be one
746 /// where the corresponding receiver has already been deallocated. Note
747 /// that a return value of [`Err`] means that the data will never be
748 /// received, but a return value of [`Ok`] does *not* mean that the data
749 /// will be received. It is possible for the corresponding receiver to
750 /// hang up immediately after this function returns [`Ok`].
752 /// This method will never block the current thread.
757 /// use std::sync::mpsc::channel;
759 /// let (tx, rx) = channel();
761 /// // This send is always successful
762 /// tx.send(1).unwrap();
764 /// // This send will fail because the receiver is gone
766 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
768 #[stable(feature = "rust1", since = "1.0.0")]
769 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
770 let (new_inner, ret) = match *unsafe { self.inner() } {
771 Flavor::Oneshot(ref p) => {
773 return p.send(t).map_err(SendError);
775 let a = Arc::new(stream::Packet::new());
776 let rx = Receiver::new(Flavor::Stream(a.clone()));
777 match p.upgrade(rx) {
778 oneshot::UpSuccess => {
782 oneshot::UpDisconnected => (a, Err(t)),
783 oneshot::UpWoke(token) => {
784 // This send cannot panic because the thread is
785 // asleep (we're looking at it), so the receiver
787 a.send(t).ok().unwrap();
794 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
795 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
796 Flavor::Sync(..) => unreachable!(),
800 let tmp = Sender::new(Flavor::Stream(new_inner));
801 mem::swap(self.inner_mut(), tmp.inner_mut());
803 ret.map_err(SendError)
807 #[stable(feature = "rust1", since = "1.0.0")]
808 impl<T> Clone for Sender<T> {
809 fn clone(&self) -> Sender<T> {
810 let packet = match *unsafe { self.inner() } {
811 Flavor::Oneshot(ref p) => {
812 let a = Arc::new(shared::Packet::new());
814 let guard = a.postinit_lock();
815 let rx = Receiver::new(Flavor::Shared(a.clone()));
816 let sleeper = match p.upgrade(rx) {
817 oneshot::UpSuccess | oneshot::UpDisconnected => None,
818 oneshot::UpWoke(task) => Some(task),
820 a.inherit_blocker(sleeper, guard);
824 Flavor::Stream(ref p) => {
825 let a = Arc::new(shared::Packet::new());
827 let guard = a.postinit_lock();
828 let rx = Receiver::new(Flavor::Shared(a.clone()));
829 let sleeper = match p.upgrade(rx) {
830 stream::UpSuccess | stream::UpDisconnected => None,
831 stream::UpWoke(task) => Some(task),
833 a.inherit_blocker(sleeper, guard);
837 Flavor::Shared(ref p) => {
839 return Sender::new(Flavor::Shared(p.clone()));
841 Flavor::Sync(..) => unreachable!(),
845 let tmp = Sender::new(Flavor::Shared(packet.clone()));
846 mem::swap(self.inner_mut(), tmp.inner_mut());
848 Sender::new(Flavor::Shared(packet))
852 #[stable(feature = "rust1", since = "1.0.0")]
853 impl<T> Drop for Sender<T> {
855 match *unsafe { self.inner() } {
856 Flavor::Oneshot(ref p) => p.drop_chan(),
857 Flavor::Stream(ref p) => p.drop_chan(),
858 Flavor::Shared(ref p) => p.drop_chan(),
859 Flavor::Sync(..) => unreachable!(),
864 #[stable(feature = "mpsc_debug", since = "1.8.0")]
865 impl<T> fmt::Debug for Sender<T> {
866 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867 f.debug_struct("Sender").finish_non_exhaustive()
871 ////////////////////////////////////////////////////////////////////////////////
873 ////////////////////////////////////////////////////////////////////////////////
875 impl<T> SyncSender<T> {
876 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
880 /// Sends a value on this synchronous channel.
882 /// This function will *block* until space in the internal buffer becomes
883 /// available or a receiver is available to hand off the message to.
885 /// Note that a successful send does *not* guarantee that the receiver will
886 /// ever see the data if there is a buffer on this channel. Items may be
887 /// enqueued in the internal buffer for the receiver to receive at a later
888 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
889 /// channel and it guarantees that the receiver has indeed received
890 /// the data if this function returns success.
892 /// This function will never panic, but it may return [`Err`] if the
893 /// [`Receiver`] has disconnected and is no longer able to receive
899 /// use std::sync::mpsc::sync_channel;
902 /// // Create a rendezvous sync_channel with buffer size 0
903 /// let (sync_sender, receiver) = sync_channel(0);
905 /// thread::spawn(move || {
906 /// println!("sending message...");
907 /// sync_sender.send(1).unwrap();
908 /// // Thread is now blocked until the message is received
910 /// println!("...message received!");
913 /// let msg = receiver.recv().unwrap();
914 /// assert_eq!(1, msg);
916 #[stable(feature = "rust1", since = "1.0.0")]
917 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
918 self.inner.send(t).map_err(SendError)
921 /// Attempts to send a value on this channel without blocking.
923 /// This method differs from [`send`] by returning immediately if the
924 /// channel's buffer is full or no receiver is waiting to acquire some
925 /// data. Compared with [`send`], this function has two failure cases
926 /// instead of one (one for disconnection, one for a full buffer).
928 /// See [`send`] for notes about guarantees of whether the
929 /// receiver has received the data or not if this function is successful.
931 /// [`send`]: Self::send
936 /// use std::sync::mpsc::sync_channel;
939 /// // Create a sync_channel with buffer size 1
940 /// let (sync_sender, receiver) = sync_channel(1);
941 /// let sync_sender2 = sync_sender.clone();
943 /// // First thread owns sync_sender
944 /// thread::spawn(move || {
945 /// sync_sender.send(1).unwrap();
946 /// sync_sender.send(2).unwrap();
947 /// // Thread blocked
950 /// // Second thread owns sync_sender2
951 /// thread::spawn(move || {
952 /// // This will return an error and send
953 /// // no message if the buffer is full
954 /// let _ = sync_sender2.try_send(3);
958 /// msg = receiver.recv().unwrap();
959 /// println!("message {} received", msg);
961 /// msg = receiver.recv().unwrap();
962 /// println!("message {} received", msg);
964 /// // Third message may have never been sent
965 /// match receiver.try_recv() {
966 /// Ok(msg) => println!("message {} received", msg),
967 /// Err(_) => println!("the third message was never sent"),
970 #[stable(feature = "rust1", since = "1.0.0")]
971 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
972 self.inner.try_send(t)
976 #[stable(feature = "rust1", since = "1.0.0")]
977 impl<T> Clone for SyncSender<T> {
978 fn clone(&self) -> SyncSender<T> {
979 self.inner.clone_chan();
980 SyncSender::new(self.inner.clone())
984 #[stable(feature = "rust1", since = "1.0.0")]
985 impl<T> Drop for SyncSender<T> {
987 self.inner.drop_chan();
991 #[stable(feature = "mpsc_debug", since = "1.8.0")]
992 impl<T> fmt::Debug for SyncSender<T> {
993 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
994 f.debug_struct("SyncSender").finish_non_exhaustive()
998 ////////////////////////////////////////////////////////////////////////////////
1000 ////////////////////////////////////////////////////////////////////////////////
1002 impl<T> Receiver<T> {
1003 fn new(inner: Flavor<T>) -> Receiver<T> {
1004 Receiver { inner: UnsafeCell::new(inner) }
1007 /// Attempts to return a pending value on this receiver without blocking.
1009 /// This method will never block the caller in order to wait for data to
1010 /// become available. Instead, this will always return immediately with a
1011 /// possible option of pending data on the channel.
1013 /// This is useful for a flavor of "optimistic check" before deciding to
1014 /// block on a receiver.
1016 /// Compared with [`recv`], this function has two failure cases instead of one
1017 /// (one for disconnection, one for an empty buffer).
1019 /// [`recv`]: Self::recv
1024 /// use std::sync::mpsc::{Receiver, channel};
1026 /// let (_, receiver): (_, Receiver<i32>) = channel();
1028 /// assert!(receiver.try_recv().is_err());
1030 #[stable(feature = "rust1", since = "1.0.0")]
1031 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1033 let new_port = match *unsafe { self.inner() } {
1034 Flavor::Oneshot(ref p) => match p.try_recv() {
1035 Ok(t) => return Ok(t),
1036 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1037 Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1038 Err(oneshot::Upgraded(rx)) => rx,
1040 Flavor::Stream(ref p) => match p.try_recv() {
1041 Ok(t) => return Ok(t),
1042 Err(stream::Empty) => return Err(TryRecvError::Empty),
1043 Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1044 Err(stream::Upgraded(rx)) => rx,
1046 Flavor::Shared(ref p) => match p.try_recv() {
1047 Ok(t) => return Ok(t),
1048 Err(shared::Empty) => return Err(TryRecvError::Empty),
1049 Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1051 Flavor::Sync(ref p) => match p.try_recv() {
1052 Ok(t) => return Ok(t),
1053 Err(sync::Empty) => return Err(TryRecvError::Empty),
1054 Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1058 mem::swap(self.inner_mut(), new_port.inner_mut());
1063 /// Attempts to wait for a value on this receiver, returning an error if the
1064 /// corresponding channel has hung up.
1066 /// This function will always block the current thread if there is no data
1067 /// available and it's possible for more data to be sent. Once a message is
1068 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1069 /// receiver will wake up and return that message.
1071 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1072 /// this call is blocking, this call will wake up and return [`Err`] to
1073 /// indicate that no more messages can ever be received on this channel.
1074 /// However, since channels are buffered, messages sent before the disconnect
1075 /// will still be properly received.
1080 /// use std::sync::mpsc;
1081 /// use std::thread;
1083 /// let (send, recv) = mpsc::channel();
1084 /// let handle = thread::spawn(move || {
1085 /// send.send(1u8).unwrap();
1088 /// handle.join().unwrap();
1090 /// assert_eq!(Ok(1), recv.recv());
1093 /// Buffering behavior:
1096 /// use std::sync::mpsc;
1097 /// use std::thread;
1098 /// use std::sync::mpsc::RecvError;
1100 /// let (send, recv) = mpsc::channel();
1101 /// let handle = thread::spawn(move || {
1102 /// send.send(1u8).unwrap();
1103 /// send.send(2).unwrap();
1104 /// send.send(3).unwrap();
1108 /// // wait for the thread to join so we ensure the sender is dropped
1109 /// handle.join().unwrap();
1111 /// assert_eq!(Ok(1), recv.recv());
1112 /// assert_eq!(Ok(2), recv.recv());
1113 /// assert_eq!(Ok(3), recv.recv());
1114 /// assert_eq!(Err(RecvError), recv.recv());
1116 #[stable(feature = "rust1", since = "1.0.0")]
1117 pub fn recv(&self) -> Result<T, RecvError> {
1119 let new_port = match *unsafe { self.inner() } {
1120 Flavor::Oneshot(ref p) => match p.recv(None) {
1121 Ok(t) => return Ok(t),
1122 Err(oneshot::Disconnected) => return Err(RecvError),
1123 Err(oneshot::Upgraded(rx)) => rx,
1124 Err(oneshot::Empty) => unreachable!(),
1126 Flavor::Stream(ref p) => match p.recv(None) {
1127 Ok(t) => return Ok(t),
1128 Err(stream::Disconnected) => return Err(RecvError),
1129 Err(stream::Upgraded(rx)) => rx,
1130 Err(stream::Empty) => unreachable!(),
1132 Flavor::Shared(ref p) => match p.recv(None) {
1133 Ok(t) => return Ok(t),
1134 Err(shared::Disconnected) => return Err(RecvError),
1135 Err(shared::Empty) => unreachable!(),
1137 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1140 mem::swap(self.inner_mut(), new_port.inner_mut());
1145 /// Attempts to wait for a value on this receiver, returning an error if the
1146 /// corresponding channel has hung up, or if it waits more than `timeout`.
1148 /// This function will always block the current thread if there is no data
1149 /// available and it's possible for more data to be sent. Once a message is
1150 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1151 /// receiver will wake up and return that message.
1153 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1154 /// this call is blocking, this call will wake up and return [`Err`] to
1155 /// indicate that no more messages can ever be received on this channel.
1156 /// However, since channels are buffered, messages sent before the disconnect
1157 /// will still be properly received.
1161 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1162 /// to panic unexpectedly with the following example:
1165 /// use std::sync::mpsc::channel;
1166 /// use std::thread;
1167 /// use std::time::Duration;
1169 /// let (tx, rx) = channel::<String>();
1171 /// thread::spawn(move || {
1172 /// let d = Duration::from_millis(10);
1174 /// println!("recv");
1175 /// let _r = rx.recv_timeout(d);
1179 /// thread::sleep(Duration::from_millis(100));
1180 /// let _c1 = tx.clone();
1182 /// thread::sleep(Duration::from_secs(1));
1185 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1189 /// Successfully receiving value before encountering timeout:
1192 /// use std::thread;
1193 /// use std::time::Duration;
1194 /// use std::sync::mpsc;
1196 /// let (send, recv) = mpsc::channel();
1198 /// thread::spawn(move || {
1199 /// send.send('a').unwrap();
1203 /// recv.recv_timeout(Duration::from_millis(400)),
1208 /// Receiving an error upon reaching timeout:
1211 /// use std::thread;
1212 /// use std::time::Duration;
1213 /// use std::sync::mpsc;
1215 /// let (send, recv) = mpsc::channel();
1217 /// thread::spawn(move || {
1218 /// thread::sleep(Duration::from_millis(800));
1219 /// send.send('a').unwrap();
1223 /// recv.recv_timeout(Duration::from_millis(400)),
1224 /// Err(mpsc::RecvTimeoutError::Timeout)
1227 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1228 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1229 // Do an optimistic try_recv to avoid the performance impact of
1230 // Instant::now() in the full-channel case.
1231 match self.try_recv() {
1232 Ok(result) => Ok(result),
1233 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1234 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1235 Some(deadline) => self.recv_deadline(deadline),
1236 // So far in the future that it's practically the same as waiting indefinitely.
1237 None => self.recv().map_err(RecvTimeoutError::from),
1242 /// Attempts to wait for a value on this receiver, returning an error if the
1243 /// corresponding channel has hung up, or if `deadline` is reached.
1245 /// This function will always block the current thread if there is no data
1246 /// available and it's possible for more data to be sent. Once a message is
1247 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1248 /// receiver will wake up and return that message.
1250 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1251 /// this call is blocking, this call will wake up and return [`Err`] to
1252 /// indicate that no more messages can ever be received on this channel.
1253 /// However, since channels are buffered, messages sent before the disconnect
1254 /// will still be properly received.
1258 /// Successfully receiving value before reaching deadline:
1261 /// #![feature(deadline_api)]
1262 /// use std::thread;
1263 /// use std::time::{Duration, Instant};
1264 /// use std::sync::mpsc;
1266 /// let (send, recv) = mpsc::channel();
1268 /// thread::spawn(move || {
1269 /// send.send('a').unwrap();
1273 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1278 /// Receiving an error upon reaching deadline:
1281 /// #![feature(deadline_api)]
1282 /// use std::thread;
1283 /// use std::time::{Duration, Instant};
1284 /// use std::sync::mpsc;
1286 /// let (send, recv) = mpsc::channel();
1288 /// thread::spawn(move || {
1289 /// thread::sleep(Duration::from_millis(800));
1290 /// send.send('a').unwrap();
1294 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1295 /// Err(mpsc::RecvTimeoutError::Timeout)
1298 #[unstable(feature = "deadline_api", issue = "46316")]
1299 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1300 use self::RecvTimeoutError::*;
1303 let port_or_empty = match *unsafe { self.inner() } {
1304 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1305 Ok(t) => return Ok(t),
1306 Err(oneshot::Disconnected) => return Err(Disconnected),
1307 Err(oneshot::Upgraded(rx)) => Some(rx),
1308 Err(oneshot::Empty) => None,
1310 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1311 Ok(t) => return Ok(t),
1312 Err(stream::Disconnected) => return Err(Disconnected),
1313 Err(stream::Upgraded(rx)) => Some(rx),
1314 Err(stream::Empty) => None,
1316 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1317 Ok(t) => return Ok(t),
1318 Err(shared::Disconnected) => return Err(Disconnected),
1319 Err(shared::Empty) => None,
1321 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1322 Ok(t) => return Ok(t),
1323 Err(sync::Disconnected) => return Err(Disconnected),
1324 Err(sync::Empty) => None,
1328 if let Some(new_port) = port_or_empty {
1330 mem::swap(self.inner_mut(), new_port.inner_mut());
1334 // If we're already passed the deadline, and we're here without
1335 // data, return a timeout, else try again.
1336 if Instant::now() >= deadline {
1337 return Err(Timeout);
1342 /// Returns an iterator that will block waiting for messages, but never
1343 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1348 /// use std::sync::mpsc::channel;
1349 /// use std::thread;
1351 /// let (send, recv) = channel();
1353 /// thread::spawn(move || {
1354 /// send.send(1).unwrap();
1355 /// send.send(2).unwrap();
1356 /// send.send(3).unwrap();
1359 /// let mut iter = recv.iter();
1360 /// assert_eq!(iter.next(), Some(1));
1361 /// assert_eq!(iter.next(), Some(2));
1362 /// assert_eq!(iter.next(), Some(3));
1363 /// assert_eq!(iter.next(), None);
1365 #[stable(feature = "rust1", since = "1.0.0")]
1366 pub fn iter(&self) -> Iter<'_, T> {
1370 /// Returns an iterator that will attempt to yield all pending values.
1371 /// It will return `None` if there are no more pending values or if the
1372 /// channel has hung up. The iterator will never [`panic!`] or block the
1373 /// user by waiting for values.
1378 /// use std::sync::mpsc::channel;
1379 /// use std::thread;
1380 /// use std::time::Duration;
1382 /// let (sender, receiver) = channel();
1384 /// // nothing is in the buffer yet
1385 /// assert!(receiver.try_iter().next().is_none());
1387 /// thread::spawn(move || {
1388 /// thread::sleep(Duration::from_secs(1));
1389 /// sender.send(1).unwrap();
1390 /// sender.send(2).unwrap();
1391 /// sender.send(3).unwrap();
1394 /// // nothing is in the buffer yet
1395 /// assert!(receiver.try_iter().next().is_none());
1397 /// // block for two seconds
1398 /// thread::sleep(Duration::from_secs(2));
1400 /// let mut iter = receiver.try_iter();
1401 /// assert_eq!(iter.next(), Some(1));
1402 /// assert_eq!(iter.next(), Some(2));
1403 /// assert_eq!(iter.next(), Some(3));
1404 /// assert_eq!(iter.next(), None);
1406 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1407 pub fn try_iter(&self) -> TryIter<'_, T> {
1408 TryIter { rx: self }
1412 #[stable(feature = "rust1", since = "1.0.0")]
1413 impl<'a, T> Iterator for Iter<'a, T> {
1416 fn next(&mut self) -> Option<T> {
1421 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1422 impl<'a, T> Iterator for TryIter<'a, T> {
1425 fn next(&mut self) -> Option<T> {
1426 self.rx.try_recv().ok()
1430 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1431 impl<'a, T> IntoIterator for &'a Receiver<T> {
1433 type IntoIter = Iter<'a, T>;
1435 fn into_iter(self) -> Iter<'a, T> {
1440 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1441 impl<T> Iterator for IntoIter<T> {
1443 fn next(&mut self) -> Option<T> {
1448 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1449 impl<T> IntoIterator for Receiver<T> {
1451 type IntoIter = IntoIter<T>;
1453 fn into_iter(self) -> IntoIter<T> {
1454 IntoIter { rx: self }
1458 #[stable(feature = "rust1", since = "1.0.0")]
1459 impl<T> Drop for Receiver<T> {
1460 fn drop(&mut self) {
1461 match *unsafe { self.inner() } {
1462 Flavor::Oneshot(ref p) => p.drop_port(),
1463 Flavor::Stream(ref p) => p.drop_port(),
1464 Flavor::Shared(ref p) => p.drop_port(),
1465 Flavor::Sync(ref p) => p.drop_port(),
1470 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1471 impl<T> fmt::Debug for Receiver<T> {
1472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473 f.debug_struct("Receiver").finish_non_exhaustive()
1477 #[stable(feature = "rust1", since = "1.0.0")]
1478 impl<T> fmt::Debug for SendError<T> {
1479 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1480 "SendError(..)".fmt(f)
1484 #[stable(feature = "rust1", since = "1.0.0")]
1485 impl<T> fmt::Display for SendError<T> {
1486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1487 "sending on a closed channel".fmt(f)
1491 #[stable(feature = "rust1", since = "1.0.0")]
1492 impl<T: Send> error::Error for SendError<T> {
1493 #[allow(deprecated)]
1494 fn description(&self) -> &str {
1495 "sending on a closed channel"
1499 #[stable(feature = "rust1", since = "1.0.0")]
1500 impl<T> fmt::Debug for TrySendError<T> {
1501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1503 TrySendError::Full(..) => "Full(..)".fmt(f),
1504 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1509 #[stable(feature = "rust1", since = "1.0.0")]
1510 impl<T> fmt::Display for TrySendError<T> {
1511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1513 TrySendError::Full(..) => "sending on a full channel".fmt(f),
1514 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1519 #[stable(feature = "rust1", since = "1.0.0")]
1520 impl<T: Send> error::Error for TrySendError<T> {
1521 #[allow(deprecated)]
1522 fn description(&self) -> &str {
1524 TrySendError::Full(..) => "sending on a full channel",
1525 TrySendError::Disconnected(..) => "sending on a closed channel",
1530 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1531 impl<T> From<SendError<T>> for TrySendError<T> {
1532 /// Converts a `SendError<T>` into a `TrySendError<T>`.
1534 /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1536 /// No data is allocated on the heap.
1537 fn from(err: SendError<T>) -> TrySendError<T> {
1539 SendError(t) => TrySendError::Disconnected(t),
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl fmt::Display for RecvError {
1546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547 "receiving on a closed channel".fmt(f)
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl error::Error for RecvError {
1553 #[allow(deprecated)]
1554 fn description(&self) -> &str {
1555 "receiving on a closed channel"
1559 #[stable(feature = "rust1", since = "1.0.0")]
1560 impl fmt::Display for TryRecvError {
1561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1563 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1564 TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1569 #[stable(feature = "rust1", since = "1.0.0")]
1570 impl error::Error for TryRecvError {
1571 #[allow(deprecated)]
1572 fn description(&self) -> &str {
1574 TryRecvError::Empty => "receiving on an empty channel",
1575 TryRecvError::Disconnected => "receiving on a closed channel",
1580 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1581 impl From<RecvError> for TryRecvError {
1582 /// Converts a `RecvError` into a `TryRecvError`.
1584 /// This conversion always returns `TryRecvError::Disconnected`.
1586 /// No data is allocated on the heap.
1587 fn from(err: RecvError) -> TryRecvError {
1589 RecvError => TryRecvError::Disconnected,
1594 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1595 impl fmt::Display for RecvTimeoutError {
1596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1598 RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1599 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1604 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1605 impl error::Error for RecvTimeoutError {
1606 #[allow(deprecated)]
1607 fn description(&self) -> &str {
1609 RecvTimeoutError::Timeout => "timed out waiting on channel",
1610 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1615 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1616 impl From<RecvError> for RecvTimeoutError {
1617 /// Converts a `RecvError` into a `RecvTimeoutError`.
1619 /// This conversion always returns `RecvTimeoutError::Disconnected`.
1621 /// No data is allocated on the heap.
1622 fn from(err: RecvError) -> RecvTimeoutError {
1624 RecvError => RecvTimeoutError::Disconnected,