1 //! Multi-producer, single-consumer FIFO queue communication primitives.
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
14 //! These channels come in two flavors:
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //! will return a `(Sender, Receiver)` tuple where all sends will be
18 //! **asynchronous** (they never block). The channel conceptually has an
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //! messages is a pre-allocated buffer of a fixed size. All sends will be
24 //! **synchronous** by blocking until there is buffer space available. Note
25 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //! channel where each sender atomically hands off a message to a receiver.
28 //! [`send`]: Sender::send
32 //! The send and receive operations on channels will all return a [`Result`]
33 //! indicating whether the operation succeeded or not. An unsuccessful operation
34 //! is normally indicative of the other half of a channel having "hung up" by
35 //! being dropped in its corresponding thread.
37 //! Once half of a channel has been deallocated, most operations can no longer
38 //! continue to make progress, so [`Err`] will be returned. Many applications
39 //! will continue to [`unwrap`] the results returned from this module,
40 //! instigating a propagation of failure among threads if one unexpectedly dies.
42 //! [`unwrap`]: Result::unwrap
50 //! use std::sync::mpsc::channel;
52 //! // Create a simple streaming channel
53 //! let (tx, rx) = channel();
54 //! thread::spawn(move|| {
55 //! tx.send(10).unwrap();
57 //! assert_eq!(rx.recv().unwrap(), 10);
64 //! use std::sync::mpsc::channel;
66 //! // Create a shared channel that can be sent along from many threads
67 //! // where tx is the sending half (tx for transmission), and rx is the receiving
68 //! // half (rx for receiving).
69 //! let (tx, rx) = channel();
71 //! let tx = tx.clone();
72 //! thread::spawn(move|| {
73 //! tx.send(i).unwrap();
78 //! let j = rx.recv().unwrap();
79 //! assert!(0 <= j && j < 10);
83 //! Propagating panics:
86 //! use std::sync::mpsc::channel;
88 //! // The call to recv() will return an error because the channel has already
89 //! // hung up (or been deallocated)
90 //! let (tx, rx) = channel::<i32>();
92 //! assert!(rx.recv().is_err());
95 //! Synchronous channels:
99 //! use std::sync::mpsc::sync_channel;
101 //! let (tx, rx) = sync_channel::<i32>(0);
102 //! thread::spawn(move|| {
103 //! // This will wait for the parent thread to start receiving
104 //! tx.send(53).unwrap();
106 //! rx.recv().unwrap();
109 //! Unbounded receive loop:
112 //! use std::sync::mpsc::sync_channel;
115 //! let (tx, rx) = sync_channel(3);
118 //! // It would be the same without thread and clone here
119 //! // since there will still be one `tx` left.
120 //! let tx = tx.clone();
121 //! // cloned tx dropped within thread
122 //! thread::spawn(move || tx.send("ok").unwrap());
125 //! // Drop the last sender to stop `rx` waiting for message.
126 //! // The program will not complete if we comment this out.
127 //! // **All** `tx` needs to be dropped for `rx` to have `Err`.
130 //! // Unbounded receiver waiting for all senders to complete.
131 //! while let Ok(msg) = rx.recv() {
132 //! println!("{msg}");
135 //! println!("completed");
138 #![stable(feature = "rust1", since = "1.0.0")]
140 #[cfg(all(test, not(target_os = "emscripten")))]
143 #[cfg(all(test, not(target_os = "emscripten")))]
146 // A description of how Rust's channel implementation works
148 // Channels are supposed to be the basic building block for all other
149 // concurrent primitives that are used in Rust. As a result, the channel type
150 // needs to be highly optimized, flexible, and broad enough for use everywhere.
152 // The choice of implementation of all channels is to be built on lock-free data
153 // structures. The channels themselves are then consequently also lock-free data
154 // structures. As always with lock-free code, this is a very "here be dragons"
155 // territory, especially because I'm unaware of any academic papers that have
156 // gone into great length about channels of these flavors.
158 // ## Flavors of channels
160 // From the perspective of a consumer of this library, there is only one flavor
161 // of channel. This channel can be used as a stream and cloned to allow multiple
162 // senders. Under the hood, however, there are actually three flavors of
165 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
166 // case. They contain as few atomics as possible and
167 // involve one and exactly one allocation.
168 // * Streams - these channels are optimized for the non-shared use case. They
169 // use a different concurrent queue that is more tailored for this
170 // use case. The initial allocation of this flavor of channel is not
172 // * Shared - this is the most general form of channel that this module offers,
173 // a channel with multiple senders. This type is as optimized as it
174 // can be, but the previous two types mentioned are much faster for
177 // ## Concurrent queues
179 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
180 // but recv() obviously blocks. This means that under the hood there must be
181 // some shared and concurrent queue holding all of the actual data.
183 // With two flavors of channels, two flavors of queues are also used. We have
184 // chosen to use queues from a well-known author that are abbreviated as SPSC
185 // and MPSC (single producer, single consumer and multiple producer, single
186 // consumer). SPSC queues are used for streams while MPSC queues are used for
189 // ### SPSC optimizations
191 // The SPSC queue found online is essentially a linked list of nodes where one
192 // half of the nodes are the "queue of data" and the other half of nodes are a
193 // cache of unused nodes. The unused nodes are used such that an allocation is
194 // not required on every push() and a free doesn't need to happen on every
197 // As found online, however, the cache of nodes is of an infinite size. This
198 // means that if a channel at one point in its life had 50k items in the queue,
199 // then the queue will always have the capacity for 50k items. I believed that
200 // this was an unnecessary limitation of the implementation, so I have altered
201 // the queue to optionally have a bound on the cache size.
203 // By default, streams will have an unbounded SPSC queue with a small-ish cache
204 // size. The hope is that the cache is still large enough to have very fast
205 // send() operations while not too large such that millions of channels can
208 // ### MPSC optimizations
210 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
211 // a linked list under the hood to earn its unboundedness, but I have not put
212 // forth much effort into having a cache of nodes similar to the SPSC queue.
214 // For now, I believe that this is "ok" because shared channels are not the most
215 // common type, but soon we may wish to revisit this queue choice and determine
216 // another candidate for backend storage of shared channels.
218 // ## Overview of the Implementation
220 // Now that there's a little background on the concurrent queues used, it's
221 // worth going into much more detail about the channels themselves. The basic
222 // pseudocode for a send/recv are:
226 // queue.push(t) return if queue.pop()
227 // if increment() == -1 deschedule {
228 // wakeup() if decrement() > 0
229 // cancel_deschedule()
233 // As mentioned before, there are no locks in this implementation, only atomic
234 // instructions are used.
236 // ### The internal atomic counter
238 // Every channel has a shared counter with each half to keep track of the size
239 // of the queue. This counter is used to abort descheduling by the receiver and
240 // to know when to wake up on the sending side.
242 // As seen in the pseudocode, senders will increment this count and receivers
243 // will decrement the count. The theory behind this is that if a sender sees a
244 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
245 // then it doesn't need to block.
247 // The recv() method has a beginning call to pop(), and if successful, it needs
248 // to decrement the count. It is a crucial implementation detail that this
249 // decrement does *not* happen to the shared counter. If this were the case,
250 // then it would be possible for the counter to be very negative when there were
251 // no receivers waiting, in which case the senders would have to determine when
252 // it was actually appropriate to wake up a receiver.
254 // Instead, the "steal count" is kept track of separately (not atomically
255 // because it's only used by receivers), and then the decrement() call when
256 // descheduling will lump in all of the recent steals into one large decrement.
258 // The implication of this is that if a sender sees a -1 count, then there's
259 // guaranteed to be a waiter waiting!
261 // ## Native Implementation
263 // A major goal of these channels is to work seamlessly on and off the runtime.
264 // All of the previous race conditions have been worded in terms of
265 // scheduler-isms (which is obviously not available without the runtime).
267 // For now, native usage of channels (off the runtime) will fall back onto
268 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
269 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
270 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
271 // condition variable.
275 // Being able to support selection over channels has greatly influenced this
276 // design, and not only does selection need to work inside the runtime, but also
277 // outside the runtime.
279 // The implementation is fairly straightforward. The goal of select() is not to
280 // return some data, but only to return which channel can receive data without
281 // blocking. The implementation is essentially the entire blocking procedure
282 // followed by an increment as soon as its woken up. The cancellation procedure
283 // involves an increment and swapping out of to_wake to acquire ownership of the
284 // thread to unblock.
286 // Sadly this current implementation requires multiple allocations, so I have
287 // seen the throughput of select() be much worse than it should be. I do not
288 // believe that there is anything fundamental that needs to change about these
289 // channels, however, in order to support a more efficient select().
291 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
295 // And now that you've seen all the races that I found and attempted to fix,
296 // here's the code for you to find some more!
298 use crate::cell::UnsafeCell;
302 use crate::sync::Arc;
303 use crate::time::{Duration, Instant};
315 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
316 /// This half can only be owned by one thread.
318 /// Messages sent to the channel can be retrieved using [`recv`].
320 /// [`recv`]: Receiver::recv
325 /// use std::sync::mpsc::channel;
327 /// use std::time::Duration;
329 /// let (send, recv) = channel();
331 /// thread::spawn(move || {
332 /// send.send("Hello world!").unwrap();
333 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
334 /// send.send("Delayed for 2 seconds").unwrap();
337 /// println!("{}", recv.recv().unwrap()); // Received immediately
338 /// println!("Waiting...");
339 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
341 #[stable(feature = "rust1", since = "1.0.0")]
342 #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
343 pub struct Receiver<T> {
344 inner: UnsafeCell<Flavor<T>>,
347 // The receiver port can be sent from place to place, so long as it
348 // is not used to receive non-sendable things.
349 #[stable(feature = "rust1", since = "1.0.0")]
350 unsafe impl<T: Send> Send for Receiver<T> {}
352 #[stable(feature = "rust1", since = "1.0.0")]
353 impl<T> !Sync for Receiver<T> {}
355 /// An iterator over messages on a [`Receiver`], created by [`iter`].
357 /// This iterator will block whenever [`next`] is called,
358 /// waiting for a new message, and [`None`] will be returned
359 /// when the corresponding channel has hung up.
361 /// [`iter`]: Receiver::iter
362 /// [`next`]: Iterator::next
367 /// use std::sync::mpsc::channel;
370 /// let (send, recv) = channel();
372 /// thread::spawn(move || {
373 /// send.send(1u8).unwrap();
374 /// send.send(2u8).unwrap();
375 /// send.send(3u8).unwrap();
378 /// for x in recv.iter() {
379 /// println!("Got: {x}");
382 #[stable(feature = "rust1", since = "1.0.0")]
384 pub struct Iter<'a, T: 'a> {
388 /// An iterator that attempts to yield all pending values for a [`Receiver`],
389 /// created by [`try_iter`].
391 /// [`None`] will be returned when there are no pending values remaining or
392 /// if the corresponding channel has hung up.
394 /// This iterator will never block the caller in order to wait for data to
395 /// become available. Instead, it will return [`None`].
397 /// [`try_iter`]: Receiver::try_iter
402 /// use std::sync::mpsc::channel;
404 /// use std::time::Duration;
406 /// let (sender, receiver) = channel();
408 /// // Nothing is in the buffer yet
409 /// assert!(receiver.try_iter().next().is_none());
410 /// println!("Nothing in the buffer...");
412 /// thread::spawn(move || {
413 /// sender.send(1).unwrap();
414 /// sender.send(2).unwrap();
415 /// sender.send(3).unwrap();
418 /// println!("Going to sleep...");
419 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
421 /// for x in receiver.try_iter() {
422 /// println!("Got: {x}");
425 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
427 pub struct TryIter<'a, T: 'a> {
431 /// An owning iterator over messages on a [`Receiver`],
432 /// created by [`into_iter`].
434 /// This iterator will block whenever [`next`]
435 /// is called, waiting for a new message, and [`None`] will be
436 /// returned if the corresponding channel has hung up.
438 /// [`into_iter`]: Receiver::into_iter
439 /// [`next`]: Iterator::next
444 /// use std::sync::mpsc::channel;
447 /// let (send, recv) = channel();
449 /// thread::spawn(move || {
450 /// send.send(1u8).unwrap();
451 /// send.send(2u8).unwrap();
452 /// send.send(3u8).unwrap();
455 /// for x in recv.into_iter() {
456 /// println!("Got: {x}");
459 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
461 pub struct IntoIter<T> {
465 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
466 /// owned by one thread, but it can be cloned to send to other threads.
468 /// Messages can be sent through this channel with [`send`].
470 /// Note: all senders (the original and the clones) need to be dropped for the receiver
471 /// to stop blocking to receive messages with [`Receiver::recv`].
473 /// [`send`]: Sender::send
478 /// use std::sync::mpsc::channel;
481 /// let (sender, receiver) = channel();
482 /// let sender2 = sender.clone();
484 /// // First thread owns sender
485 /// thread::spawn(move || {
486 /// sender.send(1).unwrap();
489 /// // Second thread owns sender2
490 /// thread::spawn(move || {
491 /// sender2.send(2).unwrap();
494 /// let msg = receiver.recv().unwrap();
495 /// let msg2 = receiver.recv().unwrap();
497 /// assert_eq!(3, msg + msg2);
499 #[stable(feature = "rust1", since = "1.0.0")]
500 pub struct Sender<T> {
501 inner: UnsafeCell<Flavor<T>>,
504 // The send port can be sent from place to place, so long as it
505 // is not used to send non-sendable things.
506 #[stable(feature = "rust1", since = "1.0.0")]
507 unsafe impl<T: Send> Send for Sender<T> {}
509 #[stable(feature = "rust1", since = "1.0.0")]
510 impl<T> !Sync for Sender<T> {}
512 /// The sending-half of Rust's synchronous [`sync_channel`] type.
514 /// Messages can be sent through this channel with [`send`] or [`try_send`].
516 /// [`send`] will block if there is no space in the internal buffer.
518 /// [`send`]: SyncSender::send
519 /// [`try_send`]: SyncSender::try_send
524 /// use std::sync::mpsc::sync_channel;
527 /// // Create a sync_channel with buffer size 2
528 /// let (sync_sender, receiver) = sync_channel(2);
529 /// let sync_sender2 = sync_sender.clone();
531 /// // First thread owns sync_sender
532 /// thread::spawn(move || {
533 /// sync_sender.send(1).unwrap();
534 /// sync_sender.send(2).unwrap();
537 /// // Second thread owns sync_sender2
538 /// thread::spawn(move || {
539 /// sync_sender2.send(3).unwrap();
540 /// // thread will now block since the buffer is full
541 /// println!("Thread unblocked!");
546 /// msg = receiver.recv().unwrap();
547 /// println!("message {msg} received");
549 /// // "Thread unblocked!" will be printed now
551 /// msg = receiver.recv().unwrap();
552 /// println!("message {msg} received");
554 /// msg = receiver.recv().unwrap();
556 /// println!("message {msg} received");
558 #[stable(feature = "rust1", since = "1.0.0")]
559 pub struct SyncSender<T> {
560 inner: Arc<sync::Packet<T>>,
563 #[stable(feature = "rust1", since = "1.0.0")]
564 unsafe impl<T: Send> Send for SyncSender<T> {}
566 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
567 /// function on **channel**s.
569 /// A **send** operation can only fail if the receiving end of a channel is
570 /// disconnected, implying that the data could never be received. The error
571 /// contains the data being sent as a payload so it can be recovered.
572 #[stable(feature = "rust1", since = "1.0.0")]
573 #[derive(PartialEq, Eq, Clone, Copy)]
574 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
576 /// An error returned from the [`recv`] function on a [`Receiver`].
578 /// The [`recv`] operation can only fail if the sending half of a
579 /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
580 /// messages will ever be received.
582 /// [`recv`]: Receiver::recv
583 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
584 #[stable(feature = "rust1", since = "1.0.0")]
585 pub struct RecvError;
587 /// This enumeration is the list of the possible reasons that [`try_recv`] could
588 /// not return data when called. This can occur with both a [`channel`] and
589 /// a [`sync_channel`].
591 /// [`try_recv`]: Receiver::try_recv
592 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
593 #[stable(feature = "rust1", since = "1.0.0")]
594 pub enum TryRecvError {
595 /// This **channel** is currently empty, but the **Sender**(s) have not yet
596 /// disconnected, so data may yet become available.
597 #[stable(feature = "rust1", since = "1.0.0")]
600 /// The **channel**'s sending half has become disconnected, and there will
601 /// never be any more data received on it.
602 #[stable(feature = "rust1", since = "1.0.0")]
606 /// This enumeration is the list of possible errors that made [`recv_timeout`]
607 /// unable to return data when called. This can occur with both a [`channel`] and
608 /// a [`sync_channel`].
610 /// [`recv_timeout`]: Receiver::recv_timeout
611 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
612 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613 pub enum RecvTimeoutError {
614 /// This **channel** is currently empty, but the **Sender**(s) have not yet
615 /// disconnected, so data may yet become available.
616 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
618 /// The **channel**'s sending half has become disconnected, and there will
619 /// never be any more data received on it.
620 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
624 /// This enumeration is the list of the possible error outcomes for the
625 /// [`try_send`] method.
627 /// [`try_send`]: SyncSender::try_send
628 #[stable(feature = "rust1", since = "1.0.0")]
629 #[derive(PartialEq, Eq, Clone, Copy)]
630 pub enum TrySendError<T> {
631 /// The data could not be sent on the [`sync_channel`] because it would require that
632 /// the callee block to send the data.
634 /// If this is a buffered channel, then the buffer is full at this time. If
635 /// this is not a buffered channel, then there is no [`Receiver`] available to
636 /// acquire the data.
637 #[stable(feature = "rust1", since = "1.0.0")]
638 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
640 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
641 /// sent. The data is returned back to the callee in this case.
642 #[stable(feature = "rust1", since = "1.0.0")]
643 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
647 Oneshot(Arc<oneshot::Packet<T>>),
648 Stream(Arc<stream::Packet<T>>),
649 Shared(Arc<shared::Packet<T>>),
650 Sync(Arc<sync::Packet<T>>),
654 trait UnsafeFlavor<T> {
655 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
656 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
657 &mut *self.inner_unsafe().get()
659 unsafe fn inner(&self) -> &Flavor<T> {
660 &*self.inner_unsafe().get()
663 impl<T> UnsafeFlavor<T> for Sender<T> {
664 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
668 impl<T> UnsafeFlavor<T> for Receiver<T> {
669 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
674 /// Creates a new asynchronous channel, returning the sender/receiver halves.
675 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
676 /// the same order as it was sent, and no [`send`] will block the calling thread
677 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
678 /// block after its buffer limit is reached). [`recv`] will block until a message
679 /// is available while there is at least one [`Sender`] alive (including clones).
681 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
682 /// only one [`Receiver`] is supported.
684 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
685 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
686 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
687 /// return a [`RecvError`].
689 /// [`send`]: Sender::send
690 /// [`recv`]: Receiver::recv
695 /// use std::sync::mpsc::channel;
698 /// let (sender, receiver) = channel();
700 /// // Spawn off an expensive computation
701 /// thread::spawn(move|| {
702 /// # fn expensive_computation() {}
703 /// sender.send(expensive_computation()).unwrap();
706 /// // Do some useful work for awhile
708 /// // Let's see what that answer was
709 /// println!("{:?}", receiver.recv().unwrap());
712 #[stable(feature = "rust1", since = "1.0.0")]
713 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
714 let a = Arc::new(oneshot::Packet::new());
715 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
718 /// Creates a new synchronous, bounded channel.
719 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
720 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
721 /// [`Receiver`] will block until a message becomes available. `sync_channel`
722 /// differs greatly in the semantics of the sender, however.
724 /// This channel has an internal buffer on which messages will be queued.
725 /// `bound` specifies the buffer size. When the internal buffer becomes full,
726 /// future sends will *block* waiting for the buffer to open up. Note that a
727 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
728 /// where each [`send`] will not return until a [`recv`] is paired with it.
730 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
731 /// times, but only one [`Receiver`] is supported.
733 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
734 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
735 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
736 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
738 /// [`send`]: SyncSender::send
739 /// [`recv`]: Receiver::recv
744 /// use std::sync::mpsc::sync_channel;
747 /// let (sender, receiver) = sync_channel(1);
749 /// // this returns immediately
750 /// sender.send(1).unwrap();
752 /// thread::spawn(move|| {
753 /// // this will block until the previous message has been received
754 /// sender.send(2).unwrap();
757 /// assert_eq!(receiver.recv().unwrap(), 1);
758 /// assert_eq!(receiver.recv().unwrap(), 2);
761 #[stable(feature = "rust1", since = "1.0.0")]
762 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
763 let a = Arc::new(sync::Packet::new(bound));
764 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
767 ////////////////////////////////////////////////////////////////////////////////
769 ////////////////////////////////////////////////////////////////////////////////
772 fn new(inner: Flavor<T>) -> Sender<T> {
773 Sender { inner: UnsafeCell::new(inner) }
776 /// Attempts to send a value on this channel, returning it back if it could
779 /// A successful send occurs when it is determined that the other end of
780 /// the channel has not hung up already. An unsuccessful send would be one
781 /// where the corresponding receiver has already been deallocated. Note
782 /// that a return value of [`Err`] means that the data will never be
783 /// received, but a return value of [`Ok`] does *not* mean that the data
784 /// will be received. It is possible for the corresponding receiver to
785 /// hang up immediately after this function returns [`Ok`].
787 /// This method will never block the current thread.
792 /// use std::sync::mpsc::channel;
794 /// let (tx, rx) = channel();
796 /// // This send is always successful
797 /// tx.send(1).unwrap();
799 /// // This send will fail because the receiver is gone
801 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
803 #[stable(feature = "rust1", since = "1.0.0")]
804 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
805 let (new_inner, ret) = match *unsafe { self.inner() } {
806 Flavor::Oneshot(ref p) => {
808 return p.send(t).map_err(SendError);
810 let a = Arc::new(stream::Packet::new());
811 let rx = Receiver::new(Flavor::Stream(a.clone()));
812 match p.upgrade(rx) {
813 oneshot::UpSuccess => {
817 oneshot::UpDisconnected => (a, Err(t)),
818 oneshot::UpWoke(token) => {
819 // This send cannot panic because the thread is
820 // asleep (we're looking at it), so the receiver
822 a.send(t).ok().unwrap();
829 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
830 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
831 Flavor::Sync(..) => unreachable!(),
835 let tmp = Sender::new(Flavor::Stream(new_inner));
836 mem::swap(self.inner_mut(), tmp.inner_mut());
838 ret.map_err(SendError)
842 #[stable(feature = "rust1", since = "1.0.0")]
843 impl<T> Clone for Sender<T> {
844 /// Clone a sender to send to other threads.
846 /// Note, be aware of the lifetime of the sender because all senders
847 /// (including the original) need to be dropped in order for
848 /// [`Receiver::recv`] to stop blocking.
849 fn clone(&self) -> Sender<T> {
850 let packet = match *unsafe { self.inner() } {
851 Flavor::Oneshot(ref p) => {
852 let a = Arc::new(shared::Packet::new());
854 let guard = a.postinit_lock();
855 let rx = Receiver::new(Flavor::Shared(a.clone()));
856 let sleeper = match p.upgrade(rx) {
857 oneshot::UpSuccess | oneshot::UpDisconnected => None,
858 oneshot::UpWoke(task) => Some(task),
860 a.inherit_blocker(sleeper, guard);
864 Flavor::Stream(ref p) => {
865 let a = Arc::new(shared::Packet::new());
867 let guard = a.postinit_lock();
868 let rx = Receiver::new(Flavor::Shared(a.clone()));
869 let sleeper = match p.upgrade(rx) {
870 stream::UpSuccess | stream::UpDisconnected => None,
871 stream::UpWoke(task) => Some(task),
873 a.inherit_blocker(sleeper, guard);
877 Flavor::Shared(ref p) => {
879 return Sender::new(Flavor::Shared(p.clone()));
881 Flavor::Sync(..) => unreachable!(),
885 let tmp = Sender::new(Flavor::Shared(packet.clone()));
886 mem::swap(self.inner_mut(), tmp.inner_mut());
888 Sender::new(Flavor::Shared(packet))
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
895 match *unsafe { self.inner() } {
896 Flavor::Oneshot(ref p) => p.drop_chan(),
897 Flavor::Stream(ref p) => p.drop_chan(),
898 Flavor::Shared(ref p) => p.drop_chan(),
899 Flavor::Sync(..) => unreachable!(),
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907 f.debug_struct("Sender").finish_non_exhaustive()
911 ////////////////////////////////////////////////////////////////////////////////
913 ////////////////////////////////////////////////////////////////////////////////
915 impl<T> SyncSender<T> {
916 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
920 /// Sends a value on this synchronous channel.
922 /// This function will *block* until space in the internal buffer becomes
923 /// available or a receiver is available to hand off the message to.
925 /// Note that a successful send does *not* guarantee that the receiver will
926 /// ever see the data if there is a buffer on this channel. Items may be
927 /// enqueued in the internal buffer for the receiver to receive at a later
928 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929 /// channel and it guarantees that the receiver has indeed received
930 /// the data if this function returns success.
932 /// This function will never panic, but it may return [`Err`] if the
933 /// [`Receiver`] has disconnected and is no longer able to receive
939 /// use std::sync::mpsc::sync_channel;
942 /// // Create a rendezvous sync_channel with buffer size 0
943 /// let (sync_sender, receiver) = sync_channel(0);
945 /// thread::spawn(move || {
946 /// println!("sending message...");
947 /// sync_sender.send(1).unwrap();
948 /// // Thread is now blocked until the message is received
950 /// println!("...message received!");
953 /// let msg = receiver.recv().unwrap();
954 /// assert_eq!(1, msg);
956 #[stable(feature = "rust1", since = "1.0.0")]
957 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
958 self.inner.send(t).map_err(SendError)
961 /// Attempts to send a value on this channel without blocking.
963 /// This method differs from [`send`] by returning immediately if the
964 /// channel's buffer is full or no receiver is waiting to acquire some
965 /// data. Compared with [`send`], this function has two failure cases
966 /// instead of one (one for disconnection, one for a full buffer).
968 /// See [`send`] for notes about guarantees of whether the
969 /// receiver has received the data or not if this function is successful.
971 /// [`send`]: Self::send
976 /// use std::sync::mpsc::sync_channel;
979 /// // Create a sync_channel with buffer size 1
980 /// let (sync_sender, receiver) = sync_channel(1);
981 /// let sync_sender2 = sync_sender.clone();
983 /// // First thread owns sync_sender
984 /// thread::spawn(move || {
985 /// sync_sender.send(1).unwrap();
986 /// sync_sender.send(2).unwrap();
987 /// // Thread blocked
990 /// // Second thread owns sync_sender2
991 /// thread::spawn(move || {
992 /// // This will return an error and send
993 /// // no message if the buffer is full
994 /// let _ = sync_sender2.try_send(3);
998 /// msg = receiver.recv().unwrap();
999 /// println!("message {msg} received");
1001 /// msg = receiver.recv().unwrap();
1002 /// println!("message {msg} received");
1004 /// // Third message may have never been sent
1005 /// match receiver.try_recv() {
1006 /// Ok(msg) => println!("message {msg} received"),
1007 /// Err(_) => println!("the third message was never sent"),
1010 #[stable(feature = "rust1", since = "1.0.0")]
1011 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1012 self.inner.try_send(t)
1016 #[stable(feature = "rust1", since = "1.0.0")]
1017 impl<T> Clone for SyncSender<T> {
1018 fn clone(&self) -> SyncSender<T> {
1019 self.inner.clone_chan();
1020 SyncSender::new(self.inner.clone())
1024 #[stable(feature = "rust1", since = "1.0.0")]
1025 impl<T> Drop for SyncSender<T> {
1026 fn drop(&mut self) {
1027 self.inner.drop_chan();
1031 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1032 impl<T> fmt::Debug for SyncSender<T> {
1033 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1034 f.debug_struct("SyncSender").finish_non_exhaustive()
1038 ////////////////////////////////////////////////////////////////////////////////
1040 ////////////////////////////////////////////////////////////////////////////////
1042 impl<T> Receiver<T> {
1043 fn new(inner: Flavor<T>) -> Receiver<T> {
1044 Receiver { inner: UnsafeCell::new(inner) }
1047 /// Attempts to return a pending value on this receiver without blocking.
1049 /// This method will never block the caller in order to wait for data to
1050 /// become available. Instead, this will always return immediately with a
1051 /// possible option of pending data on the channel.
1053 /// This is useful for a flavor of "optimistic check" before deciding to
1054 /// block on a receiver.
1056 /// Compared with [`recv`], this function has two failure cases instead of one
1057 /// (one for disconnection, one for an empty buffer).
1059 /// [`recv`]: Self::recv
1064 /// use std::sync::mpsc::{Receiver, channel};
1066 /// let (_, receiver): (_, Receiver<i32>) = channel();
1068 /// assert!(receiver.try_recv().is_err());
1070 #[stable(feature = "rust1", since = "1.0.0")]
1071 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1073 let new_port = match *unsafe { self.inner() } {
1074 Flavor::Oneshot(ref p) => match p.try_recv() {
1075 Ok(t) => return Ok(t),
1076 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1077 Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1078 Err(oneshot::Upgraded(rx)) => rx,
1080 Flavor::Stream(ref p) => match p.try_recv() {
1081 Ok(t) => return Ok(t),
1082 Err(stream::Empty) => return Err(TryRecvError::Empty),
1083 Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1084 Err(stream::Upgraded(rx)) => rx,
1086 Flavor::Shared(ref p) => match p.try_recv() {
1087 Ok(t) => return Ok(t),
1088 Err(shared::Empty) => return Err(TryRecvError::Empty),
1089 Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1091 Flavor::Sync(ref p) => match p.try_recv() {
1092 Ok(t) => return Ok(t),
1093 Err(sync::Empty) => return Err(TryRecvError::Empty),
1094 Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1098 mem::swap(self.inner_mut(), new_port.inner_mut());
1103 /// Attempts to wait for a value on this receiver, returning an error if the
1104 /// corresponding channel has hung up.
1106 /// This function will always block the current thread if there is no data
1107 /// available and it's possible for more data to be sent (at least one sender
1108 /// still exists). Once a message is sent to the corresponding [`Sender`]
1109 /// (or [`SyncSender`]), this receiver will wake up and return that
1112 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1113 /// this call is blocking, this call will wake up and return [`Err`] to
1114 /// indicate that no more messages can ever be received on this channel.
1115 /// However, since channels are buffered, messages sent before the disconnect
1116 /// will still be properly received.
1121 /// use std::sync::mpsc;
1122 /// use std::thread;
1124 /// let (send, recv) = mpsc::channel();
1125 /// let handle = thread::spawn(move || {
1126 /// send.send(1u8).unwrap();
1129 /// handle.join().unwrap();
1131 /// assert_eq!(Ok(1), recv.recv());
1134 /// Buffering behavior:
1137 /// use std::sync::mpsc;
1138 /// use std::thread;
1139 /// use std::sync::mpsc::RecvError;
1141 /// let (send, recv) = mpsc::channel();
1142 /// let handle = thread::spawn(move || {
1143 /// send.send(1u8).unwrap();
1144 /// send.send(2).unwrap();
1145 /// send.send(3).unwrap();
1149 /// // wait for the thread to join so we ensure the sender is dropped
1150 /// handle.join().unwrap();
1152 /// assert_eq!(Ok(1), recv.recv());
1153 /// assert_eq!(Ok(2), recv.recv());
1154 /// assert_eq!(Ok(3), recv.recv());
1155 /// assert_eq!(Err(RecvError), recv.recv());
1157 #[stable(feature = "rust1", since = "1.0.0")]
1158 pub fn recv(&self) -> Result<T, RecvError> {
1160 let new_port = match *unsafe { self.inner() } {
1161 Flavor::Oneshot(ref p) => match p.recv(None) {
1162 Ok(t) => return Ok(t),
1163 Err(oneshot::Disconnected) => return Err(RecvError),
1164 Err(oneshot::Upgraded(rx)) => rx,
1165 Err(oneshot::Empty) => unreachable!(),
1167 Flavor::Stream(ref p) => match p.recv(None) {
1168 Ok(t) => return Ok(t),
1169 Err(stream::Disconnected) => return Err(RecvError),
1170 Err(stream::Upgraded(rx)) => rx,
1171 Err(stream::Empty) => unreachable!(),
1173 Flavor::Shared(ref p) => match p.recv(None) {
1174 Ok(t) => return Ok(t),
1175 Err(shared::Disconnected) => return Err(RecvError),
1176 Err(shared::Empty) => unreachable!(),
1178 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1181 mem::swap(self.inner_mut(), new_port.inner_mut());
1186 /// Attempts to wait for a value on this receiver, returning an error if the
1187 /// corresponding channel has hung up, or if it waits more than `timeout`.
1189 /// This function will always block the current thread if there is no data
1190 /// available and it's possible for more data to be sent (at least one sender
1191 /// still exists). Once a message is sent to the corresponding [`Sender`]
1192 /// (or [`SyncSender`]), this receiver will wake up and return that
1195 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1196 /// this call is blocking, this call will wake up and return [`Err`] to
1197 /// indicate that no more messages can ever be received on this channel.
1198 /// However, since channels are buffered, messages sent before the disconnect
1199 /// will still be properly received.
1203 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1204 /// to panic unexpectedly with the following example:
1207 /// use std::sync::mpsc::channel;
1208 /// use std::thread;
1209 /// use std::time::Duration;
1211 /// let (tx, rx) = channel::<String>();
1213 /// thread::spawn(move || {
1214 /// let d = Duration::from_millis(10);
1216 /// println!("recv");
1217 /// let _r = rx.recv_timeout(d);
1221 /// thread::sleep(Duration::from_millis(100));
1222 /// let _c1 = tx.clone();
1224 /// thread::sleep(Duration::from_secs(1));
1227 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1231 /// Successfully receiving value before encountering timeout:
1234 /// use std::thread;
1235 /// use std::time::Duration;
1236 /// use std::sync::mpsc;
1238 /// let (send, recv) = mpsc::channel();
1240 /// thread::spawn(move || {
1241 /// send.send('a').unwrap();
1245 /// recv.recv_timeout(Duration::from_millis(400)),
1250 /// Receiving an error upon reaching timeout:
1253 /// use std::thread;
1254 /// use std::time::Duration;
1255 /// use std::sync::mpsc;
1257 /// let (send, recv) = mpsc::channel();
1259 /// thread::spawn(move || {
1260 /// thread::sleep(Duration::from_millis(800));
1261 /// send.send('a').unwrap();
1265 /// recv.recv_timeout(Duration::from_millis(400)),
1266 /// Err(mpsc::RecvTimeoutError::Timeout)
1269 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1270 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1271 // Do an optimistic try_recv to avoid the performance impact of
1272 // Instant::now() in the full-channel case.
1273 match self.try_recv() {
1274 Ok(result) => Ok(result),
1275 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1276 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1277 Some(deadline) => self.recv_deadline(deadline),
1278 // So far in the future that it's practically the same as waiting indefinitely.
1279 None => self.recv().map_err(RecvTimeoutError::from),
1284 /// Attempts to wait for a value on this receiver, returning an error if the
1285 /// corresponding channel has hung up, or if `deadline` is reached.
1287 /// This function will always block the current thread if there is no data
1288 /// available and it's possible for more data to be sent. Once a message is
1289 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1290 /// receiver will wake up and return that message.
1292 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1293 /// this call is blocking, this call will wake up and return [`Err`] to
1294 /// indicate that no more messages can ever be received on this channel.
1295 /// However, since channels are buffered, messages sent before the disconnect
1296 /// will still be properly received.
1300 /// Successfully receiving value before reaching deadline:
1303 /// #![feature(deadline_api)]
1304 /// use std::thread;
1305 /// use std::time::{Duration, Instant};
1306 /// use std::sync::mpsc;
1308 /// let (send, recv) = mpsc::channel();
1310 /// thread::spawn(move || {
1311 /// send.send('a').unwrap();
1315 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1320 /// Receiving an error upon reaching deadline:
1323 /// #![feature(deadline_api)]
1324 /// use std::thread;
1325 /// use std::time::{Duration, Instant};
1326 /// use std::sync::mpsc;
1328 /// let (send, recv) = mpsc::channel();
1330 /// thread::spawn(move || {
1331 /// thread::sleep(Duration::from_millis(800));
1332 /// send.send('a').unwrap();
1336 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1337 /// Err(mpsc::RecvTimeoutError::Timeout)
1340 #[unstable(feature = "deadline_api", issue = "46316")]
1341 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1342 use self::RecvTimeoutError::*;
1345 let port_or_empty = match *unsafe { self.inner() } {
1346 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1347 Ok(t) => return Ok(t),
1348 Err(oneshot::Disconnected) => return Err(Disconnected),
1349 Err(oneshot::Upgraded(rx)) => Some(rx),
1350 Err(oneshot::Empty) => None,
1352 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1353 Ok(t) => return Ok(t),
1354 Err(stream::Disconnected) => return Err(Disconnected),
1355 Err(stream::Upgraded(rx)) => Some(rx),
1356 Err(stream::Empty) => None,
1358 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1359 Ok(t) => return Ok(t),
1360 Err(shared::Disconnected) => return Err(Disconnected),
1361 Err(shared::Empty) => None,
1363 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1364 Ok(t) => return Ok(t),
1365 Err(sync::Disconnected) => return Err(Disconnected),
1366 Err(sync::Empty) => None,
1370 if let Some(new_port) = port_or_empty {
1372 mem::swap(self.inner_mut(), new_port.inner_mut());
1376 // If we're already passed the deadline, and we're here without
1377 // data, return a timeout, else try again.
1378 if Instant::now() >= deadline {
1379 return Err(Timeout);
1384 /// Returns an iterator that will block waiting for messages, but never
1385 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1390 /// use std::sync::mpsc::channel;
1391 /// use std::thread;
1393 /// let (send, recv) = channel();
1395 /// thread::spawn(move || {
1396 /// send.send(1).unwrap();
1397 /// send.send(2).unwrap();
1398 /// send.send(3).unwrap();
1401 /// let mut iter = recv.iter();
1402 /// assert_eq!(iter.next(), Some(1));
1403 /// assert_eq!(iter.next(), Some(2));
1404 /// assert_eq!(iter.next(), Some(3));
1405 /// assert_eq!(iter.next(), None);
1407 #[stable(feature = "rust1", since = "1.0.0")]
1408 pub fn iter(&self) -> Iter<'_, T> {
1412 /// Returns an iterator that will attempt to yield all pending values.
1413 /// It will return `None` if there are no more pending values or if the
1414 /// channel has hung up. The iterator will never [`panic!`] or block the
1415 /// user by waiting for values.
1420 /// use std::sync::mpsc::channel;
1421 /// use std::thread;
1422 /// use std::time::Duration;
1424 /// let (sender, receiver) = channel();
1426 /// // nothing is in the buffer yet
1427 /// assert!(receiver.try_iter().next().is_none());
1429 /// thread::spawn(move || {
1430 /// thread::sleep(Duration::from_secs(1));
1431 /// sender.send(1).unwrap();
1432 /// sender.send(2).unwrap();
1433 /// sender.send(3).unwrap();
1436 /// // nothing is in the buffer yet
1437 /// assert!(receiver.try_iter().next().is_none());
1439 /// // block for two seconds
1440 /// thread::sleep(Duration::from_secs(2));
1442 /// let mut iter = receiver.try_iter();
1443 /// assert_eq!(iter.next(), Some(1));
1444 /// assert_eq!(iter.next(), Some(2));
1445 /// assert_eq!(iter.next(), Some(3));
1446 /// assert_eq!(iter.next(), None);
1448 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1449 pub fn try_iter(&self) -> TryIter<'_, T> {
1450 TryIter { rx: self }
1454 #[stable(feature = "rust1", since = "1.0.0")]
1455 impl<'a, T> Iterator for Iter<'a, T> {
1458 fn next(&mut self) -> Option<T> {
1463 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1464 impl<'a, T> Iterator for TryIter<'a, T> {
1467 fn next(&mut self) -> Option<T> {
1468 self.rx.try_recv().ok()
1472 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1473 impl<'a, T> IntoIterator for &'a Receiver<T> {
1475 type IntoIter = Iter<'a, T>;
1477 fn into_iter(self) -> Iter<'a, T> {
1482 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1483 impl<T> Iterator for IntoIter<T> {
1485 fn next(&mut self) -> Option<T> {
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<T> IntoIterator for Receiver<T> {
1493 type IntoIter = IntoIter<T>;
1495 fn into_iter(self) -> IntoIter<T> {
1496 IntoIter { rx: self }
1500 #[stable(feature = "rust1", since = "1.0.0")]
1501 impl<T> Drop for Receiver<T> {
1502 fn drop(&mut self) {
1503 match *unsafe { self.inner() } {
1504 Flavor::Oneshot(ref p) => p.drop_port(),
1505 Flavor::Stream(ref p) => p.drop_port(),
1506 Flavor::Shared(ref p) => p.drop_port(),
1507 Flavor::Sync(ref p) => p.drop_port(),
1512 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1513 impl<T> fmt::Debug for Receiver<T> {
1514 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1515 f.debug_struct("Receiver").finish_non_exhaustive()
1519 #[stable(feature = "rust1", since = "1.0.0")]
1520 impl<T> fmt::Debug for SendError<T> {
1521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1522 f.debug_struct("SendError").finish_non_exhaustive()
1526 #[stable(feature = "rust1", since = "1.0.0")]
1527 impl<T> fmt::Display for SendError<T> {
1528 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1529 "sending on a closed channel".fmt(f)
1533 #[stable(feature = "rust1", since = "1.0.0")]
1534 impl<T: Send> error::Error for SendError<T> {
1535 #[allow(deprecated)]
1536 fn description(&self) -> &str {
1537 "sending on a closed channel"
1541 #[stable(feature = "rust1", since = "1.0.0")]
1542 impl<T> fmt::Debug for TrySendError<T> {
1543 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1545 TrySendError::Full(..) => "Full(..)".fmt(f),
1546 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T> fmt::Display for TrySendError<T> {
1553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1555 TrySendError::Full(..) => "sending on a full channel".fmt(f),
1556 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1561 #[stable(feature = "rust1", since = "1.0.0")]
1562 impl<T: Send> error::Error for TrySendError<T> {
1563 #[allow(deprecated)]
1564 fn description(&self) -> &str {
1566 TrySendError::Full(..) => "sending on a full channel",
1567 TrySendError::Disconnected(..) => "sending on a closed channel",
1572 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1573 impl<T> From<SendError<T>> for TrySendError<T> {
1574 /// Converts a `SendError<T>` into a `TrySendError<T>`.
1576 /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1578 /// No data is allocated on the heap.
1579 fn from(err: SendError<T>) -> TrySendError<T> {
1581 SendError(t) => TrySendError::Disconnected(t),
1586 #[stable(feature = "rust1", since = "1.0.0")]
1587 impl fmt::Display for RecvError {
1588 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1589 "receiving on a closed channel".fmt(f)
1593 #[stable(feature = "rust1", since = "1.0.0")]
1594 impl error::Error for RecvError {
1595 #[allow(deprecated)]
1596 fn description(&self) -> &str {
1597 "receiving on a closed channel"
1601 #[stable(feature = "rust1", since = "1.0.0")]
1602 impl fmt::Display for TryRecvError {
1603 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1605 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1606 TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1611 #[stable(feature = "rust1", since = "1.0.0")]
1612 impl error::Error for TryRecvError {
1613 #[allow(deprecated)]
1614 fn description(&self) -> &str {
1616 TryRecvError::Empty => "receiving on an empty channel",
1617 TryRecvError::Disconnected => "receiving on a closed channel",
1622 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1623 impl From<RecvError> for TryRecvError {
1624 /// Converts a `RecvError` into a `TryRecvError`.
1626 /// This conversion always returns `TryRecvError::Disconnected`.
1628 /// No data is allocated on the heap.
1629 fn from(err: RecvError) -> TryRecvError {
1631 RecvError => TryRecvError::Disconnected,
1636 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1637 impl fmt::Display for RecvTimeoutError {
1638 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1640 RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1641 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1646 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1647 impl error::Error for RecvTimeoutError {
1648 #[allow(deprecated)]
1649 fn description(&self) -> &str {
1651 RecvTimeoutError::Timeout => "timed out waiting on channel",
1652 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1657 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1658 impl From<RecvError> for RecvTimeoutError {
1659 /// Converts a `RecvError` into a `RecvTimeoutError`.
1661 /// This conversion always returns `RecvTimeoutError::Disconnected`.
1663 /// No data is allocated on the heap.
1664 fn from(err: RecvError) -> RecvTimeoutError {
1666 RecvError => RecvTimeoutError::Disconnected,