1 // ignore-tidy-filelength
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
16 //! These channels come in two flavors:
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //! will return a `(Sender, Receiver)` tuple where all sends will be
20 //! **asynchronous** (they never block). The channel conceptually has an
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //! messages is a pre-allocated buffer of a fixed size. All sends will be
26 //! **synchronous** by blocking until there is buffer space available. Note
27 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //! channel where each sender atomically hands off a message to a receiver.
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
59 //! use std::sync::mpsc::channel;
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //! tx.send(10).unwrap();
66 //! assert_eq!(rx.recv().unwrap(), 10);
73 //! use std::sync::mpsc::channel;
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
80 //! let tx = tx.clone();
81 //! thread::spawn(move|| {
82 //! tx.send(i).unwrap();
87 //! let j = rx.recv().unwrap();
88 //! assert!(0 <= j && j < 10);
92 //! Propagating panics:
95 //! use std::sync::mpsc::channel;
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
101 //! assert!(rx.recv().is_err());
104 //! Synchronous channels:
108 //! use std::sync::mpsc::sync_channel;
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //! // This will wait for the parent thread to start receiving
113 //! tx.send(53).unwrap();
115 //! rx.recv().unwrap();
118 #![stable(feature = "rust1", since = "1.0.0")]
120 // A description of how Rust's channel implementation works
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
132 // ## Flavors of channels
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 // case. They contain as few atomics as possible and
141 // involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 // use a different concurrent queue that is more tailored for this
144 // use case. The initial allocation of this flavor of channel is not
146 // * Shared - this is the most general form of channel that this module offers,
147 // a channel with multiple senders. This type is as optimized as it
148 // can be, but the previous two types mentioned are much faster for
151 // ## Concurrent queues
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
163 // ### SPSC optimizations
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
182 // ### MPSC optimizations
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
192 // ## Overview of the Implementation
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
200 // queue.push(t) return if queue.pop()
201 // if increment() == -1 deschedule {
202 // wakeup() if decrement() > 0
203 // cancel_deschedule()
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
210 // ### The internal atomic counter
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
235 // ## Native Implementation
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
272 use crate::cell::UnsafeCell;
276 use crate::sync::Arc;
277 use crate::time::{Duration, Instant};
289 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
292 /// Messages sent to the channel can be retrieved using [`recv`].
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
301 /// use std::sync::mpsc::channel;
303 /// use std::time::Duration;
305 /// let (send, recv) = channel();
307 /// thread::spawn(move || {
308 /// send.send("Hello world!").unwrap();
309 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
310 /// send.send("Delayed for 2 seconds").unwrap();
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319 inner: UnsafeCell<Flavor<T>>,
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> {}
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> {}
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
344 /// use std::sync::mpsc::channel;
347 /// let (send, recv) = channel();
349 /// thread::spawn(move || {
350 /// send.send(1u8).unwrap();
351 /// send.send(2u8).unwrap();
352 /// send.send(3u8).unwrap();
355 /// for x in recv.iter() {
356 /// println!("Got: {}", x);
359 #[stable(feature = "rust1", since = "1.0.0")]
361 pub struct Iter<'a, T: 'a> {
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
381 /// use std::sync::mpsc::channel;
383 /// use std::time::Duration;
385 /// let (sender, receiver) = channel();
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
391 /// thread::spawn(move || {
392 /// sender.send(1).unwrap();
393 /// sender.send(2).unwrap();
394 /// sender.send(3).unwrap();
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
400 /// for x in receiver.try_iter() {
401 /// println!("Got: {}", x);
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
406 pub struct TryIter<'a, T: 'a> {
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
424 /// use std::sync::mpsc::channel;
427 /// let (send, recv) = channel();
429 /// thread::spawn(move || {
430 /// send.send(1u8).unwrap();
431 /// send.send(2u8).unwrap();
432 /// send.send(3u8).unwrap();
435 /// for x in recv.into_iter() {
436 /// println!("Got: {}", x);
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
441 pub struct IntoIter<T> {
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
448 /// Messages can be sent through this channel with [`send`].
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
456 /// use std::sync::mpsc::channel;
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 /// sender.send(1).unwrap();
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 /// sender2.send(2).unwrap();
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
475 /// assert_eq!(3, msg + msg2);
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479 inner: UnsafeCell<Flavor<T>>,
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> {}
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> {}
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
494 /// [`send`] will block if there is no space in the internal buffer.
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
503 /// use std::sync::mpsc::sync_channel;
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 /// sync_sender.send(1).unwrap();
513 /// sync_sender.send(2).unwrap();
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 /// sync_sender2.send(3).unwrap();
519 /// // thread will now block since the buffer is full
520 /// println!("Thread unblocked!");
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
528 /// // "Thread unblocked!" will be printed now
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
533 /// msg = receiver.recv().unwrap();
535 /// println!("message {} received", msg);
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539 inner: Arc<sync::Packet<T>>,
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
558 /// An error returned from the [`recv`] function on a [`Receiver`].
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582 /// This **channel** is currently empty, but the **Sender**(s) have not yet
583 /// disconnected, so data may yet become available.
584 #[stable(feature = "rust1", since = "1.0.0")]
587 /// The **channel**'s sending half has become disconnected, and there will
588 /// never be any more data received on it.
589 #[stable(feature = "rust1", since = "1.0.0")]
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603 /// This **channel** is currently empty, but the **Sender**(s) have not yet
604 /// disconnected, so data may yet become available.
605 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
607 /// The **channel**'s sending half has become disconnected, and there will
608 /// never be any more data received on it.
609 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620 /// The data could not be sent on the [`sync_channel`] because it would require that
621 /// the callee block to send the data.
623 /// If this is a buffered channel, then the buffer is full at this time. If
624 /// this is not a buffered channel, then there is no [`Receiver`] available to
625 /// acquire the data.
627 /// [`sync_channel`]: fn.sync_channel.html
628 /// [`Receiver`]: struct.Receiver.html
629 #[stable(feature = "rust1", since = "1.0.0")]
630 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
632 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633 /// sent. The data is returned back to the callee in this case.
635 /// [`sync_channel`]: fn.sync_channel.html
636 #[stable(feature = "rust1", since = "1.0.0")]
637 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
641 Oneshot(Arc<oneshot::Packet<T>>),
642 Stream(Arc<stream::Packet<T>>),
643 Shared(Arc<shared::Packet<T>>),
644 Sync(Arc<sync::Packet<T>>),
648 trait UnsafeFlavor<T> {
649 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651 &mut *self.inner_unsafe().get()
653 unsafe fn inner(&self) -> &Flavor<T> {
654 &*self.inner_unsafe().get()
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
694 /// use std::sync::mpsc::channel;
697 /// let (sender, receiver) = channel();
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// # fn expensive_computation() {}
702 /// sender.send(expensive_computation()).unwrap();
705 /// // Do some useful work for awhile
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712 let a = Arc::new(oneshot::Packet::new());
713 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
747 /// use std::sync::mpsc::sync_channel;
750 /// let (sender, receiver) = sync_channel(1);
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
755 /// thread::spawn(move|| {
756 /// // this will block until the previous message has been received
757 /// sender.send(2).unwrap();
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765 let a = Arc::new(sync::Packet::new(bound));
766 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
769 ////////////////////////////////////////////////////////////////////////////////
771 ////////////////////////////////////////////////////////////////////////////////
774 fn new(inner: Flavor<T>) -> Sender<T> {
775 Sender { inner: UnsafeCell::new(inner) }
778 /// Attempts to send a value on this channel, returning it back if it could
781 /// A successful send occurs when it is determined that the other end of
782 /// the channel has not hung up already. An unsuccessful send would be one
783 /// where the corresponding receiver has already been deallocated. Note
784 /// that a return value of [`Err`] means that the data will never be
785 /// received, but a return value of [`Ok`] does *not* mean that the data
786 /// will be received. It is possible for the corresponding receiver to
787 /// hang up immediately after this function returns [`Ok`].
789 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
790 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
792 /// This method will never block the current thread.
797 /// use std::sync::mpsc::channel;
799 /// let (tx, rx) = channel();
801 /// // This send is always successful
802 /// tx.send(1).unwrap();
804 /// // This send will fail because the receiver is gone
806 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
808 #[stable(feature = "rust1", since = "1.0.0")]
809 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
810 let (new_inner, ret) = match *unsafe { self.inner() } {
811 Flavor::Oneshot(ref p) => {
813 return p.send(t).map_err(SendError);
815 let a = Arc::new(stream::Packet::new());
816 let rx = Receiver::new(Flavor::Stream(a.clone()));
817 match p.upgrade(rx) {
818 oneshot::UpSuccess => {
822 oneshot::UpDisconnected => (a, Err(t)),
823 oneshot::UpWoke(token) => {
824 // This send cannot panic because the thread is
825 // asleep (we're looking at it), so the receiver
827 a.send(t).ok().unwrap();
834 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
835 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
836 Flavor::Sync(..) => unreachable!(),
840 let tmp = Sender::new(Flavor::Stream(new_inner));
841 mem::swap(self.inner_mut(), tmp.inner_mut());
843 ret.map_err(SendError)
847 #[stable(feature = "rust1", since = "1.0.0")]
848 impl<T> Clone for Sender<T> {
849 fn clone(&self) -> Sender<T> {
850 let packet = match *unsafe { self.inner() } {
851 Flavor::Oneshot(ref p) => {
852 let a = Arc::new(shared::Packet::new());
854 let guard = a.postinit_lock();
855 let rx = Receiver::new(Flavor::Shared(a.clone()));
856 let sleeper = match p.upgrade(rx) {
857 oneshot::UpSuccess | oneshot::UpDisconnected => None,
858 oneshot::UpWoke(task) => Some(task),
860 a.inherit_blocker(sleeper, guard);
864 Flavor::Stream(ref p) => {
865 let a = Arc::new(shared::Packet::new());
867 let guard = a.postinit_lock();
868 let rx = Receiver::new(Flavor::Shared(a.clone()));
869 let sleeper = match p.upgrade(rx) {
870 stream::UpSuccess | stream::UpDisconnected => None,
871 stream::UpWoke(task) => Some(task),
873 a.inherit_blocker(sleeper, guard);
877 Flavor::Shared(ref p) => {
879 return Sender::new(Flavor::Shared(p.clone()));
881 Flavor::Sync(..) => unreachable!(),
885 let tmp = Sender::new(Flavor::Shared(packet.clone()));
886 mem::swap(self.inner_mut(), tmp.inner_mut());
888 Sender::new(Flavor::Shared(packet))
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
895 match *unsafe { self.inner() } {
896 Flavor::Oneshot(ref p) => p.drop_chan(),
897 Flavor::Stream(ref p) => p.drop_chan(),
898 Flavor::Shared(ref p) => p.drop_chan(),
899 Flavor::Sync(..) => unreachable!(),
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907 f.debug_struct("Sender").finish()
911 ////////////////////////////////////////////////////////////////////////////////
913 ////////////////////////////////////////////////////////////////////////////////
915 impl<T> SyncSender<T> {
916 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
920 /// Sends a value on this synchronous channel.
922 /// This function will *block* until space in the internal buffer becomes
923 /// available or a receiver is available to hand off the message to.
925 /// Note that a successful send does *not* guarantee that the receiver will
926 /// ever see the data if there is a buffer on this channel. Items may be
927 /// enqueued in the internal buffer for the receiver to receive at a later
928 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929 /// channel and it guarantees that the receiver has indeed received
930 /// the data if this function returns success.
932 /// This function will never panic, but it may return [`Err`] if the
933 /// [`Receiver`] has disconnected and is no longer able to receive
936 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
937 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
942 /// use std::sync::mpsc::sync_channel;
945 /// // Create a rendezvous sync_channel with buffer size 0
946 /// let (sync_sender, receiver) = sync_channel(0);
948 /// thread::spawn(move || {
949 /// println!("sending message...");
950 /// sync_sender.send(1).unwrap();
951 /// // Thread is now blocked until the message is received
953 /// println!("...message received!");
956 /// let msg = receiver.recv().unwrap();
957 /// assert_eq!(1, msg);
959 #[stable(feature = "rust1", since = "1.0.0")]
960 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
961 self.inner.send(t).map_err(SendError)
964 /// Attempts to send a value on this channel without blocking.
966 /// This method differs from [`send`] by returning immediately if the
967 /// channel's buffer is full or no receiver is waiting to acquire some
968 /// data. Compared with [`send`], this function has two failure cases
969 /// instead of one (one for disconnection, one for a full buffer).
971 /// See [`send`] for notes about guarantees of whether the
972 /// receiver has received the data or not if this function is successful.
974 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
979 /// use std::sync::mpsc::sync_channel;
982 /// // Create a sync_channel with buffer size 1
983 /// let (sync_sender, receiver) = sync_channel(1);
984 /// let sync_sender2 = sync_sender.clone();
986 /// // First thread owns sync_sender
987 /// thread::spawn(move || {
988 /// sync_sender.send(1).unwrap();
989 /// sync_sender.send(2).unwrap();
990 /// // Thread blocked
993 /// // Second thread owns sync_sender2
994 /// thread::spawn(move || {
995 /// // This will return an error and send
996 /// // no message if the buffer is full
997 /// let _ = sync_sender2.try_send(3);
1001 /// msg = receiver.recv().unwrap();
1002 /// println!("message {} received", msg);
1004 /// msg = receiver.recv().unwrap();
1005 /// println!("message {} received", msg);
1007 /// // Third message may have never been sent
1008 /// match receiver.try_recv() {
1009 /// Ok(msg) => println!("message {} received", msg),
1010 /// Err(_) => println!("the third message was never sent"),
1013 #[stable(feature = "rust1", since = "1.0.0")]
1014 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1015 self.inner.try_send(t)
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> Clone for SyncSender<T> {
1021 fn clone(&self) -> SyncSender<T> {
1022 self.inner.clone_chan();
1023 SyncSender::new(self.inner.clone())
1027 #[stable(feature = "rust1", since = "1.0.0")]
1028 impl<T> Drop for SyncSender<T> {
1029 fn drop(&mut self) {
1030 self.inner.drop_chan();
1034 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1035 impl<T> fmt::Debug for SyncSender<T> {
1036 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037 f.debug_struct("SyncSender").finish()
1041 ////////////////////////////////////////////////////////////////////////////////
1043 ////////////////////////////////////////////////////////////////////////////////
1045 impl<T> Receiver<T> {
1046 fn new(inner: Flavor<T>) -> Receiver<T> {
1047 Receiver { inner: UnsafeCell::new(inner) }
1050 /// Attempts to return a pending value on this receiver without blocking.
1052 /// This method will never block the caller in order to wait for data to
1053 /// become available. Instead, this will always return immediately with a
1054 /// possible option of pending data on the channel.
1056 /// This is useful for a flavor of "optimistic check" before deciding to
1057 /// block on a receiver.
1059 /// Compared with [`recv`], this function has two failure cases instead of one
1060 /// (one for disconnection, one for an empty buffer).
1062 /// [`recv`]: struct.Receiver.html#method.recv
1067 /// use std::sync::mpsc::{Receiver, channel};
1069 /// let (_, receiver): (_, Receiver<i32>) = channel();
1071 /// assert!(receiver.try_recv().is_err());
1073 #[stable(feature = "rust1", since = "1.0.0")]
1074 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1076 let new_port = match *unsafe { self.inner() } {
1077 Flavor::Oneshot(ref p) => match p.try_recv() {
1078 Ok(t) => return Ok(t),
1079 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1080 Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1081 Err(oneshot::Upgraded(rx)) => rx,
1083 Flavor::Stream(ref p) => match p.try_recv() {
1084 Ok(t) => return Ok(t),
1085 Err(stream::Empty) => return Err(TryRecvError::Empty),
1086 Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1087 Err(stream::Upgraded(rx)) => rx,
1089 Flavor::Shared(ref p) => match p.try_recv() {
1090 Ok(t) => return Ok(t),
1091 Err(shared::Empty) => return Err(TryRecvError::Empty),
1092 Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1094 Flavor::Sync(ref p) => match p.try_recv() {
1095 Ok(t) => return Ok(t),
1096 Err(sync::Empty) => return Err(TryRecvError::Empty),
1097 Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1101 mem::swap(self.inner_mut(), new_port.inner_mut());
1106 /// Attempts to wait for a value on this receiver, returning an error if the
1107 /// corresponding channel has hung up.
1109 /// This function will always block the current thread if there is no data
1110 /// available and it's possible for more data to be sent. Once a message is
1111 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1112 /// receiver will wake up and return that message.
1114 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1115 /// this call is blocking, this call will wake up and return [`Err`] to
1116 /// indicate that no more messages can ever be received on this channel.
1117 /// However, since channels are buffered, messages sent before the disconnect
1118 /// will still be properly received.
1120 /// [`Sender`]: struct.Sender.html
1121 /// [`SyncSender`]: struct.SyncSender.html
1122 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1127 /// use std::sync::mpsc;
1128 /// use std::thread;
1130 /// let (send, recv) = mpsc::channel();
1131 /// let handle = thread::spawn(move || {
1132 /// send.send(1u8).unwrap();
1135 /// handle.join().unwrap();
1137 /// assert_eq!(Ok(1), recv.recv());
1140 /// Buffering behavior:
1143 /// use std::sync::mpsc;
1144 /// use std::thread;
1145 /// use std::sync::mpsc::RecvError;
1147 /// let (send, recv) = mpsc::channel();
1148 /// let handle = thread::spawn(move || {
1149 /// send.send(1u8).unwrap();
1150 /// send.send(2).unwrap();
1151 /// send.send(3).unwrap();
1155 /// // wait for the thread to join so we ensure the sender is dropped
1156 /// handle.join().unwrap();
1158 /// assert_eq!(Ok(1), recv.recv());
1159 /// assert_eq!(Ok(2), recv.recv());
1160 /// assert_eq!(Ok(3), recv.recv());
1161 /// assert_eq!(Err(RecvError), recv.recv());
1163 #[stable(feature = "rust1", since = "1.0.0")]
1164 pub fn recv(&self) -> Result<T, RecvError> {
1166 let new_port = match *unsafe { self.inner() } {
1167 Flavor::Oneshot(ref p) => match p.recv(None) {
1168 Ok(t) => return Ok(t),
1169 Err(oneshot::Disconnected) => return Err(RecvError),
1170 Err(oneshot::Upgraded(rx)) => rx,
1171 Err(oneshot::Empty) => unreachable!(),
1173 Flavor::Stream(ref p) => match p.recv(None) {
1174 Ok(t) => return Ok(t),
1175 Err(stream::Disconnected) => return Err(RecvError),
1176 Err(stream::Upgraded(rx)) => rx,
1177 Err(stream::Empty) => unreachable!(),
1179 Flavor::Shared(ref p) => match p.recv(None) {
1180 Ok(t) => return Ok(t),
1181 Err(shared::Disconnected) => return Err(RecvError),
1182 Err(shared::Empty) => unreachable!(),
1184 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1187 mem::swap(self.inner_mut(), new_port.inner_mut());
1192 /// Attempts to wait for a value on this receiver, returning an error if the
1193 /// corresponding channel has hung up, or if it waits more than `timeout`.
1195 /// This function will always block the current thread if there is no data
1196 /// available and it's possible for more data to be sent. Once a message is
1197 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1198 /// receiver will wake up and return that message.
1200 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1201 /// this call is blocking, this call will wake up and return [`Err`] to
1202 /// indicate that no more messages can ever be received on this channel.
1203 /// However, since channels are buffered, messages sent before the disconnect
1204 /// will still be properly received.
1206 /// [`Sender`]: struct.Sender.html
1207 /// [`SyncSender`]: struct.SyncSender.html
1208 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1212 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1213 /// to panic unexpectedly with the following example:
1216 /// use std::sync::mpsc::channel;
1217 /// use std::thread;
1218 /// use std::time::Duration;
1220 /// let (tx, rx) = channel::<String>();
1222 /// thread::spawn(move || {
1223 /// let d = Duration::from_millis(10);
1225 /// println!("recv");
1226 /// let _r = rx.recv_timeout(d);
1230 /// thread::sleep(Duration::from_millis(100));
1231 /// let _c1 = tx.clone();
1233 /// thread::sleep(Duration::from_secs(1));
1236 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1240 /// Successfully receiving value before encountering timeout:
1243 /// use std::thread;
1244 /// use std::time::Duration;
1245 /// use std::sync::mpsc;
1247 /// let (send, recv) = mpsc::channel();
1249 /// thread::spawn(move || {
1250 /// send.send('a').unwrap();
1254 /// recv.recv_timeout(Duration::from_millis(400)),
1259 /// Receiving an error upon reaching timeout:
1262 /// use std::thread;
1263 /// use std::time::Duration;
1264 /// use std::sync::mpsc;
1266 /// let (send, recv) = mpsc::channel();
1268 /// thread::spawn(move || {
1269 /// thread::sleep(Duration::from_millis(800));
1270 /// send.send('a').unwrap();
1274 /// recv.recv_timeout(Duration::from_millis(400)),
1275 /// Err(mpsc::RecvTimeoutError::Timeout)
1278 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1279 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1280 // Do an optimistic try_recv to avoid the performance impact of
1281 // Instant::now() in the full-channel case.
1282 match self.try_recv() {
1283 Ok(result) => Ok(result),
1284 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1285 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1286 Some(deadline) => self.recv_deadline(deadline),
1287 // So far in the future that it's practically the same as waiting indefinitely.
1288 None => self.recv().map_err(RecvTimeoutError::from),
1293 /// Attempts to wait for a value on this receiver, returning an error if the
1294 /// corresponding channel has hung up, or if `deadline` is reached.
1296 /// This function will always block the current thread if there is no data
1297 /// available and it's possible for more data to be sent. Once a message is
1298 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1299 /// receiver will wake up and return that message.
1301 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1302 /// this call is blocking, this call will wake up and return [`Err`] to
1303 /// indicate that no more messages can ever be received on this channel.
1304 /// However, since channels are buffered, messages sent before the disconnect
1305 /// will still be properly received.
1307 /// [`Sender`]: struct.Sender.html
1308 /// [`SyncSender`]: struct.SyncSender.html
1309 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1313 /// Successfully receiving value before reaching deadline:
1316 /// #![feature(deadline_api)]
1317 /// use std::thread;
1318 /// use std::time::{Duration, Instant};
1319 /// use std::sync::mpsc;
1321 /// let (send, recv) = mpsc::channel();
1323 /// thread::spawn(move || {
1324 /// send.send('a').unwrap();
1328 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1333 /// Receiving an error upon reaching deadline:
1336 /// #![feature(deadline_api)]
1337 /// use std::thread;
1338 /// use std::time::{Duration, Instant};
1339 /// use std::sync::mpsc;
1341 /// let (send, recv) = mpsc::channel();
1343 /// thread::spawn(move || {
1344 /// thread::sleep(Duration::from_millis(800));
1345 /// send.send('a').unwrap();
1349 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1350 /// Err(mpsc::RecvTimeoutError::Timeout)
1353 #[unstable(feature = "deadline_api", issue = "46316")]
1354 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1355 use self::RecvTimeoutError::*;
1358 let port_or_empty = match *unsafe { self.inner() } {
1359 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1360 Ok(t) => return Ok(t),
1361 Err(oneshot::Disconnected) => return Err(Disconnected),
1362 Err(oneshot::Upgraded(rx)) => Some(rx),
1363 Err(oneshot::Empty) => None,
1365 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1366 Ok(t) => return Ok(t),
1367 Err(stream::Disconnected) => return Err(Disconnected),
1368 Err(stream::Upgraded(rx)) => Some(rx),
1369 Err(stream::Empty) => None,
1371 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1372 Ok(t) => return Ok(t),
1373 Err(shared::Disconnected) => return Err(Disconnected),
1374 Err(shared::Empty) => None,
1376 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1377 Ok(t) => return Ok(t),
1378 Err(sync::Disconnected) => return Err(Disconnected),
1379 Err(sync::Empty) => None,
1383 if let Some(new_port) = port_or_empty {
1385 mem::swap(self.inner_mut(), new_port.inner_mut());
1389 // If we're already passed the deadline, and we're here without
1390 // data, return a timeout, else try again.
1391 if Instant::now() >= deadline {
1392 return Err(Timeout);
1397 /// Returns an iterator that will block waiting for messages, but never
1398 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1400 /// [`panic!`]: ../../../std/macro.panic.html
1401 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1406 /// use std::sync::mpsc::channel;
1407 /// use std::thread;
1409 /// let (send, recv) = channel();
1411 /// thread::spawn(move || {
1412 /// send.send(1).unwrap();
1413 /// send.send(2).unwrap();
1414 /// send.send(3).unwrap();
1417 /// let mut iter = recv.iter();
1418 /// assert_eq!(iter.next(), Some(1));
1419 /// assert_eq!(iter.next(), Some(2));
1420 /// assert_eq!(iter.next(), Some(3));
1421 /// assert_eq!(iter.next(), None);
1423 #[stable(feature = "rust1", since = "1.0.0")]
1424 pub fn iter(&self) -> Iter<'_, T> {
1428 /// Returns an iterator that will attempt to yield all pending values.
1429 /// It will return `None` if there are no more pending values or if the
1430 /// channel has hung up. The iterator will never [`panic!`] or block the
1431 /// user by waiting for values.
1433 /// [`panic!`]: ../../../std/macro.panic.html
1438 /// use std::sync::mpsc::channel;
1439 /// use std::thread;
1440 /// use std::time::Duration;
1442 /// let (sender, receiver) = channel();
1444 /// // nothing is in the buffer yet
1445 /// assert!(receiver.try_iter().next().is_none());
1447 /// thread::spawn(move || {
1448 /// thread::sleep(Duration::from_secs(1));
1449 /// sender.send(1).unwrap();
1450 /// sender.send(2).unwrap();
1451 /// sender.send(3).unwrap();
1454 /// // nothing is in the buffer yet
1455 /// assert!(receiver.try_iter().next().is_none());
1457 /// // block for two seconds
1458 /// thread::sleep(Duration::from_secs(2));
1460 /// let mut iter = receiver.try_iter();
1461 /// assert_eq!(iter.next(), Some(1));
1462 /// assert_eq!(iter.next(), Some(2));
1463 /// assert_eq!(iter.next(), Some(3));
1464 /// assert_eq!(iter.next(), None);
1466 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1467 pub fn try_iter(&self) -> TryIter<'_, T> {
1468 TryIter { rx: self }
1472 #[stable(feature = "rust1", since = "1.0.0")]
1473 impl<'a, T> Iterator for Iter<'a, T> {
1476 fn next(&mut self) -> Option<T> {
1481 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1482 impl<'a, T> Iterator for TryIter<'a, T> {
1485 fn next(&mut self) -> Option<T> {
1486 self.rx.try_recv().ok()
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<'a, T> IntoIterator for &'a Receiver<T> {
1493 type IntoIter = Iter<'a, T>;
1495 fn into_iter(self) -> Iter<'a, T> {
1500 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1501 impl<T> Iterator for IntoIter<T> {
1503 fn next(&mut self) -> Option<T> {
1508 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1509 impl<T> IntoIterator for Receiver<T> {
1511 type IntoIter = IntoIter<T>;
1513 fn into_iter(self) -> IntoIter<T> {
1514 IntoIter { rx: self }
1518 #[stable(feature = "rust1", since = "1.0.0")]
1519 impl<T> Drop for Receiver<T> {
1520 fn drop(&mut self) {
1521 match *unsafe { self.inner() } {
1522 Flavor::Oneshot(ref p) => p.drop_port(),
1523 Flavor::Stream(ref p) => p.drop_port(),
1524 Flavor::Shared(ref p) => p.drop_port(),
1525 Flavor::Sync(ref p) => p.drop_port(),
1530 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1531 impl<T> fmt::Debug for Receiver<T> {
1532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1533 f.debug_struct("Receiver").finish()
1537 #[stable(feature = "rust1", since = "1.0.0")]
1538 impl<T> fmt::Debug for SendError<T> {
1539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540 "SendError(..)".fmt(f)
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> fmt::Display for SendError<T> {
1546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547 "sending on a closed channel".fmt(f)
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T: Send> error::Error for SendError<T> {
1553 #[allow(deprecated)]
1554 fn description(&self) -> &str {
1555 "sending on a closed channel"
1559 #[stable(feature = "rust1", since = "1.0.0")]
1560 impl<T> fmt::Debug for TrySendError<T> {
1561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1563 TrySendError::Full(..) => "Full(..)".fmt(f),
1564 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1569 #[stable(feature = "rust1", since = "1.0.0")]
1570 impl<T> fmt::Display for TrySendError<T> {
1571 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1573 TrySendError::Full(..) => "sending on a full channel".fmt(f),
1574 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1579 #[stable(feature = "rust1", since = "1.0.0")]
1580 impl<T: Send> error::Error for TrySendError<T> {
1581 #[allow(deprecated)]
1582 fn description(&self) -> &str {
1584 TrySendError::Full(..) => "sending on a full channel",
1585 TrySendError::Disconnected(..) => "sending on a closed channel",
1590 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1591 impl<T> From<SendError<T>> for TrySendError<T> {
1592 fn from(err: SendError<T>) -> TrySendError<T> {
1594 SendError(t) => TrySendError::Disconnected(t),
1599 #[stable(feature = "rust1", since = "1.0.0")]
1600 impl fmt::Display for RecvError {
1601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1602 "receiving on a closed channel".fmt(f)
1606 #[stable(feature = "rust1", since = "1.0.0")]
1607 impl error::Error for RecvError {
1608 #[allow(deprecated)]
1609 fn description(&self) -> &str {
1610 "receiving on a closed channel"
1614 #[stable(feature = "rust1", since = "1.0.0")]
1615 impl fmt::Display for TryRecvError {
1616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1618 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1619 TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1624 #[stable(feature = "rust1", since = "1.0.0")]
1625 impl error::Error for TryRecvError {
1626 #[allow(deprecated)]
1627 fn description(&self) -> &str {
1629 TryRecvError::Empty => "receiving on an empty channel",
1630 TryRecvError::Disconnected => "receiving on a closed channel",
1635 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1636 impl From<RecvError> for TryRecvError {
1637 fn from(err: RecvError) -> TryRecvError {
1639 RecvError => TryRecvError::Disconnected,
1644 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1645 impl fmt::Display for RecvTimeoutError {
1646 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1648 RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1649 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1654 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1655 impl error::Error for RecvTimeoutError {
1656 #[allow(deprecated)]
1657 fn description(&self) -> &str {
1659 RecvTimeoutError::Timeout => "timed out waiting on channel",
1660 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1665 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1666 impl From<RecvError> for RecvTimeoutError {
1667 fn from(err: RecvError) -> RecvTimeoutError {
1669 RecvError => RecvTimeoutError::Disconnected,
1674 #[cfg(all(test, not(target_os = "emscripten")))]
1679 use crate::time::{Duration, Instant};
1681 pub fn stress_factor() -> usize {
1682 match env::var("RUST_TEST_STRESS") {
1683 Ok(val) => val.parse().unwrap(),
1690 let (tx, rx) = channel::<i32>();
1691 tx.send(1).unwrap();
1692 assert_eq!(rx.recv().unwrap(), 1);
1697 let (tx, _rx) = channel::<Box<isize>>();
1698 tx.send(box 1).unwrap();
1702 fn drop_full_shared() {
1703 let (tx, _rx) = channel::<Box<isize>>();
1706 tx.send(box 1).unwrap();
1711 let (tx, rx) = channel::<i32>();
1712 tx.send(1).unwrap();
1713 assert_eq!(rx.recv().unwrap(), 1);
1714 let tx = tx.clone();
1715 tx.send(1).unwrap();
1716 assert_eq!(rx.recv().unwrap(), 1);
1720 fn smoke_threads() {
1721 let (tx, rx) = channel::<i32>();
1722 let _t = thread::spawn(move || {
1723 tx.send(1).unwrap();
1725 assert_eq!(rx.recv().unwrap(), 1);
1729 fn smoke_port_gone() {
1730 let (tx, rx) = channel::<i32>();
1732 assert!(tx.send(1).is_err());
1736 fn smoke_shared_port_gone() {
1737 let (tx, rx) = channel::<i32>();
1739 assert!(tx.send(1).is_err())
1743 fn smoke_shared_port_gone2() {
1744 let (tx, rx) = channel::<i32>();
1746 let tx2 = tx.clone();
1748 assert!(tx2.send(1).is_err());
1752 fn port_gone_concurrent() {
1753 let (tx, rx) = channel::<i32>();
1754 let _t = thread::spawn(move || {
1757 while tx.send(1).is_ok() {}
1761 fn port_gone_concurrent_shared() {
1762 let (tx, rx) = channel::<i32>();
1763 let tx2 = tx.clone();
1764 let _t = thread::spawn(move || {
1767 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1771 fn smoke_chan_gone() {
1772 let (tx, rx) = channel::<i32>();
1774 assert!(rx.recv().is_err());
1778 fn smoke_chan_gone_shared() {
1779 let (tx, rx) = channel::<()>();
1780 let tx2 = tx.clone();
1783 assert!(rx.recv().is_err());
1787 fn chan_gone_concurrent() {
1788 let (tx, rx) = channel::<i32>();
1789 let _t = thread::spawn(move || {
1790 tx.send(1).unwrap();
1791 tx.send(1).unwrap();
1793 while rx.recv().is_ok() {}
1798 let (tx, rx) = channel::<i32>();
1799 let t = thread::spawn(move || {
1801 tx.send(1).unwrap();
1805 assert_eq!(rx.recv().unwrap(), 1);
1807 t.join().ok().expect("thread panicked");
1811 fn stress_shared() {
1812 const AMT: u32 = 10000;
1813 const NTHREADS: u32 = 8;
1814 let (tx, rx) = channel::<i32>();
1816 let t = thread::spawn(move || {
1817 for _ in 0..AMT * NTHREADS {
1818 assert_eq!(rx.recv().unwrap(), 1);
1820 match rx.try_recv() {
1826 for _ in 0..NTHREADS {
1827 let tx = tx.clone();
1828 thread::spawn(move || {
1830 tx.send(1).unwrap();
1835 t.join().ok().expect("thread panicked");
1839 fn send_from_outside_runtime() {
1840 let (tx1, rx1) = channel::<()>();
1841 let (tx2, rx2) = channel::<i32>();
1842 let t1 = thread::spawn(move || {
1843 tx1.send(()).unwrap();
1845 assert_eq!(rx2.recv().unwrap(), 1);
1848 rx1.recv().unwrap();
1849 let t2 = thread::spawn(move || {
1851 tx2.send(1).unwrap();
1854 t1.join().ok().expect("thread panicked");
1855 t2.join().ok().expect("thread panicked");
1859 fn recv_from_outside_runtime() {
1860 let (tx, rx) = channel::<i32>();
1861 let t = thread::spawn(move || {
1863 assert_eq!(rx.recv().unwrap(), 1);
1867 tx.send(1).unwrap();
1869 t.join().ok().expect("thread panicked");
1874 let (tx1, rx1) = channel::<i32>();
1875 let (tx2, rx2) = channel::<i32>();
1876 let t1 = thread::spawn(move || {
1877 assert_eq!(rx1.recv().unwrap(), 1);
1878 tx2.send(2).unwrap();
1880 let t2 = thread::spawn(move || {
1881 tx1.send(1).unwrap();
1882 assert_eq!(rx2.recv().unwrap(), 2);
1884 t1.join().ok().expect("thread panicked");
1885 t2.join().ok().expect("thread panicked");
1889 fn oneshot_single_thread_close_port_first() {
1890 // Simple test of closing without sending
1891 let (_tx, rx) = channel::<i32>();
1896 fn oneshot_single_thread_close_chan_first() {
1897 // Simple test of closing without sending
1898 let (tx, _rx) = channel::<i32>();
1903 fn oneshot_single_thread_send_port_close() {
1904 // Testing that the sender cleans up the payload if receiver is closed
1905 let (tx, rx) = channel::<Box<i32>>();
1907 assert!(tx.send(box 0).is_err());
1911 fn oneshot_single_thread_recv_chan_close() {
1912 // Receiving on a closed chan will panic
1913 let res = thread::spawn(move || {
1914 let (tx, rx) = channel::<i32>();
1920 assert!(res.is_err());
1924 fn oneshot_single_thread_send_then_recv() {
1925 let (tx, rx) = channel::<Box<i32>>();
1926 tx.send(box 10).unwrap();
1927 assert!(*rx.recv().unwrap() == 10);
1931 fn oneshot_single_thread_try_send_open() {
1932 let (tx, rx) = channel::<i32>();
1933 assert!(tx.send(10).is_ok());
1934 assert!(rx.recv().unwrap() == 10);
1938 fn oneshot_single_thread_try_send_closed() {
1939 let (tx, rx) = channel::<i32>();
1941 assert!(tx.send(10).is_err());
1945 fn oneshot_single_thread_try_recv_open() {
1946 let (tx, rx) = channel::<i32>();
1947 tx.send(10).unwrap();
1948 assert!(rx.recv() == Ok(10));
1952 fn oneshot_single_thread_try_recv_closed() {
1953 let (tx, rx) = channel::<i32>();
1955 assert!(rx.recv().is_err());
1959 fn oneshot_single_thread_peek_data() {
1960 let (tx, rx) = channel::<i32>();
1961 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1962 tx.send(10).unwrap();
1963 assert_eq!(rx.try_recv(), Ok(10));
1967 fn oneshot_single_thread_peek_close() {
1968 let (tx, rx) = channel::<i32>();
1970 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1971 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1975 fn oneshot_single_thread_peek_open() {
1976 let (_tx, rx) = channel::<i32>();
1977 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1981 fn oneshot_multi_task_recv_then_send() {
1982 let (tx, rx) = channel::<Box<i32>>();
1983 let _t = thread::spawn(move || {
1984 assert!(*rx.recv().unwrap() == 10);
1987 tx.send(box 10).unwrap();
1991 fn oneshot_multi_task_recv_then_close() {
1992 let (tx, rx) = channel::<Box<i32>>();
1993 let _t = thread::spawn(move || {
1996 let res = thread::spawn(move || {
1997 assert!(*rx.recv().unwrap() == 10);
2000 assert!(res.is_err());
2004 fn oneshot_multi_thread_close_stress() {
2005 for _ in 0..stress_factor() {
2006 let (tx, rx) = channel::<i32>();
2007 let _t = thread::spawn(move || {
2015 fn oneshot_multi_thread_send_close_stress() {
2016 for _ in 0..stress_factor() {
2017 let (tx, rx) = channel::<i32>();
2018 let _t = thread::spawn(move || {
2021 let _ = thread::spawn(move || {
2022 tx.send(1).unwrap();
2029 fn oneshot_multi_thread_recv_close_stress() {
2030 for _ in 0..stress_factor() {
2031 let (tx, rx) = channel::<i32>();
2032 thread::spawn(move || {
2033 let res = thread::spawn(move || {
2037 assert!(res.is_err());
2039 let _t = thread::spawn(move || {
2040 thread::spawn(move || {
2048 fn oneshot_multi_thread_send_recv_stress() {
2049 for _ in 0..stress_factor() {
2050 let (tx, rx) = channel::<Box<isize>>();
2051 let _t = thread::spawn(move || {
2052 tx.send(box 10).unwrap();
2054 assert!(*rx.recv().unwrap() == 10);
2059 fn stream_send_recv_stress() {
2060 for _ in 0..stress_factor() {
2061 let (tx, rx) = channel();
2066 fn send(tx: Sender<Box<i32>>, i: i32) {
2071 thread::spawn(move || {
2072 tx.send(box i).unwrap();
2077 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2082 thread::spawn(move || {
2083 assert!(*rx.recv().unwrap() == i);
2091 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2092 fn oneshot_single_thread_recv_timeout() {
2093 let (tx, rx) = channel();
2094 tx.send(()).unwrap();
2095 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2096 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2097 tx.send(()).unwrap();
2098 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2102 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2103 fn stress_recv_timeout_two_threads() {
2104 let (tx, rx) = channel();
2105 let stress = stress_factor() + 100;
2106 let timeout = Duration::from_millis(100);
2108 thread::spawn(move || {
2109 for i in 0..stress {
2111 thread::sleep(timeout * 2);
2113 tx.send(1usize).unwrap();
2117 let mut recv_count = 0;
2119 match rx.recv_timeout(timeout) {
2121 assert_eq!(n, 1usize);
2124 Err(RecvTimeoutError::Timeout) => continue,
2125 Err(RecvTimeoutError::Disconnected) => break,
2129 assert_eq!(recv_count, stress);
2133 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2134 fn recv_timeout_upgrade() {
2135 let (tx, rx) = channel::<()>();
2136 let timeout = Duration::from_millis(1);
2137 let _tx_clone = tx.clone();
2139 let start = Instant::now();
2140 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2141 assert!(Instant::now() >= start + timeout);
2145 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2146 fn stress_recv_timeout_shared() {
2147 let (tx, rx) = channel();
2148 let stress = stress_factor() + 100;
2150 for i in 0..stress {
2151 let tx = tx.clone();
2152 thread::spawn(move || {
2153 thread::sleep(Duration::from_millis(i as u64 * 10));
2154 tx.send(1usize).unwrap();
2160 let mut recv_count = 0;
2162 match rx.recv_timeout(Duration::from_millis(10)) {
2164 assert_eq!(n, 1usize);
2167 Err(RecvTimeoutError::Timeout) => continue,
2168 Err(RecvTimeoutError::Disconnected) => break,
2172 assert_eq!(recv_count, stress);
2176 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2177 fn very_long_recv_timeout_wont_panic() {
2178 let (tx, rx) = channel::<()>();
2180 thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value())));
2181 thread::sleep(Duration::from_secs(1));
2182 assert!(tx.send(()).is_ok());
2183 assert_eq!(join_handle.join().unwrap(), Ok(()));
2188 // Regression test that we don't run out of stack in scheduler context
2189 let (tx, rx) = channel();
2191 tx.send(()).unwrap();
2199 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2200 fn shared_recv_timeout() {
2201 let (tx, rx) = channel();
2204 let tx = tx.clone();
2205 thread::spawn(move || {
2206 tx.send(()).unwrap();
2214 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2215 tx.send(()).unwrap();
2216 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2220 fn shared_chan_stress() {
2221 let (tx, rx) = channel();
2222 let total = stress_factor() + 100;
2224 let tx = tx.clone();
2225 thread::spawn(move || {
2226 tx.send(()).unwrap();
2236 fn test_nested_recv_iter() {
2237 let (tx, rx) = channel::<i32>();
2238 let (total_tx, total_rx) = channel::<i32>();
2240 let _t = thread::spawn(move || {
2242 for x in rx.iter() {
2245 total_tx.send(acc).unwrap();
2248 tx.send(3).unwrap();
2249 tx.send(1).unwrap();
2250 tx.send(2).unwrap();
2252 assert_eq!(total_rx.recv().unwrap(), 6);
2256 fn test_recv_iter_break() {
2257 let (tx, rx) = channel::<i32>();
2258 let (count_tx, count_rx) = channel();
2260 let _t = thread::spawn(move || {
2262 for x in rx.iter() {
2269 count_tx.send(count).unwrap();
2272 tx.send(2).unwrap();
2273 tx.send(2).unwrap();
2274 tx.send(2).unwrap();
2277 assert_eq!(count_rx.recv().unwrap(), 4);
2281 fn test_recv_try_iter() {
2282 let (request_tx, request_rx) = channel();
2283 let (response_tx, response_rx) = channel();
2285 // Request `x`s until we have `6`.
2286 let t = thread::spawn(move || {
2289 for x in response_rx.try_iter() {
2295 request_tx.send(()).unwrap();
2299 for _ in request_rx.iter() {
2300 if response_tx.send(2).is_err() {
2305 assert_eq!(t.join().unwrap(), 6);
2309 fn test_recv_into_iter_owned() {
2311 let (tx, rx) = channel::<i32>();
2312 tx.send(1).unwrap();
2313 tx.send(2).unwrap();
2317 assert_eq!(iter.next().unwrap(), 1);
2318 assert_eq!(iter.next().unwrap(), 2);
2319 assert_eq!(iter.next().is_none(), true);
2323 fn test_recv_into_iter_borrowed() {
2324 let (tx, rx) = channel::<i32>();
2325 tx.send(1).unwrap();
2326 tx.send(2).unwrap();
2328 let mut iter = (&rx).into_iter();
2329 assert_eq!(iter.next().unwrap(), 1);
2330 assert_eq!(iter.next().unwrap(), 2);
2331 assert_eq!(iter.next().is_none(), true);
2335 fn try_recv_states() {
2336 let (tx1, rx1) = channel::<i32>();
2337 let (tx2, rx2) = channel::<()>();
2338 let (tx3, rx3) = channel::<()>();
2339 let _t = thread::spawn(move || {
2340 rx2.recv().unwrap();
2341 tx1.send(1).unwrap();
2342 tx3.send(()).unwrap();
2343 rx2.recv().unwrap();
2345 tx3.send(()).unwrap();
2348 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2349 tx2.send(()).unwrap();
2350 rx3.recv().unwrap();
2351 assert_eq!(rx1.try_recv(), Ok(1));
2352 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2353 tx2.send(()).unwrap();
2354 rx3.recv().unwrap();
2355 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2358 // This bug used to end up in a livelock inside of the Receiver destructor
2359 // because the internal state of the Shared packet was corrupted
2361 fn destroy_upgraded_shared_port_when_sender_still_active() {
2362 let (tx, rx) = channel();
2363 let (tx2, rx2) = channel();
2364 let _t = thread::spawn(move || {
2365 rx.recv().unwrap(); // wait on a oneshot
2366 drop(rx); // destroy a shared
2367 tx2.send(()).unwrap();
2369 // make sure the other thread has gone to sleep
2371 thread::yield_now();
2374 // upgrade to a shared chan and send a message
2377 t.send(()).unwrap();
2379 // wait for the child thread to exit before we exit
2380 rx2.recv().unwrap();
2385 let (tx, _) = channel();
2386 let _ = tx.send(123);
2387 assert_eq!(tx.send(123), Err(SendError(123)));
2391 #[cfg(all(test, not(target_os = "emscripten")))]
2396 use crate::time::Duration;
2398 pub fn stress_factor() -> usize {
2399 match env::var("RUST_TEST_STRESS") {
2400 Ok(val) => val.parse().unwrap(),
2407 let (tx, rx) = sync_channel::<i32>(1);
2408 tx.send(1).unwrap();
2409 assert_eq!(rx.recv().unwrap(), 1);
2414 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2415 tx.send(box 1).unwrap();
2420 let (tx, rx) = sync_channel::<i32>(1);
2421 tx.send(1).unwrap();
2422 assert_eq!(rx.recv().unwrap(), 1);
2423 let tx = tx.clone();
2424 tx.send(1).unwrap();
2425 assert_eq!(rx.recv().unwrap(), 1);
2429 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2431 let (tx, rx) = sync_channel::<i32>(1);
2432 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2433 tx.send(1).unwrap();
2434 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2438 fn smoke_threads() {
2439 let (tx, rx) = sync_channel::<i32>(0);
2440 let _t = thread::spawn(move || {
2441 tx.send(1).unwrap();
2443 assert_eq!(rx.recv().unwrap(), 1);
2447 fn smoke_port_gone() {
2448 let (tx, rx) = sync_channel::<i32>(0);
2450 assert!(tx.send(1).is_err());
2454 fn smoke_shared_port_gone2() {
2455 let (tx, rx) = sync_channel::<i32>(0);
2457 let tx2 = tx.clone();
2459 assert!(tx2.send(1).is_err());
2463 fn port_gone_concurrent() {
2464 let (tx, rx) = sync_channel::<i32>(0);
2465 let _t = thread::spawn(move || {
2468 while tx.send(1).is_ok() {}
2472 fn port_gone_concurrent_shared() {
2473 let (tx, rx) = sync_channel::<i32>(0);
2474 let tx2 = tx.clone();
2475 let _t = thread::spawn(move || {
2478 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2482 fn smoke_chan_gone() {
2483 let (tx, rx) = sync_channel::<i32>(0);
2485 assert!(rx.recv().is_err());
2489 fn smoke_chan_gone_shared() {
2490 let (tx, rx) = sync_channel::<()>(0);
2491 let tx2 = tx.clone();
2494 assert!(rx.recv().is_err());
2498 fn chan_gone_concurrent() {
2499 let (tx, rx) = sync_channel::<i32>(0);
2500 thread::spawn(move || {
2501 tx.send(1).unwrap();
2502 tx.send(1).unwrap();
2504 while rx.recv().is_ok() {}
2509 let (tx, rx) = sync_channel::<i32>(0);
2510 thread::spawn(move || {
2512 tx.send(1).unwrap();
2516 assert_eq!(rx.recv().unwrap(), 1);
2521 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2522 fn stress_recv_timeout_two_threads() {
2523 let (tx, rx) = sync_channel::<i32>(0);
2525 thread::spawn(move || {
2527 tx.send(1).unwrap();
2531 let mut recv_count = 0;
2533 match rx.recv_timeout(Duration::from_millis(1)) {
2538 Err(RecvTimeoutError::Timeout) => continue,
2539 Err(RecvTimeoutError::Disconnected) => break,
2543 assert_eq!(recv_count, 10000);
2547 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2548 fn stress_recv_timeout_shared() {
2549 const AMT: u32 = 1000;
2550 const NTHREADS: u32 = 8;
2551 let (tx, rx) = sync_channel::<i32>(0);
2552 let (dtx, drx) = sync_channel::<()>(0);
2554 thread::spawn(move || {
2555 let mut recv_count = 0;
2557 match rx.recv_timeout(Duration::from_millis(10)) {
2562 Err(RecvTimeoutError::Timeout) => continue,
2563 Err(RecvTimeoutError::Disconnected) => break,
2567 assert_eq!(recv_count, AMT * NTHREADS);
2568 assert!(rx.try_recv().is_err());
2570 dtx.send(()).unwrap();
2573 for _ in 0..NTHREADS {
2574 let tx = tx.clone();
2575 thread::spawn(move || {
2577 tx.send(1).unwrap();
2584 drx.recv().unwrap();
2588 fn stress_shared() {
2589 const AMT: u32 = 1000;
2590 const NTHREADS: u32 = 8;
2591 let (tx, rx) = sync_channel::<i32>(0);
2592 let (dtx, drx) = sync_channel::<()>(0);
2594 thread::spawn(move || {
2595 for _ in 0..AMT * NTHREADS {
2596 assert_eq!(rx.recv().unwrap(), 1);
2598 match rx.try_recv() {
2602 dtx.send(()).unwrap();
2605 for _ in 0..NTHREADS {
2606 let tx = tx.clone();
2607 thread::spawn(move || {
2609 tx.send(1).unwrap();
2614 drx.recv().unwrap();
2618 fn oneshot_single_thread_close_port_first() {
2619 // Simple test of closing without sending
2620 let (_tx, rx) = sync_channel::<i32>(0);
2625 fn oneshot_single_thread_close_chan_first() {
2626 // Simple test of closing without sending
2627 let (tx, _rx) = sync_channel::<i32>(0);
2632 fn oneshot_single_thread_send_port_close() {
2633 // Testing that the sender cleans up the payload if receiver is closed
2634 let (tx, rx) = sync_channel::<Box<i32>>(0);
2636 assert!(tx.send(box 0).is_err());
2640 fn oneshot_single_thread_recv_chan_close() {
2641 // Receiving on a closed chan will panic
2642 let res = thread::spawn(move || {
2643 let (tx, rx) = sync_channel::<i32>(0);
2649 assert!(res.is_err());
2653 fn oneshot_single_thread_send_then_recv() {
2654 let (tx, rx) = sync_channel::<Box<i32>>(1);
2655 tx.send(box 10).unwrap();
2656 assert!(*rx.recv().unwrap() == 10);
2660 fn oneshot_single_thread_try_send_open() {
2661 let (tx, rx) = sync_channel::<i32>(1);
2662 assert_eq!(tx.try_send(10), Ok(()));
2663 assert!(rx.recv().unwrap() == 10);
2667 fn oneshot_single_thread_try_send_closed() {
2668 let (tx, rx) = sync_channel::<i32>(0);
2670 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2674 fn oneshot_single_thread_try_send_closed2() {
2675 let (tx, _rx) = sync_channel::<i32>(0);
2676 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2680 fn oneshot_single_thread_try_recv_open() {
2681 let (tx, rx) = sync_channel::<i32>(1);
2682 tx.send(10).unwrap();
2683 assert!(rx.recv() == Ok(10));
2687 fn oneshot_single_thread_try_recv_closed() {
2688 let (tx, rx) = sync_channel::<i32>(0);
2690 assert!(rx.recv().is_err());
2694 fn oneshot_single_thread_try_recv_closed_with_data() {
2695 let (tx, rx) = sync_channel::<i32>(1);
2696 tx.send(10).unwrap();
2698 assert_eq!(rx.try_recv(), Ok(10));
2699 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2703 fn oneshot_single_thread_peek_data() {
2704 let (tx, rx) = sync_channel::<i32>(1);
2705 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2706 tx.send(10).unwrap();
2707 assert_eq!(rx.try_recv(), Ok(10));
2711 fn oneshot_single_thread_peek_close() {
2712 let (tx, rx) = sync_channel::<i32>(0);
2714 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2715 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2719 fn oneshot_single_thread_peek_open() {
2720 let (_tx, rx) = sync_channel::<i32>(0);
2721 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2725 fn oneshot_multi_task_recv_then_send() {
2726 let (tx, rx) = sync_channel::<Box<i32>>(0);
2727 let _t = thread::spawn(move || {
2728 assert!(*rx.recv().unwrap() == 10);
2731 tx.send(box 10).unwrap();
2735 fn oneshot_multi_task_recv_then_close() {
2736 let (tx, rx) = sync_channel::<Box<i32>>(0);
2737 let _t = thread::spawn(move || {
2740 let res = thread::spawn(move || {
2741 assert!(*rx.recv().unwrap() == 10);
2744 assert!(res.is_err());
2748 fn oneshot_multi_thread_close_stress() {
2749 for _ in 0..stress_factor() {
2750 let (tx, rx) = sync_channel::<i32>(0);
2751 let _t = thread::spawn(move || {
2759 fn oneshot_multi_thread_send_close_stress() {
2760 for _ in 0..stress_factor() {
2761 let (tx, rx) = sync_channel::<i32>(0);
2762 let _t = thread::spawn(move || {
2765 let _ = thread::spawn(move || {
2766 tx.send(1).unwrap();
2773 fn oneshot_multi_thread_recv_close_stress() {
2774 for _ in 0..stress_factor() {
2775 let (tx, rx) = sync_channel::<i32>(0);
2776 let _t = thread::spawn(move || {
2777 let res = thread::spawn(move || {
2781 assert!(res.is_err());
2783 let _t = thread::spawn(move || {
2784 thread::spawn(move || {
2792 fn oneshot_multi_thread_send_recv_stress() {
2793 for _ in 0..stress_factor() {
2794 let (tx, rx) = sync_channel::<Box<i32>>(0);
2795 let _t = thread::spawn(move || {
2796 tx.send(box 10).unwrap();
2798 assert!(*rx.recv().unwrap() == 10);
2803 fn stream_send_recv_stress() {
2804 for _ in 0..stress_factor() {
2805 let (tx, rx) = sync_channel::<Box<i32>>(0);
2810 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2815 thread::spawn(move || {
2816 tx.send(box i).unwrap();
2821 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2826 thread::spawn(move || {
2827 assert!(*rx.recv().unwrap() == i);
2836 // Regression test that we don't run out of stack in scheduler context
2837 let (tx, rx) = sync_channel(10000);
2839 tx.send(()).unwrap();
2847 fn shared_chan_stress() {
2848 let (tx, rx) = sync_channel(0);
2849 let total = stress_factor() + 100;
2851 let tx = tx.clone();
2852 thread::spawn(move || {
2853 tx.send(()).unwrap();
2863 fn test_nested_recv_iter() {
2864 let (tx, rx) = sync_channel::<i32>(0);
2865 let (total_tx, total_rx) = sync_channel::<i32>(0);
2867 let _t = thread::spawn(move || {
2869 for x in rx.iter() {
2872 total_tx.send(acc).unwrap();
2875 tx.send(3).unwrap();
2876 tx.send(1).unwrap();
2877 tx.send(2).unwrap();
2879 assert_eq!(total_rx.recv().unwrap(), 6);
2883 fn test_recv_iter_break() {
2884 let (tx, rx) = sync_channel::<i32>(0);
2885 let (count_tx, count_rx) = sync_channel(0);
2887 let _t = thread::spawn(move || {
2889 for x in rx.iter() {
2896 count_tx.send(count).unwrap();
2899 tx.send(2).unwrap();
2900 tx.send(2).unwrap();
2901 tx.send(2).unwrap();
2902 let _ = tx.try_send(2);
2904 assert_eq!(count_rx.recv().unwrap(), 4);
2908 fn try_recv_states() {
2909 let (tx1, rx1) = sync_channel::<i32>(1);
2910 let (tx2, rx2) = sync_channel::<()>(1);
2911 let (tx3, rx3) = sync_channel::<()>(1);
2912 let _t = thread::spawn(move || {
2913 rx2.recv().unwrap();
2914 tx1.send(1).unwrap();
2915 tx3.send(()).unwrap();
2916 rx2.recv().unwrap();
2918 tx3.send(()).unwrap();
2921 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2922 tx2.send(()).unwrap();
2923 rx3.recv().unwrap();
2924 assert_eq!(rx1.try_recv(), Ok(1));
2925 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2926 tx2.send(()).unwrap();
2927 rx3.recv().unwrap();
2928 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2931 // This bug used to end up in a livelock inside of the Receiver destructor
2932 // because the internal state of the Shared packet was corrupted
2934 fn destroy_upgraded_shared_port_when_sender_still_active() {
2935 let (tx, rx) = sync_channel::<()>(0);
2936 let (tx2, rx2) = sync_channel::<()>(0);
2937 let _t = thread::spawn(move || {
2938 rx.recv().unwrap(); // wait on a oneshot
2939 drop(rx); // destroy a shared
2940 tx2.send(()).unwrap();
2942 // make sure the other thread has gone to sleep
2944 thread::yield_now();
2947 // upgrade to a shared chan and send a message
2950 t.send(()).unwrap();
2952 // wait for the child thread to exit before we exit
2953 rx2.recv().unwrap();
2958 let (tx, rx) = sync_channel::<i32>(0);
2959 let _t = thread::spawn(move || {
2962 assert_eq!(tx.send(1), Ok(()));
2967 let (tx, rx) = sync_channel::<i32>(0);
2968 let _t = thread::spawn(move || {
2971 assert!(tx.send(1).is_err());
2976 let (tx, rx) = sync_channel::<i32>(1);
2977 assert_eq!(tx.send(1), Ok(()));
2978 let _t = thread::spawn(move || {
2981 assert!(tx.send(1).is_err());
2986 let (tx, rx) = sync_channel::<i32>(0);
2987 let tx2 = tx.clone();
2988 let (done, donerx) = channel();
2989 let done2 = done.clone();
2990 let _t = thread::spawn(move || {
2991 assert!(tx.send(1).is_err());
2992 done.send(()).unwrap();
2994 let _t = thread::spawn(move || {
2995 assert!(tx2.send(2).is_err());
2996 done2.send(()).unwrap();
2999 donerx.recv().unwrap();
3000 donerx.recv().unwrap();
3005 let (tx, _rx) = sync_channel::<i32>(0);
3006 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3011 let (tx, _rx) = sync_channel::<i32>(1);
3012 assert_eq!(tx.try_send(1), Ok(()));
3013 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3018 let (tx, rx) = sync_channel::<i32>(1);
3019 assert_eq!(tx.try_send(1), Ok(()));
3021 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3027 let (tx1, rx1) = sync_channel::<()>(3);
3028 let (tx2, rx2) = sync_channel::<()>(3);
3030 let _t = thread::spawn(move || {
3031 rx1.recv().unwrap();
3032 tx2.try_send(()).unwrap();
3035 tx1.try_send(()).unwrap();
3036 rx2.recv().unwrap();