1 // ignore-tidy-filelength
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
16 //! These channels come in two flavors:
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //! will return a `(Sender, Receiver)` tuple where all sends will be
20 //! **asynchronous** (they never block). The channel conceptually has an
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //! messages is a pre-allocated buffer of a fixed size. All sends will be
26 //! **synchronous** by blocking until there is buffer space available. Note
27 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //! channel where each sender atomically hands off a message to a receiver.
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
59 //! use std::sync::mpsc::channel;
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //! tx.send(10).unwrap();
66 //! assert_eq!(rx.recv().unwrap(), 10);
73 //! use std::sync::mpsc::channel;
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
80 //! let tx = tx.clone();
81 //! thread::spawn(move|| {
82 //! tx.send(i).unwrap();
87 //! let j = rx.recv().unwrap();
88 //! assert!(0 <= j && j < 10);
92 //! Propagating panics:
95 //! use std::sync::mpsc::channel;
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
101 //! assert!(rx.recv().is_err());
104 //! Synchronous channels:
108 //! use std::sync::mpsc::sync_channel;
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //! // This will wait for the parent thread to start receiving
113 //! tx.send(53).unwrap();
115 //! rx.recv().unwrap();
118 #![stable(feature = "rust1", since = "1.0.0")]
120 // A description of how Rust's channel implementation works
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
132 // ## Flavors of channels
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 // case. They contain as few atomics as possible and
141 // involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 // use a different concurrent queue that is more tailored for this
144 // use case. The initial allocation of this flavor of channel is not
146 // * Shared - this is the most general form of channel that this module offers,
147 // a channel with multiple senders. This type is as optimized as it
148 // can be, but the previous two types mentioned are much faster for
151 // ## Concurrent queues
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
163 // ### SPSC optimizations
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
182 // ### MPSC optimizations
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
192 // ## Overview of the Implementation
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
200 // queue.push(t) return if queue.pop()
201 // if increment() == -1 deschedule {
202 // wakeup() if decrement() > 0
203 // cancel_deschedule()
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
210 // ### The internal atomic counter
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
235 // ## Native Implementation
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
272 use crate::cell::UnsafeCell;
276 use crate::sync::Arc;
277 use crate::time::{Duration, Instant};
289 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
292 /// Messages sent to the channel can be retrieved using [`recv`].
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
301 /// use std::sync::mpsc::channel;
303 /// use std::time::Duration;
305 /// let (send, recv) = channel();
307 /// thread::spawn(move || {
308 /// send.send("Hello world!").unwrap();
309 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
310 /// send.send("Delayed for 2 seconds").unwrap();
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319 inner: UnsafeCell<Flavor<T>>,
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> {}
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> {}
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
344 /// use std::sync::mpsc::channel;
347 /// let (send, recv) = channel();
349 /// thread::spawn(move || {
350 /// send.send(1u8).unwrap();
351 /// send.send(2u8).unwrap();
352 /// send.send(3u8).unwrap();
355 /// for x in recv.iter() {
356 /// println!("Got: {}", x);
359 #[stable(feature = "rust1", since = "1.0.0")]
361 pub struct Iter<'a, T: 'a> {
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
381 /// use std::sync::mpsc::channel;
383 /// use std::time::Duration;
385 /// let (sender, receiver) = channel();
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
391 /// thread::spawn(move || {
392 /// sender.send(1).unwrap();
393 /// sender.send(2).unwrap();
394 /// sender.send(3).unwrap();
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
400 /// for x in receiver.try_iter() {
401 /// println!("Got: {}", x);
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
406 pub struct TryIter<'a, T: 'a> {
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
424 /// use std::sync::mpsc::channel;
427 /// let (send, recv) = channel();
429 /// thread::spawn(move || {
430 /// send.send(1u8).unwrap();
431 /// send.send(2u8).unwrap();
432 /// send.send(3u8).unwrap();
435 /// for x in recv.into_iter() {
436 /// println!("Got: {}", x);
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
441 pub struct IntoIter<T> {
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
448 /// Messages can be sent through this channel with [`send`].
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
456 /// use std::sync::mpsc::channel;
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 /// sender.send(1).unwrap();
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 /// sender2.send(2).unwrap();
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
475 /// assert_eq!(3, msg + msg2);
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479 inner: UnsafeCell<Flavor<T>>,
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> {}
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> {}
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
494 /// [`send`] will block if there is no space in the internal buffer.
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
503 /// use std::sync::mpsc::sync_channel;
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 /// sync_sender.send(1).unwrap();
513 /// sync_sender.send(2).unwrap();
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 /// sync_sender2.send(3).unwrap();
519 /// // thread will now block since the buffer is full
520 /// println!("Thread unblocked!");
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
528 /// // "Thread unblocked!" will be printed now
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
533 /// msg = receiver.recv().unwrap();
535 /// println!("message {} received", msg);
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539 inner: Arc<sync::Packet<T>>,
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
558 /// An error returned from the [`recv`] function on a [`Receiver`].
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582 /// This **channel** is currently empty, but the **Sender**(s) have not yet
583 /// disconnected, so data may yet become available.
584 #[stable(feature = "rust1", since = "1.0.0")]
587 /// The **channel**'s sending half has become disconnected, and there will
588 /// never be any more data received on it.
589 #[stable(feature = "rust1", since = "1.0.0")]
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603 /// This **channel** is currently empty, but the **Sender**(s) have not yet
604 /// disconnected, so data may yet become available.
605 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
607 /// The **channel**'s sending half has become disconnected, and there will
608 /// never be any more data received on it.
609 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620 /// The data could not be sent on the [`sync_channel`] because it would require that
621 /// the callee block to send the data.
623 /// If this is a buffered channel, then the buffer is full at this time. If
624 /// this is not a buffered channel, then there is no [`Receiver`] available to
625 /// acquire the data.
627 /// [`sync_channel`]: fn.sync_channel.html
628 /// [`Receiver`]: struct.Receiver.html
629 #[stable(feature = "rust1", since = "1.0.0")]
630 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
632 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633 /// sent. The data is returned back to the callee in this case.
635 /// [`sync_channel`]: fn.sync_channel.html
636 #[stable(feature = "rust1", since = "1.0.0")]
637 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
641 Oneshot(Arc<oneshot::Packet<T>>),
642 Stream(Arc<stream::Packet<T>>),
643 Shared(Arc<shared::Packet<T>>),
644 Sync(Arc<sync::Packet<T>>),
648 trait UnsafeFlavor<T> {
649 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651 &mut *self.inner_unsafe().get()
653 unsafe fn inner(&self) -> &Flavor<T> {
654 &*self.inner_unsafe().get()
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
694 /// use std::sync::mpsc::channel;
697 /// let (sender, receiver) = channel();
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// # fn expensive_computation() {}
702 /// sender.send(expensive_computation()).unwrap();
705 /// // Do some useful work for awhile
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712 let a = Arc::new(oneshot::Packet::new());
713 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
747 /// use std::sync::mpsc::sync_channel;
750 /// let (sender, receiver) = sync_channel(1);
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
755 /// thread::spawn(move|| {
756 /// // this will block until the previous message has been received
757 /// sender.send(2).unwrap();
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765 let a = Arc::new(sync::Packet::new(bound));
766 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
769 ////////////////////////////////////////////////////////////////////////////////
771 ////////////////////////////////////////////////////////////////////////////////
774 fn new(inner: Flavor<T>) -> Sender<T> {
775 Sender { inner: UnsafeCell::new(inner) }
778 /// Attempts to send a value on this channel, returning it back if it could
781 /// A successful send occurs when it is determined that the other end of
782 /// the channel has not hung up already. An unsuccessful send would be one
783 /// where the corresponding receiver has already been deallocated. Note
784 /// that a return value of [`Err`] means that the data will never be
785 /// received, but a return value of [`Ok`] does *not* mean that the data
786 /// will be received. It is possible for the corresponding receiver to
787 /// hang up immediately after this function returns [`Ok`].
789 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
790 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
792 /// This method will never block the current thread.
797 /// use std::sync::mpsc::channel;
799 /// let (tx, rx) = channel();
801 /// // This send is always successful
802 /// tx.send(1).unwrap();
804 /// // This send will fail because the receiver is gone
806 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
808 #[stable(feature = "rust1", since = "1.0.0")]
809 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
810 let (new_inner, ret) = match *unsafe { self.inner() } {
811 Flavor::Oneshot(ref p) => {
813 return p.send(t).map_err(SendError);
815 let a = Arc::new(stream::Packet::new());
816 let rx = Receiver::new(Flavor::Stream(a.clone()));
817 match p.upgrade(rx) {
818 oneshot::UpSuccess => {
822 oneshot::UpDisconnected => (a, Err(t)),
823 oneshot::UpWoke(token) => {
824 // This send cannot panic because the thread is
825 // asleep (we're looking at it), so the receiver
827 a.send(t).ok().unwrap();
834 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
835 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
836 Flavor::Sync(..) => unreachable!(),
840 let tmp = Sender::new(Flavor::Stream(new_inner));
841 mem::swap(self.inner_mut(), tmp.inner_mut());
843 ret.map_err(SendError)
847 #[stable(feature = "rust1", since = "1.0.0")]
848 impl<T> Clone for Sender<T> {
849 fn clone(&self) -> Sender<T> {
850 let packet = match *unsafe { self.inner() } {
851 Flavor::Oneshot(ref p) => {
852 let a = Arc::new(shared::Packet::new());
854 let guard = a.postinit_lock();
855 let rx = Receiver::new(Flavor::Shared(a.clone()));
856 let sleeper = match p.upgrade(rx) {
857 oneshot::UpSuccess | oneshot::UpDisconnected => None,
858 oneshot::UpWoke(task) => Some(task),
860 a.inherit_blocker(sleeper, guard);
864 Flavor::Stream(ref p) => {
865 let a = Arc::new(shared::Packet::new());
867 let guard = a.postinit_lock();
868 let rx = Receiver::new(Flavor::Shared(a.clone()));
869 let sleeper = match p.upgrade(rx) {
870 stream::UpSuccess | stream::UpDisconnected => None,
871 stream::UpWoke(task) => Some(task),
873 a.inherit_blocker(sleeper, guard);
877 Flavor::Shared(ref p) => {
879 return Sender::new(Flavor::Shared(p.clone()));
881 Flavor::Sync(..) => unreachable!(),
885 let tmp = Sender::new(Flavor::Shared(packet.clone()));
886 mem::swap(self.inner_mut(), tmp.inner_mut());
888 Sender::new(Flavor::Shared(packet))
892 #[stable(feature = "rust1", since = "1.0.0")]
893 impl<T> Drop for Sender<T> {
895 match *unsafe { self.inner() } {
896 Flavor::Oneshot(ref p) => p.drop_chan(),
897 Flavor::Stream(ref p) => p.drop_chan(),
898 Flavor::Shared(ref p) => p.drop_chan(),
899 Flavor::Sync(..) => unreachable!(),
904 #[stable(feature = "mpsc_debug", since = "1.8.0")]
905 impl<T> fmt::Debug for Sender<T> {
906 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907 f.debug_struct("Sender").finish()
911 ////////////////////////////////////////////////////////////////////////////////
913 ////////////////////////////////////////////////////////////////////////////////
915 impl<T> SyncSender<T> {
916 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
920 /// Sends a value on this synchronous channel.
922 /// This function will *block* until space in the internal buffer becomes
923 /// available or a receiver is available to hand off the message to.
925 /// Note that a successful send does *not* guarantee that the receiver will
926 /// ever see the data if there is a buffer on this channel. Items may be
927 /// enqueued in the internal buffer for the receiver to receive at a later
928 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
929 /// channel and it guarantees that the receiver has indeed received
930 /// the data if this function returns success.
932 /// This function will never panic, but it may return [`Err`] if the
933 /// [`Receiver`] has disconnected and is no longer able to receive
936 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
937 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
942 /// use std::sync::mpsc::sync_channel;
945 /// // Create a rendezvous sync_channel with buffer size 0
946 /// let (sync_sender, receiver) = sync_channel(0);
948 /// thread::spawn(move || {
949 /// println!("sending message...");
950 /// sync_sender.send(1).unwrap();
951 /// // Thread is now blocked until the message is received
953 /// println!("...message received!");
956 /// let msg = receiver.recv().unwrap();
957 /// assert_eq!(1, msg);
959 #[stable(feature = "rust1", since = "1.0.0")]
960 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
961 self.inner.send(t).map_err(SendError)
964 /// Attempts to send a value on this channel without blocking.
966 /// This method differs from [`send`] by returning immediately if the
967 /// channel's buffer is full or no receiver is waiting to acquire some
968 /// data. Compared with [`send`], this function has two failure cases
969 /// instead of one (one for disconnection, one for a full buffer).
971 /// See [`send`] for notes about guarantees of whether the
972 /// receiver has received the data or not if this function is successful.
974 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
979 /// use std::sync::mpsc::sync_channel;
982 /// // Create a sync_channel with buffer size 1
983 /// let (sync_sender, receiver) = sync_channel(1);
984 /// let sync_sender2 = sync_sender.clone();
986 /// // First thread owns sync_sender
987 /// thread::spawn(move || {
988 /// sync_sender.send(1).unwrap();
989 /// sync_sender.send(2).unwrap();
990 /// // Thread blocked
993 /// // Second thread owns sync_sender2
994 /// thread::spawn(move || {
995 /// // This will return an error and send
996 /// // no message if the buffer is full
997 /// let _ = sync_sender2.try_send(3);
1001 /// msg = receiver.recv().unwrap();
1002 /// println!("message {} received", msg);
1004 /// msg = receiver.recv().unwrap();
1005 /// println!("message {} received", msg);
1007 /// // Third message may have never been sent
1008 /// match receiver.try_recv() {
1009 /// Ok(msg) => println!("message {} received", msg),
1010 /// Err(_) => println!("the third message was never sent"),
1013 #[stable(feature = "rust1", since = "1.0.0")]
1014 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1015 self.inner.try_send(t)
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> Clone for SyncSender<T> {
1021 fn clone(&self) -> SyncSender<T> {
1022 self.inner.clone_chan();
1023 SyncSender::new(self.inner.clone())
1027 #[stable(feature = "rust1", since = "1.0.0")]
1028 impl<T> Drop for SyncSender<T> {
1029 fn drop(&mut self) {
1030 self.inner.drop_chan();
1034 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1035 impl<T> fmt::Debug for SyncSender<T> {
1036 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037 f.debug_struct("SyncSender").finish()
1041 ////////////////////////////////////////////////////////////////////////////////
1043 ////////////////////////////////////////////////////////////////////////////////
1045 impl<T> Receiver<T> {
1046 fn new(inner: Flavor<T>) -> Receiver<T> {
1047 Receiver { inner: UnsafeCell::new(inner) }
1050 /// Attempts to return a pending value on this receiver without blocking.
1052 /// This method will never block the caller in order to wait for data to
1053 /// become available. Instead, this will always return immediately with a
1054 /// possible option of pending data on the channel.
1056 /// This is useful for a flavor of "optimistic check" before deciding to
1057 /// block on a receiver.
1059 /// Compared with [`recv`], this function has two failure cases instead of one
1060 /// (one for disconnection, one for an empty buffer).
1062 /// [`recv`]: struct.Receiver.html#method.recv
1067 /// use std::sync::mpsc::{Receiver, channel};
1069 /// let (_, receiver): (_, Receiver<i32>) = channel();
1071 /// assert!(receiver.try_recv().is_err());
1073 #[stable(feature = "rust1", since = "1.0.0")]
1074 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1076 let new_port = match *unsafe { self.inner() } {
1077 Flavor::Oneshot(ref p) => match p.try_recv() {
1078 Ok(t) => return Ok(t),
1079 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1080 Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1081 Err(oneshot::Upgraded(rx)) => rx,
1083 Flavor::Stream(ref p) => match p.try_recv() {
1084 Ok(t) => return Ok(t),
1085 Err(stream::Empty) => return Err(TryRecvError::Empty),
1086 Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1087 Err(stream::Upgraded(rx)) => rx,
1089 Flavor::Shared(ref p) => match p.try_recv() {
1090 Ok(t) => return Ok(t),
1091 Err(shared::Empty) => return Err(TryRecvError::Empty),
1092 Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1094 Flavor::Sync(ref p) => match p.try_recv() {
1095 Ok(t) => return Ok(t),
1096 Err(sync::Empty) => return Err(TryRecvError::Empty),
1097 Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1101 mem::swap(self.inner_mut(), new_port.inner_mut());
1106 /// Attempts to wait for a value on this receiver, returning an error if the
1107 /// corresponding channel has hung up.
1109 /// This function will always block the current thread if there is no data
1110 /// available and it's possible for more data to be sent. Once a message is
1111 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1112 /// receiver will wake up and return that message.
1114 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1115 /// this call is blocking, this call will wake up and return [`Err`] to
1116 /// indicate that no more messages can ever be received on this channel.
1117 /// However, since channels are buffered, messages sent before the disconnect
1118 /// will still be properly received.
1120 /// [`Sender`]: struct.Sender.html
1121 /// [`SyncSender`]: struct.SyncSender.html
1122 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1127 /// use std::sync::mpsc;
1128 /// use std::thread;
1130 /// let (send, recv) = mpsc::channel();
1131 /// let handle = thread::spawn(move || {
1132 /// send.send(1u8).unwrap();
1135 /// handle.join().unwrap();
1137 /// assert_eq!(Ok(1), recv.recv());
1140 /// Buffering behavior:
1143 /// use std::sync::mpsc;
1144 /// use std::thread;
1145 /// use std::sync::mpsc::RecvError;
1147 /// let (send, recv) = mpsc::channel();
1148 /// let handle = thread::spawn(move || {
1149 /// send.send(1u8).unwrap();
1150 /// send.send(2).unwrap();
1151 /// send.send(3).unwrap();
1155 /// // wait for the thread to join so we ensure the sender is dropped
1156 /// handle.join().unwrap();
1158 /// assert_eq!(Ok(1), recv.recv());
1159 /// assert_eq!(Ok(2), recv.recv());
1160 /// assert_eq!(Ok(3), recv.recv());
1161 /// assert_eq!(Err(RecvError), recv.recv());
1163 #[stable(feature = "rust1", since = "1.0.0")]
1164 pub fn recv(&self) -> Result<T, RecvError> {
1166 let new_port = match *unsafe { self.inner() } {
1167 Flavor::Oneshot(ref p) => match p.recv(None) {
1168 Ok(t) => return Ok(t),
1169 Err(oneshot::Disconnected) => return Err(RecvError),
1170 Err(oneshot::Upgraded(rx)) => rx,
1171 Err(oneshot::Empty) => unreachable!(),
1173 Flavor::Stream(ref p) => match p.recv(None) {
1174 Ok(t) => return Ok(t),
1175 Err(stream::Disconnected) => return Err(RecvError),
1176 Err(stream::Upgraded(rx)) => rx,
1177 Err(stream::Empty) => unreachable!(),
1179 Flavor::Shared(ref p) => match p.recv(None) {
1180 Ok(t) => return Ok(t),
1181 Err(shared::Disconnected) => return Err(RecvError),
1182 Err(shared::Empty) => unreachable!(),
1184 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1187 mem::swap(self.inner_mut(), new_port.inner_mut());
1192 /// Attempts to wait for a value on this receiver, returning an error if the
1193 /// corresponding channel has hung up, or if it waits more than `timeout`.
1195 /// This function will always block the current thread if there is no data
1196 /// available and it's possible for more data to be sent. Once a message is
1197 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1198 /// receiver will wake up and return that message.
1200 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1201 /// this call is blocking, this call will wake up and return [`Err`] to
1202 /// indicate that no more messages can ever be received on this channel.
1203 /// However, since channels are buffered, messages sent before the disconnect
1204 /// will still be properly received.
1206 /// [`Sender`]: struct.Sender.html
1207 /// [`SyncSender`]: struct.SyncSender.html
1208 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1212 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1213 /// to panic unexpectedly with the following example:
1216 /// use std::sync::mpsc::channel;
1217 /// use std::thread;
1218 /// use std::time::Duration;
1220 /// let (tx, rx) = channel::<String>();
1222 /// thread::spawn(move || {
1223 /// let d = Duration::from_millis(10);
1225 /// println!("recv");
1226 /// let _r = rx.recv_timeout(d);
1230 /// thread::sleep(Duration::from_millis(100));
1231 /// let _c1 = tx.clone();
1233 /// thread::sleep(Duration::from_secs(1));
1236 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1240 /// Successfully receiving value before encountering timeout:
1243 /// use std::thread;
1244 /// use std::time::Duration;
1245 /// use std::sync::mpsc;
1247 /// let (send, recv) = mpsc::channel();
1249 /// thread::spawn(move || {
1250 /// send.send('a').unwrap();
1254 /// recv.recv_timeout(Duration::from_millis(400)),
1259 /// Receiving an error upon reaching timeout:
1262 /// use std::thread;
1263 /// use std::time::Duration;
1264 /// use std::sync::mpsc;
1266 /// let (send, recv) = mpsc::channel();
1268 /// thread::spawn(move || {
1269 /// thread::sleep(Duration::from_millis(800));
1270 /// send.send('a').unwrap();
1274 /// recv.recv_timeout(Duration::from_millis(400)),
1275 /// Err(mpsc::RecvTimeoutError::Timeout)
1278 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1279 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1280 // Do an optimistic try_recv to avoid the performance impact of
1281 // Instant::now() in the full-channel case.
1282 match self.try_recv() {
1283 Ok(result) => Ok(result),
1284 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1285 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1286 Some(deadline) => self.recv_deadline(deadline),
1287 // So far in the future that it's practically the same as waiting indefinitely.
1288 None => self.recv().map_err(RecvTimeoutError::from),
1293 /// Attempts to wait for a value on this receiver, returning an error if the
1294 /// corresponding channel has hung up, or if `deadline` is reached.
1296 /// This function will always block the current thread if there is no data
1297 /// available and it's possible for more data to be sent. Once a message is
1298 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1299 /// receiver will wake up and return that message.
1301 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1302 /// this call is blocking, this call will wake up and return [`Err`] to
1303 /// indicate that no more messages can ever be received on this channel.
1304 /// However, since channels are buffered, messages sent before the disconnect
1305 /// will still be properly received.
1307 /// [`Sender`]: struct.Sender.html
1308 /// [`SyncSender`]: struct.SyncSender.html
1309 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1313 /// Successfully receiving value before reaching deadline:
1316 /// #![feature(deadline_api)]
1317 /// use std::thread;
1318 /// use std::time::{Duration, Instant};
1319 /// use std::sync::mpsc;
1321 /// let (send, recv) = mpsc::channel();
1323 /// thread::spawn(move || {
1324 /// send.send('a').unwrap();
1328 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1333 /// Receiving an error upon reaching deadline:
1336 /// #![feature(deadline_api)]
1337 /// use std::thread;
1338 /// use std::time::{Duration, Instant};
1339 /// use std::sync::mpsc;
1341 /// let (send, recv) = mpsc::channel();
1343 /// thread::spawn(move || {
1344 /// thread::sleep(Duration::from_millis(800));
1345 /// send.send('a').unwrap();
1349 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1350 /// Err(mpsc::RecvTimeoutError::Timeout)
1353 #[unstable(feature = "deadline_api", issue = "46316")]
1354 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1355 use self::RecvTimeoutError::*;
1358 let port_or_empty = match *unsafe { self.inner() } {
1359 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1360 Ok(t) => return Ok(t),
1361 Err(oneshot::Disconnected) => return Err(Disconnected),
1362 Err(oneshot::Upgraded(rx)) => Some(rx),
1363 Err(oneshot::Empty) => None,
1365 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1366 Ok(t) => return Ok(t),
1367 Err(stream::Disconnected) => return Err(Disconnected),
1368 Err(stream::Upgraded(rx)) => Some(rx),
1369 Err(stream::Empty) => None,
1371 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1372 Ok(t) => return Ok(t),
1373 Err(shared::Disconnected) => return Err(Disconnected),
1374 Err(shared::Empty) => None,
1376 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1377 Ok(t) => return Ok(t),
1378 Err(sync::Disconnected) => return Err(Disconnected),
1379 Err(sync::Empty) => None,
1383 if let Some(new_port) = port_or_empty {
1385 mem::swap(self.inner_mut(), new_port.inner_mut());
1389 // If we're already passed the deadline, and we're here without
1390 // data, return a timeout, else try again.
1391 if Instant::now() >= deadline {
1392 return Err(Timeout);
1397 /// Returns an iterator that will block waiting for messages, but never
1398 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1400 /// [`panic!`]: ../../../std/macro.panic.html
1401 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1406 /// use std::sync::mpsc::channel;
1407 /// use std::thread;
1409 /// let (send, recv) = channel();
1411 /// thread::spawn(move || {
1412 /// send.send(1).unwrap();
1413 /// send.send(2).unwrap();
1414 /// send.send(3).unwrap();
1417 /// let mut iter = recv.iter();
1418 /// assert_eq!(iter.next(), Some(1));
1419 /// assert_eq!(iter.next(), Some(2));
1420 /// assert_eq!(iter.next(), Some(3));
1421 /// assert_eq!(iter.next(), None);
1423 #[stable(feature = "rust1", since = "1.0.0")]
1424 pub fn iter(&self) -> Iter<'_, T> {
1428 /// Returns an iterator that will attempt to yield all pending values.
1429 /// It will return `None` if there are no more pending values or if the
1430 /// channel has hung up. The iterator will never [`panic!`] or block the
1431 /// user by waiting for values.
1433 /// [`panic!`]: ../../../std/macro.panic.html
1438 /// use std::sync::mpsc::channel;
1439 /// use std::thread;
1440 /// use std::time::Duration;
1442 /// let (sender, receiver) = channel();
1444 /// // nothing is in the buffer yet
1445 /// assert!(receiver.try_iter().next().is_none());
1447 /// thread::spawn(move || {
1448 /// thread::sleep(Duration::from_secs(1));
1449 /// sender.send(1).unwrap();
1450 /// sender.send(2).unwrap();
1451 /// sender.send(3).unwrap();
1454 /// // nothing is in the buffer yet
1455 /// assert!(receiver.try_iter().next().is_none());
1457 /// // block for two seconds
1458 /// thread::sleep(Duration::from_secs(2));
1460 /// let mut iter = receiver.try_iter();
1461 /// assert_eq!(iter.next(), Some(1));
1462 /// assert_eq!(iter.next(), Some(2));
1463 /// assert_eq!(iter.next(), Some(3));
1464 /// assert_eq!(iter.next(), None);
1466 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1467 pub fn try_iter(&self) -> TryIter<'_, T> {
1468 TryIter { rx: self }
1472 #[stable(feature = "rust1", since = "1.0.0")]
1473 impl<'a, T> Iterator for Iter<'a, T> {
1476 fn next(&mut self) -> Option<T> {
1481 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1482 impl<'a, T> Iterator for TryIter<'a, T> {
1485 fn next(&mut self) -> Option<T> {
1486 self.rx.try_recv().ok()
1490 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1491 impl<'a, T> IntoIterator for &'a Receiver<T> {
1493 type IntoIter = Iter<'a, T>;
1495 fn into_iter(self) -> Iter<'a, T> {
1500 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1501 impl<T> Iterator for IntoIter<T> {
1503 fn next(&mut self) -> Option<T> {
1508 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1509 impl<T> IntoIterator for Receiver<T> {
1511 type IntoIter = IntoIter<T>;
1513 fn into_iter(self) -> IntoIter<T> {
1514 IntoIter { rx: self }
1518 #[stable(feature = "rust1", since = "1.0.0")]
1519 impl<T> Drop for Receiver<T> {
1520 fn drop(&mut self) {
1521 match *unsafe { self.inner() } {
1522 Flavor::Oneshot(ref p) => p.drop_port(),
1523 Flavor::Stream(ref p) => p.drop_port(),
1524 Flavor::Shared(ref p) => p.drop_port(),
1525 Flavor::Sync(ref p) => p.drop_port(),
1530 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1531 impl<T> fmt::Debug for Receiver<T> {
1532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1533 f.debug_struct("Receiver").finish()
1537 #[stable(feature = "rust1", since = "1.0.0")]
1538 impl<T> fmt::Debug for SendError<T> {
1539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1540 "SendError(..)".fmt(f)
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> fmt::Display for SendError<T> {
1546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547 "sending on a closed channel".fmt(f)
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl<T: Send> error::Error for SendError<T> {
1553 fn description(&self) -> &str {
1554 "sending on a closed channel"
1558 #[stable(feature = "rust1", since = "1.0.0")]
1559 impl<T> fmt::Debug for TrySendError<T> {
1560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562 TrySendError::Full(..) => "Full(..)".fmt(f),
1563 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1568 #[stable(feature = "rust1", since = "1.0.0")]
1569 impl<T> fmt::Display for TrySendError<T> {
1570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1572 TrySendError::Full(..) => "sending on a full channel".fmt(f),
1573 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1578 #[stable(feature = "rust1", since = "1.0.0")]
1579 impl<T: Send> error::Error for TrySendError<T> {
1580 fn description(&self) -> &str {
1582 TrySendError::Full(..) => "sending on a full channel",
1583 TrySendError::Disconnected(..) => "sending on a closed channel",
1588 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1589 impl<T> From<SendError<T>> for TrySendError<T> {
1590 fn from(err: SendError<T>) -> TrySendError<T> {
1592 SendError(t) => TrySendError::Disconnected(t),
1597 #[stable(feature = "rust1", since = "1.0.0")]
1598 impl fmt::Display for RecvError {
1599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1600 "receiving on a closed channel".fmt(f)
1604 #[stable(feature = "rust1", since = "1.0.0")]
1605 impl error::Error for RecvError {
1606 fn description(&self) -> &str {
1607 "receiving on a closed channel"
1611 #[stable(feature = "rust1", since = "1.0.0")]
1612 impl fmt::Display for TryRecvError {
1613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1615 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1616 TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1621 #[stable(feature = "rust1", since = "1.0.0")]
1622 impl error::Error for TryRecvError {
1623 fn description(&self) -> &str {
1625 TryRecvError::Empty => "receiving on an empty channel",
1626 TryRecvError::Disconnected => "receiving on a closed channel",
1631 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1632 impl From<RecvError> for TryRecvError {
1633 fn from(err: RecvError) -> TryRecvError {
1635 RecvError => TryRecvError::Disconnected,
1640 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1641 impl fmt::Display for RecvTimeoutError {
1642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1644 RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1645 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1650 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1651 impl error::Error for RecvTimeoutError {
1652 fn description(&self) -> &str {
1654 RecvTimeoutError::Timeout => "timed out waiting on channel",
1655 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1660 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1661 impl From<RecvError> for RecvTimeoutError {
1662 fn from(err: RecvError) -> RecvTimeoutError {
1664 RecvError => RecvTimeoutError::Disconnected,
1669 #[cfg(all(test, not(target_os = "emscripten")))]
1674 use crate::time::{Duration, Instant};
1676 pub fn stress_factor() -> usize {
1677 match env::var("RUST_TEST_STRESS") {
1678 Ok(val) => val.parse().unwrap(),
1685 let (tx, rx) = channel::<i32>();
1686 tx.send(1).unwrap();
1687 assert_eq!(rx.recv().unwrap(), 1);
1692 let (tx, _rx) = channel::<Box<isize>>();
1693 tx.send(box 1).unwrap();
1697 fn drop_full_shared() {
1698 let (tx, _rx) = channel::<Box<isize>>();
1701 tx.send(box 1).unwrap();
1706 let (tx, rx) = channel::<i32>();
1707 tx.send(1).unwrap();
1708 assert_eq!(rx.recv().unwrap(), 1);
1709 let tx = tx.clone();
1710 tx.send(1).unwrap();
1711 assert_eq!(rx.recv().unwrap(), 1);
1715 fn smoke_threads() {
1716 let (tx, rx) = channel::<i32>();
1717 let _t = thread::spawn(move || {
1718 tx.send(1).unwrap();
1720 assert_eq!(rx.recv().unwrap(), 1);
1724 fn smoke_port_gone() {
1725 let (tx, rx) = channel::<i32>();
1727 assert!(tx.send(1).is_err());
1731 fn smoke_shared_port_gone() {
1732 let (tx, rx) = channel::<i32>();
1734 assert!(tx.send(1).is_err())
1738 fn smoke_shared_port_gone2() {
1739 let (tx, rx) = channel::<i32>();
1741 let tx2 = tx.clone();
1743 assert!(tx2.send(1).is_err());
1747 fn port_gone_concurrent() {
1748 let (tx, rx) = channel::<i32>();
1749 let _t = thread::spawn(move || {
1752 while tx.send(1).is_ok() {}
1756 fn port_gone_concurrent_shared() {
1757 let (tx, rx) = channel::<i32>();
1758 let tx2 = tx.clone();
1759 let _t = thread::spawn(move || {
1762 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1766 fn smoke_chan_gone() {
1767 let (tx, rx) = channel::<i32>();
1769 assert!(rx.recv().is_err());
1773 fn smoke_chan_gone_shared() {
1774 let (tx, rx) = channel::<()>();
1775 let tx2 = tx.clone();
1778 assert!(rx.recv().is_err());
1782 fn chan_gone_concurrent() {
1783 let (tx, rx) = channel::<i32>();
1784 let _t = thread::spawn(move || {
1785 tx.send(1).unwrap();
1786 tx.send(1).unwrap();
1788 while rx.recv().is_ok() {}
1793 let (tx, rx) = channel::<i32>();
1794 let t = thread::spawn(move || {
1796 tx.send(1).unwrap();
1800 assert_eq!(rx.recv().unwrap(), 1);
1802 t.join().ok().expect("thread panicked");
1806 fn stress_shared() {
1807 const AMT: u32 = 10000;
1808 const NTHREADS: u32 = 8;
1809 let (tx, rx) = channel::<i32>();
1811 let t = thread::spawn(move || {
1812 for _ in 0..AMT * NTHREADS {
1813 assert_eq!(rx.recv().unwrap(), 1);
1815 match rx.try_recv() {
1821 for _ in 0..NTHREADS {
1822 let tx = tx.clone();
1823 thread::spawn(move || {
1825 tx.send(1).unwrap();
1830 t.join().ok().expect("thread panicked");
1834 fn send_from_outside_runtime() {
1835 let (tx1, rx1) = channel::<()>();
1836 let (tx2, rx2) = channel::<i32>();
1837 let t1 = thread::spawn(move || {
1838 tx1.send(()).unwrap();
1840 assert_eq!(rx2.recv().unwrap(), 1);
1843 rx1.recv().unwrap();
1844 let t2 = thread::spawn(move || {
1846 tx2.send(1).unwrap();
1849 t1.join().ok().expect("thread panicked");
1850 t2.join().ok().expect("thread panicked");
1854 fn recv_from_outside_runtime() {
1855 let (tx, rx) = channel::<i32>();
1856 let t = thread::spawn(move || {
1858 assert_eq!(rx.recv().unwrap(), 1);
1862 tx.send(1).unwrap();
1864 t.join().ok().expect("thread panicked");
1869 let (tx1, rx1) = channel::<i32>();
1870 let (tx2, rx2) = channel::<i32>();
1871 let t1 = thread::spawn(move || {
1872 assert_eq!(rx1.recv().unwrap(), 1);
1873 tx2.send(2).unwrap();
1875 let t2 = thread::spawn(move || {
1876 tx1.send(1).unwrap();
1877 assert_eq!(rx2.recv().unwrap(), 2);
1879 t1.join().ok().expect("thread panicked");
1880 t2.join().ok().expect("thread panicked");
1884 fn oneshot_single_thread_close_port_first() {
1885 // Simple test of closing without sending
1886 let (_tx, rx) = channel::<i32>();
1891 fn oneshot_single_thread_close_chan_first() {
1892 // Simple test of closing without sending
1893 let (tx, _rx) = channel::<i32>();
1898 fn oneshot_single_thread_send_port_close() {
1899 // Testing that the sender cleans up the payload if receiver is closed
1900 let (tx, rx) = channel::<Box<i32>>();
1902 assert!(tx.send(box 0).is_err());
1906 fn oneshot_single_thread_recv_chan_close() {
1907 // Receiving on a closed chan will panic
1908 let res = thread::spawn(move || {
1909 let (tx, rx) = channel::<i32>();
1915 assert!(res.is_err());
1919 fn oneshot_single_thread_send_then_recv() {
1920 let (tx, rx) = channel::<Box<i32>>();
1921 tx.send(box 10).unwrap();
1922 assert!(*rx.recv().unwrap() == 10);
1926 fn oneshot_single_thread_try_send_open() {
1927 let (tx, rx) = channel::<i32>();
1928 assert!(tx.send(10).is_ok());
1929 assert!(rx.recv().unwrap() == 10);
1933 fn oneshot_single_thread_try_send_closed() {
1934 let (tx, rx) = channel::<i32>();
1936 assert!(tx.send(10).is_err());
1940 fn oneshot_single_thread_try_recv_open() {
1941 let (tx, rx) = channel::<i32>();
1942 tx.send(10).unwrap();
1943 assert!(rx.recv() == Ok(10));
1947 fn oneshot_single_thread_try_recv_closed() {
1948 let (tx, rx) = channel::<i32>();
1950 assert!(rx.recv().is_err());
1954 fn oneshot_single_thread_peek_data() {
1955 let (tx, rx) = channel::<i32>();
1956 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1957 tx.send(10).unwrap();
1958 assert_eq!(rx.try_recv(), Ok(10));
1962 fn oneshot_single_thread_peek_close() {
1963 let (tx, rx) = channel::<i32>();
1965 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1966 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1970 fn oneshot_single_thread_peek_open() {
1971 let (_tx, rx) = channel::<i32>();
1972 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1976 fn oneshot_multi_task_recv_then_send() {
1977 let (tx, rx) = channel::<Box<i32>>();
1978 let _t = thread::spawn(move || {
1979 assert!(*rx.recv().unwrap() == 10);
1982 tx.send(box 10).unwrap();
1986 fn oneshot_multi_task_recv_then_close() {
1987 let (tx, rx) = channel::<Box<i32>>();
1988 let _t = thread::spawn(move || {
1991 let res = thread::spawn(move || {
1992 assert!(*rx.recv().unwrap() == 10);
1995 assert!(res.is_err());
1999 fn oneshot_multi_thread_close_stress() {
2000 for _ in 0..stress_factor() {
2001 let (tx, rx) = channel::<i32>();
2002 let _t = thread::spawn(move || {
2010 fn oneshot_multi_thread_send_close_stress() {
2011 for _ in 0..stress_factor() {
2012 let (tx, rx) = channel::<i32>();
2013 let _t = thread::spawn(move || {
2016 let _ = thread::spawn(move || {
2017 tx.send(1).unwrap();
2024 fn oneshot_multi_thread_recv_close_stress() {
2025 for _ in 0..stress_factor() {
2026 let (tx, rx) = channel::<i32>();
2027 thread::spawn(move || {
2028 let res = thread::spawn(move || {
2032 assert!(res.is_err());
2034 let _t = thread::spawn(move || {
2035 thread::spawn(move || {
2043 fn oneshot_multi_thread_send_recv_stress() {
2044 for _ in 0..stress_factor() {
2045 let (tx, rx) = channel::<Box<isize>>();
2046 let _t = thread::spawn(move || {
2047 tx.send(box 10).unwrap();
2049 assert!(*rx.recv().unwrap() == 10);
2054 fn stream_send_recv_stress() {
2055 for _ in 0..stress_factor() {
2056 let (tx, rx) = channel();
2061 fn send(tx: Sender<Box<i32>>, i: i32) {
2066 thread::spawn(move || {
2067 tx.send(box i).unwrap();
2072 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2077 thread::spawn(move || {
2078 assert!(*rx.recv().unwrap() == i);
2086 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2087 fn oneshot_single_thread_recv_timeout() {
2088 let (tx, rx) = channel();
2089 tx.send(()).unwrap();
2090 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2091 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2092 tx.send(()).unwrap();
2093 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2097 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2098 fn stress_recv_timeout_two_threads() {
2099 let (tx, rx) = channel();
2100 let stress = stress_factor() + 100;
2101 let timeout = Duration::from_millis(100);
2103 thread::spawn(move || {
2104 for i in 0..stress {
2106 thread::sleep(timeout * 2);
2108 tx.send(1usize).unwrap();
2112 let mut recv_count = 0;
2114 match rx.recv_timeout(timeout) {
2116 assert_eq!(n, 1usize);
2119 Err(RecvTimeoutError::Timeout) => continue,
2120 Err(RecvTimeoutError::Disconnected) => break,
2124 assert_eq!(recv_count, stress);
2128 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2129 fn recv_timeout_upgrade() {
2130 let (tx, rx) = channel::<()>();
2131 let timeout = Duration::from_millis(1);
2132 let _tx_clone = tx.clone();
2134 let start = Instant::now();
2135 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2136 assert!(Instant::now() >= start + timeout);
2140 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2141 fn stress_recv_timeout_shared() {
2142 let (tx, rx) = channel();
2143 let stress = stress_factor() + 100;
2145 for i in 0..stress {
2146 let tx = tx.clone();
2147 thread::spawn(move || {
2148 thread::sleep(Duration::from_millis(i as u64 * 10));
2149 tx.send(1usize).unwrap();
2155 let mut recv_count = 0;
2157 match rx.recv_timeout(Duration::from_millis(10)) {
2159 assert_eq!(n, 1usize);
2162 Err(RecvTimeoutError::Timeout) => continue,
2163 Err(RecvTimeoutError::Disconnected) => break,
2167 assert_eq!(recv_count, stress);
2171 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2172 fn very_long_recv_timeout_wont_panic() {
2173 let (tx, rx) = channel::<()>();
2175 thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::max_value())));
2176 thread::sleep(Duration::from_secs(1));
2177 assert!(tx.send(()).is_ok());
2178 assert_eq!(join_handle.join().unwrap(), Ok(()));
2183 // Regression test that we don't run out of stack in scheduler context
2184 let (tx, rx) = channel();
2186 tx.send(()).unwrap();
2194 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2195 fn shared_recv_timeout() {
2196 let (tx, rx) = channel();
2199 let tx = tx.clone();
2200 thread::spawn(move || {
2201 tx.send(()).unwrap();
2209 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2210 tx.send(()).unwrap();
2211 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2215 fn shared_chan_stress() {
2216 let (tx, rx) = channel();
2217 let total = stress_factor() + 100;
2219 let tx = tx.clone();
2220 thread::spawn(move || {
2221 tx.send(()).unwrap();
2231 fn test_nested_recv_iter() {
2232 let (tx, rx) = channel::<i32>();
2233 let (total_tx, total_rx) = channel::<i32>();
2235 let _t = thread::spawn(move || {
2237 for x in rx.iter() {
2240 total_tx.send(acc).unwrap();
2243 tx.send(3).unwrap();
2244 tx.send(1).unwrap();
2245 tx.send(2).unwrap();
2247 assert_eq!(total_rx.recv().unwrap(), 6);
2251 fn test_recv_iter_break() {
2252 let (tx, rx) = channel::<i32>();
2253 let (count_tx, count_rx) = channel();
2255 let _t = thread::spawn(move || {
2257 for x in rx.iter() {
2264 count_tx.send(count).unwrap();
2267 tx.send(2).unwrap();
2268 tx.send(2).unwrap();
2269 tx.send(2).unwrap();
2272 assert_eq!(count_rx.recv().unwrap(), 4);
2276 fn test_recv_try_iter() {
2277 let (request_tx, request_rx) = channel();
2278 let (response_tx, response_rx) = channel();
2280 // Request `x`s until we have `6`.
2281 let t = thread::spawn(move || {
2284 for x in response_rx.try_iter() {
2290 request_tx.send(()).unwrap();
2294 for _ in request_rx.iter() {
2295 if response_tx.send(2).is_err() {
2300 assert_eq!(t.join().unwrap(), 6);
2304 fn test_recv_into_iter_owned() {
2306 let (tx, rx) = channel::<i32>();
2307 tx.send(1).unwrap();
2308 tx.send(2).unwrap();
2312 assert_eq!(iter.next().unwrap(), 1);
2313 assert_eq!(iter.next().unwrap(), 2);
2314 assert_eq!(iter.next().is_none(), true);
2318 fn test_recv_into_iter_borrowed() {
2319 let (tx, rx) = channel::<i32>();
2320 tx.send(1).unwrap();
2321 tx.send(2).unwrap();
2323 let mut iter = (&rx).into_iter();
2324 assert_eq!(iter.next().unwrap(), 1);
2325 assert_eq!(iter.next().unwrap(), 2);
2326 assert_eq!(iter.next().is_none(), true);
2330 fn try_recv_states() {
2331 let (tx1, rx1) = channel::<i32>();
2332 let (tx2, rx2) = channel::<()>();
2333 let (tx3, rx3) = channel::<()>();
2334 let _t = thread::spawn(move || {
2335 rx2.recv().unwrap();
2336 tx1.send(1).unwrap();
2337 tx3.send(()).unwrap();
2338 rx2.recv().unwrap();
2340 tx3.send(()).unwrap();
2343 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2344 tx2.send(()).unwrap();
2345 rx3.recv().unwrap();
2346 assert_eq!(rx1.try_recv(), Ok(1));
2347 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2348 tx2.send(()).unwrap();
2349 rx3.recv().unwrap();
2350 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2353 // This bug used to end up in a livelock inside of the Receiver destructor
2354 // because the internal state of the Shared packet was corrupted
2356 fn destroy_upgraded_shared_port_when_sender_still_active() {
2357 let (tx, rx) = channel();
2358 let (tx2, rx2) = channel();
2359 let _t = thread::spawn(move || {
2360 rx.recv().unwrap(); // wait on a oneshot
2361 drop(rx); // destroy a shared
2362 tx2.send(()).unwrap();
2364 // make sure the other thread has gone to sleep
2366 thread::yield_now();
2369 // upgrade to a shared chan and send a message
2372 t.send(()).unwrap();
2374 // wait for the child thread to exit before we exit
2375 rx2.recv().unwrap();
2380 let (tx, _) = channel();
2381 let _ = tx.send(123);
2382 assert_eq!(tx.send(123), Err(SendError(123)));
2386 #[cfg(all(test, not(target_os = "emscripten")))]
2391 use crate::time::Duration;
2393 pub fn stress_factor() -> usize {
2394 match env::var("RUST_TEST_STRESS") {
2395 Ok(val) => val.parse().unwrap(),
2402 let (tx, rx) = sync_channel::<i32>(1);
2403 tx.send(1).unwrap();
2404 assert_eq!(rx.recv().unwrap(), 1);
2409 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2410 tx.send(box 1).unwrap();
2415 let (tx, rx) = sync_channel::<i32>(1);
2416 tx.send(1).unwrap();
2417 assert_eq!(rx.recv().unwrap(), 1);
2418 let tx = tx.clone();
2419 tx.send(1).unwrap();
2420 assert_eq!(rx.recv().unwrap(), 1);
2424 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2426 let (tx, rx) = sync_channel::<i32>(1);
2427 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2428 tx.send(1).unwrap();
2429 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2433 fn smoke_threads() {
2434 let (tx, rx) = sync_channel::<i32>(0);
2435 let _t = thread::spawn(move || {
2436 tx.send(1).unwrap();
2438 assert_eq!(rx.recv().unwrap(), 1);
2442 fn smoke_port_gone() {
2443 let (tx, rx) = sync_channel::<i32>(0);
2445 assert!(tx.send(1).is_err());
2449 fn smoke_shared_port_gone2() {
2450 let (tx, rx) = sync_channel::<i32>(0);
2452 let tx2 = tx.clone();
2454 assert!(tx2.send(1).is_err());
2458 fn port_gone_concurrent() {
2459 let (tx, rx) = sync_channel::<i32>(0);
2460 let _t = thread::spawn(move || {
2463 while tx.send(1).is_ok() {}
2467 fn port_gone_concurrent_shared() {
2468 let (tx, rx) = sync_channel::<i32>(0);
2469 let tx2 = tx.clone();
2470 let _t = thread::spawn(move || {
2473 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2477 fn smoke_chan_gone() {
2478 let (tx, rx) = sync_channel::<i32>(0);
2480 assert!(rx.recv().is_err());
2484 fn smoke_chan_gone_shared() {
2485 let (tx, rx) = sync_channel::<()>(0);
2486 let tx2 = tx.clone();
2489 assert!(rx.recv().is_err());
2493 fn chan_gone_concurrent() {
2494 let (tx, rx) = sync_channel::<i32>(0);
2495 thread::spawn(move || {
2496 tx.send(1).unwrap();
2497 tx.send(1).unwrap();
2499 while rx.recv().is_ok() {}
2504 let (tx, rx) = sync_channel::<i32>(0);
2505 thread::spawn(move || {
2507 tx.send(1).unwrap();
2511 assert_eq!(rx.recv().unwrap(), 1);
2516 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2517 fn stress_recv_timeout_two_threads() {
2518 let (tx, rx) = sync_channel::<i32>(0);
2520 thread::spawn(move || {
2522 tx.send(1).unwrap();
2526 let mut recv_count = 0;
2528 match rx.recv_timeout(Duration::from_millis(1)) {
2533 Err(RecvTimeoutError::Timeout) => continue,
2534 Err(RecvTimeoutError::Disconnected) => break,
2538 assert_eq!(recv_count, 10000);
2542 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2543 fn stress_recv_timeout_shared() {
2544 const AMT: u32 = 1000;
2545 const NTHREADS: u32 = 8;
2546 let (tx, rx) = sync_channel::<i32>(0);
2547 let (dtx, drx) = sync_channel::<()>(0);
2549 thread::spawn(move || {
2550 let mut recv_count = 0;
2552 match rx.recv_timeout(Duration::from_millis(10)) {
2557 Err(RecvTimeoutError::Timeout) => continue,
2558 Err(RecvTimeoutError::Disconnected) => break,
2562 assert_eq!(recv_count, AMT * NTHREADS);
2563 assert!(rx.try_recv().is_err());
2565 dtx.send(()).unwrap();
2568 for _ in 0..NTHREADS {
2569 let tx = tx.clone();
2570 thread::spawn(move || {
2572 tx.send(1).unwrap();
2579 drx.recv().unwrap();
2583 fn stress_shared() {
2584 const AMT: u32 = 1000;
2585 const NTHREADS: u32 = 8;
2586 let (tx, rx) = sync_channel::<i32>(0);
2587 let (dtx, drx) = sync_channel::<()>(0);
2589 thread::spawn(move || {
2590 for _ in 0..AMT * NTHREADS {
2591 assert_eq!(rx.recv().unwrap(), 1);
2593 match rx.try_recv() {
2597 dtx.send(()).unwrap();
2600 for _ in 0..NTHREADS {
2601 let tx = tx.clone();
2602 thread::spawn(move || {
2604 tx.send(1).unwrap();
2609 drx.recv().unwrap();
2613 fn oneshot_single_thread_close_port_first() {
2614 // Simple test of closing without sending
2615 let (_tx, rx) = sync_channel::<i32>(0);
2620 fn oneshot_single_thread_close_chan_first() {
2621 // Simple test of closing without sending
2622 let (tx, _rx) = sync_channel::<i32>(0);
2627 fn oneshot_single_thread_send_port_close() {
2628 // Testing that the sender cleans up the payload if receiver is closed
2629 let (tx, rx) = sync_channel::<Box<i32>>(0);
2631 assert!(tx.send(box 0).is_err());
2635 fn oneshot_single_thread_recv_chan_close() {
2636 // Receiving on a closed chan will panic
2637 let res = thread::spawn(move || {
2638 let (tx, rx) = sync_channel::<i32>(0);
2644 assert!(res.is_err());
2648 fn oneshot_single_thread_send_then_recv() {
2649 let (tx, rx) = sync_channel::<Box<i32>>(1);
2650 tx.send(box 10).unwrap();
2651 assert!(*rx.recv().unwrap() == 10);
2655 fn oneshot_single_thread_try_send_open() {
2656 let (tx, rx) = sync_channel::<i32>(1);
2657 assert_eq!(tx.try_send(10), Ok(()));
2658 assert!(rx.recv().unwrap() == 10);
2662 fn oneshot_single_thread_try_send_closed() {
2663 let (tx, rx) = sync_channel::<i32>(0);
2665 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2669 fn oneshot_single_thread_try_send_closed2() {
2670 let (tx, _rx) = sync_channel::<i32>(0);
2671 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2675 fn oneshot_single_thread_try_recv_open() {
2676 let (tx, rx) = sync_channel::<i32>(1);
2677 tx.send(10).unwrap();
2678 assert!(rx.recv() == Ok(10));
2682 fn oneshot_single_thread_try_recv_closed() {
2683 let (tx, rx) = sync_channel::<i32>(0);
2685 assert!(rx.recv().is_err());
2689 fn oneshot_single_thread_try_recv_closed_with_data() {
2690 let (tx, rx) = sync_channel::<i32>(1);
2691 tx.send(10).unwrap();
2693 assert_eq!(rx.try_recv(), Ok(10));
2694 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2698 fn oneshot_single_thread_peek_data() {
2699 let (tx, rx) = sync_channel::<i32>(1);
2700 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2701 tx.send(10).unwrap();
2702 assert_eq!(rx.try_recv(), Ok(10));
2706 fn oneshot_single_thread_peek_close() {
2707 let (tx, rx) = sync_channel::<i32>(0);
2709 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2710 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2714 fn oneshot_single_thread_peek_open() {
2715 let (_tx, rx) = sync_channel::<i32>(0);
2716 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2720 fn oneshot_multi_task_recv_then_send() {
2721 let (tx, rx) = sync_channel::<Box<i32>>(0);
2722 let _t = thread::spawn(move || {
2723 assert!(*rx.recv().unwrap() == 10);
2726 tx.send(box 10).unwrap();
2730 fn oneshot_multi_task_recv_then_close() {
2731 let (tx, rx) = sync_channel::<Box<i32>>(0);
2732 let _t = thread::spawn(move || {
2735 let res = thread::spawn(move || {
2736 assert!(*rx.recv().unwrap() == 10);
2739 assert!(res.is_err());
2743 fn oneshot_multi_thread_close_stress() {
2744 for _ in 0..stress_factor() {
2745 let (tx, rx) = sync_channel::<i32>(0);
2746 let _t = thread::spawn(move || {
2754 fn oneshot_multi_thread_send_close_stress() {
2755 for _ in 0..stress_factor() {
2756 let (tx, rx) = sync_channel::<i32>(0);
2757 let _t = thread::spawn(move || {
2760 let _ = thread::spawn(move || {
2761 tx.send(1).unwrap();
2768 fn oneshot_multi_thread_recv_close_stress() {
2769 for _ in 0..stress_factor() {
2770 let (tx, rx) = sync_channel::<i32>(0);
2771 let _t = thread::spawn(move || {
2772 let res = thread::spawn(move || {
2776 assert!(res.is_err());
2778 let _t = thread::spawn(move || {
2779 thread::spawn(move || {
2787 fn oneshot_multi_thread_send_recv_stress() {
2788 for _ in 0..stress_factor() {
2789 let (tx, rx) = sync_channel::<Box<i32>>(0);
2790 let _t = thread::spawn(move || {
2791 tx.send(box 10).unwrap();
2793 assert!(*rx.recv().unwrap() == 10);
2798 fn stream_send_recv_stress() {
2799 for _ in 0..stress_factor() {
2800 let (tx, rx) = sync_channel::<Box<i32>>(0);
2805 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2810 thread::spawn(move || {
2811 tx.send(box i).unwrap();
2816 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2821 thread::spawn(move || {
2822 assert!(*rx.recv().unwrap() == i);
2831 // Regression test that we don't run out of stack in scheduler context
2832 let (tx, rx) = sync_channel(10000);
2834 tx.send(()).unwrap();
2842 fn shared_chan_stress() {
2843 let (tx, rx) = sync_channel(0);
2844 let total = stress_factor() + 100;
2846 let tx = tx.clone();
2847 thread::spawn(move || {
2848 tx.send(()).unwrap();
2858 fn test_nested_recv_iter() {
2859 let (tx, rx) = sync_channel::<i32>(0);
2860 let (total_tx, total_rx) = sync_channel::<i32>(0);
2862 let _t = thread::spawn(move || {
2864 for x in rx.iter() {
2867 total_tx.send(acc).unwrap();
2870 tx.send(3).unwrap();
2871 tx.send(1).unwrap();
2872 tx.send(2).unwrap();
2874 assert_eq!(total_rx.recv().unwrap(), 6);
2878 fn test_recv_iter_break() {
2879 let (tx, rx) = sync_channel::<i32>(0);
2880 let (count_tx, count_rx) = sync_channel(0);
2882 let _t = thread::spawn(move || {
2884 for x in rx.iter() {
2891 count_tx.send(count).unwrap();
2894 tx.send(2).unwrap();
2895 tx.send(2).unwrap();
2896 tx.send(2).unwrap();
2897 let _ = tx.try_send(2);
2899 assert_eq!(count_rx.recv().unwrap(), 4);
2903 fn try_recv_states() {
2904 let (tx1, rx1) = sync_channel::<i32>(1);
2905 let (tx2, rx2) = sync_channel::<()>(1);
2906 let (tx3, rx3) = sync_channel::<()>(1);
2907 let _t = thread::spawn(move || {
2908 rx2.recv().unwrap();
2909 tx1.send(1).unwrap();
2910 tx3.send(()).unwrap();
2911 rx2.recv().unwrap();
2913 tx3.send(()).unwrap();
2916 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2917 tx2.send(()).unwrap();
2918 rx3.recv().unwrap();
2919 assert_eq!(rx1.try_recv(), Ok(1));
2920 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2921 tx2.send(()).unwrap();
2922 rx3.recv().unwrap();
2923 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2926 // This bug used to end up in a livelock inside of the Receiver destructor
2927 // because the internal state of the Shared packet was corrupted
2929 fn destroy_upgraded_shared_port_when_sender_still_active() {
2930 let (tx, rx) = sync_channel::<()>(0);
2931 let (tx2, rx2) = sync_channel::<()>(0);
2932 let _t = thread::spawn(move || {
2933 rx.recv().unwrap(); // wait on a oneshot
2934 drop(rx); // destroy a shared
2935 tx2.send(()).unwrap();
2937 // make sure the other thread has gone to sleep
2939 thread::yield_now();
2942 // upgrade to a shared chan and send a message
2945 t.send(()).unwrap();
2947 // wait for the child thread to exit before we exit
2948 rx2.recv().unwrap();
2953 let (tx, rx) = sync_channel::<i32>(0);
2954 let _t = thread::spawn(move || {
2957 assert_eq!(tx.send(1), Ok(()));
2962 let (tx, rx) = sync_channel::<i32>(0);
2963 let _t = thread::spawn(move || {
2966 assert!(tx.send(1).is_err());
2971 let (tx, rx) = sync_channel::<i32>(1);
2972 assert_eq!(tx.send(1), Ok(()));
2973 let _t = thread::spawn(move || {
2976 assert!(tx.send(1).is_err());
2981 let (tx, rx) = sync_channel::<i32>(0);
2982 let tx2 = tx.clone();
2983 let (done, donerx) = channel();
2984 let done2 = done.clone();
2985 let _t = thread::spawn(move || {
2986 assert!(tx.send(1).is_err());
2987 done.send(()).unwrap();
2989 let _t = thread::spawn(move || {
2990 assert!(tx2.send(2).is_err());
2991 done2.send(()).unwrap();
2994 donerx.recv().unwrap();
2995 donerx.recv().unwrap();
3000 let (tx, _rx) = sync_channel::<i32>(0);
3001 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3006 let (tx, _rx) = sync_channel::<i32>(1);
3007 assert_eq!(tx.try_send(1), Ok(()));
3008 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3013 let (tx, rx) = sync_channel::<i32>(1);
3014 assert_eq!(tx.try_send(1), Ok(()));
3016 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3022 let (tx1, rx1) = sync_channel::<()>(3);
3023 let (tx2, rx2) = sync_channel::<()>(3);
3025 let _t = thread::spawn(move || {
3026 rx1.recv().unwrap();
3027 tx2.try_send(()).unwrap();
3030 tx1.try_send(()).unwrap();
3031 rx2.recv().unwrap();