// ignore-tidy-filelength

//! Multi-producer, single-consumer FIFO queue communication primitives.
//!
//! This module provides message-based communication over channels, concretely
//! defined among three types:
//!
//! * [`Sender`]
//! * [`SyncSender`]
//! * [`Receiver`]
//!
//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
//! senders are clone-able (multi-producer) such that many threads can send
//! simultaneously to one receiver (single-consumer).
//!
//! These channels come in two flavors:
//!
//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
//!    will return a `(Sender, Receiver)` tuple where all sends will be
//!    **asynchronous** (they never block). The channel conceptually has an
//!    infinite buffer.
//!
//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
//!    return a `(SyncSender, Receiver)` tuple where the storage for pending
//!    messages is a pre-allocated buffer of a fixed size. All sends will be
//!    **synchronous** by blocking until there is buffer space available. Note
//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
//!    channel where each sender atomically hands off a message to a receiver.
//!
//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
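/*!
The difference between the two flavors can be observed without spawning any
threads. The following is a minimal sketch, not one of the module's own
examples; the helper names `queue_unbounded` and `second_try_send_is_full`
are made up for illustration.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Queues `n` messages on an asynchronous channel while nothing is receiving,
/// then counts how many are pending. No `send` blocks, whatever `n` is.
fn queue_unbounded(n: usize) -> usize {
    let (tx, rx) = channel();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    // `tx` is still alive here, so `try_iter` stops once the queue is empty.
    rx.try_iter().count()
}

/// Fills a synchronous channel of bound 1 and reports whether a second,
/// non-blocking send is rejected with `TrySendError::Full`.
fn second_try_send_is_full() -> bool {
    let (tx, _rx) = sync_channel::<i32>(1);
    tx.try_send(1).unwrap();
    matches!(tx.try_send(2), Err(TrySendError::Full(2)))
}

fn main() {
    assert_eq!(queue_unbounded(1_000), 1_000);
    assert!(second_try_send_is_full());
}
```

A blocking `send` in place of the second `try_send` would instead wait until
the receiver takes the first message.
*/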
//!
//! The send and receive operations on channels will all return a [`Result`]
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so [`Err`] will be returned. Many applications
//! simply [`unwrap`] the results returned from this module, which propagates
//! the failure from thread to thread if one of them dies unexpectedly.
//!
//! [`Result`]: ../../../std/result/enum.Result.html
//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
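/*!
Handling the [`Err`] case instead of unwrapping might look like the sketch
below. The function names are illustrative and not part of this module.

```rust
use std::sync::mpsc::{channel, RecvError, SendError};

/// Sends after the receiver is gone. The rejected message comes back inside
/// the error.
fn send_to_dropped_receiver() -> Result<(), SendError<&'static str>> {
    let (tx, rx) = channel();
    drop(rx);
    tx.send("lost")
}

/// Receives after every sender is gone and nothing is queued.
fn recv_from_dropped_sender() -> Result<i32, RecvError> {
    let (tx, rx) = channel::<i32>();
    drop(tx);
    rx.recv()
}

fn main() {
    assert_eq!(send_to_dropped_receiver(), Err(SendError("lost")));
    assert_eq!(recv_from_dropped_sender(), Err(RecvError));
}
```
*/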
//!
//! Simple usage:
//!
//! ```
//! use std::thread;
//! use std::sync::mpsc::channel;
//!
//! // Create a simple streaming channel
//! let (tx, rx) = channel();
//! thread::spawn(move|| {
//!     tx.send(10).unwrap();
//! });
//! assert_eq!(rx.recv().unwrap(), 10);
//! ```
//!
//! Shared usage:
//!
//! ```
//! use std::thread;
//! use std::sync::mpsc::channel;
//!
//! // Create a shared channel that can be sent along from many threads
//! // where tx is the sending half (tx for transmission), and rx is the receiving
//! // half (rx for receiving).
//! let (tx, rx) = channel();
//! for i in 0..10 {
//!     let tx = tx.clone();
//!     thread::spawn(move|| {
//!         tx.send(i).unwrap();
//!     });
//! }
//!
//! for _ in 0..10 {
//!     let j = rx.recv().unwrap();
//!     assert!(0 <= j && j < 10);
//! }
//! ```
//!
//! Propagating panics:
//!
//! ```
//! use std::sync::mpsc::channel;
//!
//! // The call to recv() will return an error because the channel has already
//! // hung up (or been deallocated)
//! let (tx, rx) = channel::<i32>();
//! drop(tx);
//! assert!(rx.recv().is_err());
//! ```
//!
//! Synchronous channels:
//!
//! ```
//! use std::thread;
//! use std::sync::mpsc::sync_channel;
//!
//! let (tx, rx) = sync_channel::<i32>(0);
//! thread::spawn(move|| {
//!     // This will wait for the parent thread to start receiving
//!     tx.send(53).unwrap();
//! });
//! rx.recv().unwrap();
//! ```

118 #![stable(feature = "rust1", since = "1.0.0")]
120 // A description of how Rust's channel implementation works
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
// ## Flavors of channels
//
// From the perspective of a consumer of this library, there is only one flavor
// of channel. This channel can be used as a stream and cloned to allow multiple
// senders. Under the hood, however, there are actually three flavors of
// channels in play.
//
// * Oneshots - these channels are highly optimized for the one-send use
//              case. They contain as few atomics as possible and
//              involve one and exactly one allocation.
// * Streams - these channels are optimized for the non-shared use case. They
//             use a different concurrent queue that is more tailored for this
//             use case. The initial allocation of this flavor of channel is not
//             optimized.
// * Shared - this is the most general form of channel that this module offers,
//            a channel with multiple senders. This type is as optimized as it
//            can be, but the previous two types mentioned are much faster for
//            their use-cases.
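/*
The way a channel moves between these flavors can be modelled as a small
state machine. This is a hypothetical model for illustration only: `Kind`,
`after_send` and `after_clone` are invented names, and the model assumes that
a oneshot is upgraded to a stream on its second send and that cloning a sender
always yields a shared channel, as `Sender::send` and `Sender::clone` further
down suggest.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Oneshot { sent: bool },
    Stream,
    Shared,
}

impl Kind {
    /// Flavor of the channel after one more `send`.
    fn after_send(self) -> Kind {
        match self {
            // The first message fits in the oneshot itself.
            Kind::Oneshot { sent: false } => Kind::Oneshot { sent: true },
            // A second message needs a queue, so the channel is upgraded.
            Kind::Oneshot { sent: true } => Kind::Stream,
            other => other,
        }
    }

    /// Flavor of the channel after its sender is cloned.
    fn after_clone(self) -> Kind {
        Kind::Shared
    }
}

fn main() {
    let fresh = Kind::Oneshot { sent: false };
    assert_eq!(fresh.after_send(), Kind::Oneshot { sent: true });
    assert_eq!(fresh.after_send().after_send(), Kind::Stream);
    assert_eq!(fresh.after_clone(), Kind::Shared);
    assert_eq!(Kind::Shared.after_send(), Kind::Shared);
}
```
*/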
// ## Concurrent queues
//
// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
// but recv() obviously blocks. This means that under the hood there must be
// some shared and concurrent queue holding all of the actual data.
//
// With two queue-backed flavors of channels (streams and shared), two flavors
// of queues are also used. We have chosen to use queues from a well-known
// author that are abbreviated as SPSC and MPSC (single producer, single
// consumer and multiple producer, single consumer). SPSC queues are used for
// streams while MPSC queues are used for shared channels.
// ### SPSC optimizations
//
// The SPSC queue found online is essentially a linked list of nodes where one
// half of the nodes are the "queue of data" and the other half of nodes are a
// cache of unused nodes. The unused nodes are used such that an allocation is
// not required on every push() and a free doesn't need to happen on every
// pop().
//
// As found online, however, the cache of nodes is of an infinite size. This
// means that if a channel at one point in its life had 50k items in the queue,
// then the queue will always have the capacity for 50k items. I believed that
// this was an unnecessary limitation of the implementation, so I have altered
// the queue to optionally have a bound on the cache size.
//
// By default, streams will have an unbounded SPSC queue with a small-ish cache
// size. The hope is that the cache is still large enough to have very fast
// send() operations while not too large such that millions of channels can
// coexist at once.
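/*
The effect of bounding the node cache can be sketched with an ordinary free
list. This is a single-threaded illustration under invented names
(`NodeCache`, `take`, `give_back`), not the lock-free queue itself.

```rust
/// Hypothetical free list of queue nodes with an optional bound.
struct NodeCache<T> {
    free: Vec<Box<Option<T>>>,
    bound: Option<usize>,
}

impl<T> NodeCache<T> {
    fn new(bound: Option<usize>) -> Self {
        NodeCache { free: Vec::new(), bound }
    }

    /// On push: reuse a cached node if there is one, allocate otherwise.
    fn take(&mut self, value: T) -> Box<Option<T>> {
        match self.free.pop() {
            Some(mut node) => {
                *node = Some(value);
                node
            }
            None => Box::new(Some(value)),
        }
    }

    /// On pop: move the value out, then keep the node only while the cache
    /// is under its bound. A node that is not kept is dropped (freed).
    fn give_back(&mut self, mut node: Box<Option<T>>) -> Option<T> {
        let value = node.take();
        if self.bound.map_or(true, |b| self.free.len() < b) {
            self.free.push(node);
        }
        value
    }

    fn cached(&self) -> usize {
        self.free.len()
    }
}

fn main() {
    let mut cache = NodeCache::new(Some(2));
    let nodes: Vec<_> = (0..5).map(|i| cache.take(i)).collect();
    for node in nodes {
        cache.give_back(node);
    }
    // Five nodes were in flight, but only two are retained.
    assert_eq!(cache.cached(), 2);
}
```

With `None` as the bound the cache grows to the high-water mark of the queue,
which is the behavior the paragraph above describes as a limitation.
*/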
182 // ### MPSC optimizations
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
// ## Overview of the Implementation
//
// Now that there's a little background on the concurrent queues used, it's
// worth going into much more detail about the channels themselves. The basic
// pseudocode for a send/recv is:
//
//
//      send(t)                             recv()
//        queue.push(t)                       return if queue.pop()
//        if increment() == -1                deschedule {
//          wakeup()                            if decrement() > 0
//                                                cancel_deschedule()
//                                            }
//                                            queue.pop()
//
// As mentioned before, there are no locks in this implementation, only atomic
// instructions are used.
210 // ### The internal atomic counter
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
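/*
The counting protocol can be traced with a small single-threaded model. This
is a sketch under stated assumptions, not the code used by the channels:
`CounterModel` and its methods are invented names, disconnection and actual
blocking are left out, and `decrement` is assumed to pre-pay for the one
message the receiver will pop after it wakes up or aborts descheduling.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicIsize, Ordering};

struct CounterModel {
    queue: VecDeque<u32>,
    cnt: AtomicIsize, // shared between both halves
    steals: isize,    // receiver-local, so not atomic
}

impl CounterModel {
    fn new() -> Self {
        CounterModel { queue: VecDeque::new(), cnt: AtomicIsize::new(0), steals: 0 }
    }

    /// Sender: push, then increment. Returns true if the previous count was
    /// -1, meaning a blocked receiver has to be woken.
    fn send(&mut self, t: u32) -> bool {
        self.queue.push_back(t);
        self.cnt.fetch_add(1, Ordering::SeqCst) == -1
    }

    /// Receiver fast path: a successful pop is recorded as a steal and the
    /// shared counter is left alone.
    fn try_recv(&mut self) -> Option<u32> {
        let t = self.queue.pop_front()?;
        self.steals += 1;
        Some(t)
    }

    /// Receiver slow path: lump all recent steals into one decrement.
    /// Returns true if the receiver should block.
    fn decrement(&mut self) -> bool {
        // Leave -1 behind: the extra 1 subtracted below already accounts
        // for the next message this receiver pops.
        let steals = std::mem::replace(&mut self.steals, -1);
        let prev = self.cnt.fetch_sub(1 + steals, Ordering::SeqCst);
        prev - steals <= 0
    }
}

fn main() {
    let mut m = CounterModel::new();
    assert!(!m.send(1)); // nobody is waiting
    assert_eq!(m.try_recv(), Some(1)); // counted as a steal
    assert!(m.decrement()); // queue is empty: block, count is now -1
    assert!(m.send(2)); // sender sees -1 and wakes the receiver
    assert_eq!(m.try_recv(), Some(2));
    assert!(!m.send(3));
    assert!(!m.decrement()); // data is available: do not block
    assert_eq!(m.try_recv(), Some(3));
}
```
*/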
235 // ## Native Implementation
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
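/*
A mutex and condition variable fallback of this kind might be sketched as
follows. `Signal`, `wait`, `wake` and `hand_off` are hypothetical names chosen
for this illustration; the real blocking code is not shown in this file.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for the "deschedule"/"wakeup" pair.
struct Signal {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl Signal {
    fn new() -> Self {
        Signal { woken: Mutex::new(false), cond: Condvar::new() }
    }

    /// "deschedule": sleep until `wake` has been called.
    fn wait(&self) {
        let mut woken = self.woken.lock().unwrap();
        // The loop guards against spurious wakeups.
        while !*woken {
            woken = self.cond.wait(woken).unwrap();
        }
        *woken = false;
    }

    /// "wakeup": grab the mutex, set the flag, signal the condition variable.
    fn wake(&self) {
        *self.woken.lock().unwrap() = true;
        self.cond.notify_one();
    }
}

/// Hands `value` from a spawned thread to the caller, blocking on `Signal`.
fn hand_off(value: u32) -> u32 {
    let signal = Arc::new(Signal::new());
    let slot = Arc::new(Mutex::new(None));
    let (signal2, slot2) = (Arc::clone(&signal), Arc::clone(&slot));
    let producer = thread::spawn(move || {
        *slot2.lock().unwrap() = Some(value);
        signal2.wake();
    });
    signal.wait();
    producer.join().unwrap();
    let received = slot.lock().unwrap().take();
    received.unwrap()
}

fn main() {
    assert_eq!(hand_off(7), 7);
}
```
*/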
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
272 use crate::sync::Arc;
276 use crate::cell::UnsafeCell;
277 use crate::time::{Duration, Instant};
/// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
/// This half can only be owned by one thread.
///
/// Messages sent to the channel can be retrieved using [`recv`].
///
/// [`channel`]: fn.channel.html
/// [`sync_channel`]: fn.sync_channel.html
/// [`recv`]: struct.Receiver.html#method.recv
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send("Hello world!").unwrap();
///     thread::sleep(Duration::from_secs(2)); // block for two seconds
///     send.send("Delayed for 2 seconds").unwrap();
/// });
///
/// println!("{}", recv.recv().unwrap()); // Received immediately
/// println!("Waiting...");
/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
/// ```
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319 inner: UnsafeCell<Flavor<T>>,
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> { }
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> { }
/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
/// This iterator will block whenever [`next`] is called,
/// waiting for a new message, and [`None`] will be returned
/// when the corresponding channel has hung up.
///
/// [`iter`]: struct.Receiver.html#method.iter
/// [`Receiver`]: struct.Receiver.html
/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
/// [`None`]: ../../../std/option/enum.Option.html#variant.None
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// for x in recv.iter() {
///     println!("Got: {}", x);
/// }
/// ```
359 #[stable(feature = "rust1", since = "1.0.0")]
361 pub struct Iter<'a, T: 'a> {
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
/// This iterator will never block the caller in order to wait for data to
/// become available. Instead, it will return [`None`].
///
/// [`Receiver`]: struct.Receiver.html
/// [`try_iter`]: struct.Receiver.html#method.try_iter
/// [`None`]: ../../../std/option/enum.Option.html#variant.None
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = channel();
///
/// // Nothing is in the buffer yet
/// assert!(receiver.try_iter().next().is_none());
/// println!("Nothing in the buffer...");
///
/// thread::spawn(move || {
///     sender.send(1).unwrap();
///     sender.send(2).unwrap();
///     sender.send(3).unwrap();
/// });
///
/// println!("Going to sleep...");
/// thread::sleep(Duration::from_secs(2)); // block for two seconds
///
/// for x in receiver.try_iter() {
///     println!("Got: {}", x);
/// }
/// ```
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
406 pub struct TryIter<'a, T: 'a> {
/// An owning iterator over messages on a [`Receiver`],
/// created by **Receiver::into_iter**.
///
/// This iterator will block whenever [`next`]
/// is called, waiting for a new message, and [`None`] will be
/// returned if the corresponding channel has hung up.
///
/// [`Receiver`]: struct.Receiver.html
/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
/// [`None`]: ../../../std/option/enum.Option.html#variant.None
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// for x in recv.into_iter() {
///     println!("Got: {}", x);
/// }
/// ```
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
441 pub struct IntoIter<T> {
/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
///
/// Messages can be sent through this channel with [`send`].
///
/// [`channel`]: fn.channel.html
/// [`send`]: struct.Sender.html#method.send
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (sender, receiver) = channel();
/// let sender2 = sender.clone();
///
/// // First thread owns sender
/// thread::spawn(move || {
///     sender.send(1).unwrap();
/// });
///
/// // Second thread owns sender2
/// thread::spawn(move || {
///     sender2.send(2).unwrap();
/// });
///
/// let msg = receiver.recv().unwrap();
/// let msg2 = receiver.recv().unwrap();
///
/// assert_eq!(3, msg + msg2);
/// ```
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479 inner: UnsafeCell<Flavor<T>>,
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> { }
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> { }
/// The sending-half of Rust's synchronous [`sync_channel`] type.
///
/// Messages can be sent through this channel with [`send`] or [`try_send`].
///
/// [`send`] will block if there is no space in the internal buffer.
///
/// [`sync_channel`]: fn.sync_channel.html
/// [`send`]: struct.SyncSender.html#method.send
/// [`try_send`]: struct.SyncSender.html#method.try_send
///
/// ```rust
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// // Create a sync_channel with buffer size 2
/// let (sync_sender, receiver) = sync_channel(2);
/// let sync_sender2 = sync_sender.clone();
///
/// // First thread owns sync_sender
/// thread::spawn(move || {
///     sync_sender.send(1).unwrap();
///     sync_sender.send(2).unwrap();
/// });
///
/// // Second thread owns sync_sender2
/// thread::spawn(move || {
///     // This send blocks for as long as the buffer is full
///     sync_sender2.send(3).unwrap();
///     println!("Thread unblocked!");
/// });
///
/// let mut msg;
///
/// msg = receiver.recv().unwrap();
/// println!("message {} received", msg);
///
/// // A sender that was blocked on a full buffer can make progress now
///
/// msg = receiver.recv().unwrap();
/// println!("message {} received", msg);
///
/// msg = receiver.recv().unwrap();
///
/// println!("message {} received", msg);
/// ```
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539 inner: Arc<sync::Packet<T>>,
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
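/**
Recovering the payload from a failed send might look like this. A minimal
sketch; `recover_payload` is a name invented for the example.

```rust
use std::sync::mpsc::{channel, SendError};

/// Tries to send `msg` to a receiver that is already gone and returns the
/// message recovered from the error.
fn recover_payload(msg: String) -> String {
    let (tx, rx) = channel();
    drop(rx);
    match tx.send(msg) {
        Ok(()) => String::new(),
        // The tuple field holds the value that could not be delivered.
        Err(SendError(returned)) => returned,
    }
}

fn main() {
    assert_eq!(recover_payload(String::from("hello")), "hello");
}
```
*/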
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// The [`recv`] operation can only fail if the sending half of a
/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
/// messages will ever be received.
///
/// [`recv`]: struct.Receiver.html#method.recv
/// [`Receiver`]: struct.Receiver.html
/// [`channel`]: fn.channel.html
/// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
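/**
The two variants can be told apart as in the sketch below, where
`poll_three_times` is an illustrative helper and not part of this module.

```rust
use std::sync::mpsc::{channel, TryRecvError};

type Polled = Result<i32, TryRecvError>;

/// Polls an empty channel, a channel holding one message, and a channel
/// whose only sender has been dropped.
fn poll_three_times() -> (Polled, Polled, Polled) {
    let (tx, rx) = channel();
    let empty = rx.try_recv(); // nothing sent yet, sender still alive
    tx.send(7).unwrap();
    let value = rx.try_recv(); // the queued message
    drop(tx);
    let closed = rx.try_recv(); // queue drained and no sender left
    (empty, value, closed)
}

fn main() {
    let (empty, value, closed) = poll_three_times();
    assert_eq!(empty, Err(TryRecvError::Empty));
    assert_eq!(value, Ok(7));
    assert_eq!(closed, Err(TryRecvError::Disconnected));
}
```
*/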
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582 /// This **channel** is currently empty, but the **Sender**(s) have not yet
583 /// disconnected, so data may yet become available.
584 #[stable(feature = "rust1", since = "1.0.0")]
587 /// The **channel**'s sending half has become disconnected, and there will
588 /// never be any more data received on it.
589 #[stable(feature = "rust1", since = "1.0.0")]
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
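/**
A short sketch of both outcomes, using an invented helper named
`timeout_then_disconnect` and an arbitrary 10 ms timeout:

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

/// Waits on an idle channel twice: once with the sender alive, once after
/// the sender has been dropped.
fn timeout_then_disconnect() -> (RecvTimeoutError, RecvTimeoutError) {
    let (tx, rx) = channel::<i32>();
    let wait = Duration::from_millis(10);
    let first = rx.recv_timeout(wait).unwrap_err();
    drop(tx);
    let second = rx.recv_timeout(wait).unwrap_err();
    (first, second)
}

fn main() {
    let (first, second) = timeout_then_disconnect();
    assert_eq!(first, RecvTimeoutError::Timeout);
    assert_eq!(second, RecvTimeoutError::Disconnected);
}
```
*/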
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603 /// This **channel** is currently empty, but the **Sender**(s) have not yet
604 /// disconnected, so data may yet become available.
605 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
607 /// The **channel**'s sending half has become disconnected, and there will
608 /// never be any more data received on it.
609 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
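/**
Both failure cases can be produced on a rendezvous channel, as in this
sketch. `try_send_failures` is a hypothetical helper written for the example.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Calls `try_send` on a channel of bound 0, first while no receiver is
/// waiting and then after the receiver has been dropped.
fn try_send_failures() -> (TrySendError<u8>, TrySendError<u8>) {
    let (tx, rx) = sync_channel::<u8>(0);
    // Nobody is blocked in `recv`, so the hand-off cannot happen right now.
    let full = tx.try_send(1).unwrap_err();
    drop(rx);
    let disconnected = tx.try_send(2).unwrap_err();
    (full, disconnected)
}

fn main() {
    let (full, disconnected) = try_send_failures();
    assert_eq!(full, TrySendError::Full(1));
    assert_eq!(disconnected, TrySendError::Disconnected(2));
}
```

In both cases the unsent value travels back to the caller inside the error.
*/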
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
    /// The data could not be sent on the [`sync_channel`] because it would require that
    /// the caller block to send the data.
    ///
    /// If this is a buffered channel, then the buffer is full at this time. If
    /// this is not a buffered channel, then there is no [`Receiver`] available to
    /// acquire the data.
    ///
    /// [`sync_channel`]: fn.sync_channel.html
    /// [`Receiver`]: struct.Receiver.html
629 #[stable(feature = "rust1", since = "1.0.0")]
630 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
    /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
    /// sent. The data is returned to the caller in this case.
    ///
    /// [`sync_channel`]: fn.sync_channel.html
636 #[stable(feature = "rust1", since = "1.0.0")]
637 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
641 Oneshot(Arc<oneshot::Packet<T>>),
642 Stream(Arc<stream::Packet<T>>),
643 Shared(Arc<shared::Packet<T>>),
644 Sync(Arc<sync::Packet<T>>),
648 trait UnsafeFlavor<T> {
649 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651 &mut *self.inner_unsafe().get()
653 unsafe fn inner(&self) -> &Flavor<T> {
654 &*self.inner_unsafe().get()
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
/// Creates a new asynchronous channel, returning the sender/receiver halves.
/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
/// the same order as it was sent, and no [`send`] will block the calling thread
/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
/// block after its buffer limit is reached). [`recv`] will block until a message
/// is available.
///
/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
/// only one [`Receiver`] is supported.
///
/// If the [`Receiver`] is disconnected while trying to [`send`] with the
/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
/// return a [`RecvError`].
///
/// [`send`]: struct.Sender.html#method.send
/// [`recv`]: struct.Receiver.html#method.recv
/// [`Sender`]: struct.Sender.html
/// [`Receiver`]: struct.Receiver.html
/// [`sync_channel`]: fn.sync_channel.html
/// [`SendError`]: struct.SendError.html
/// [`RecvError`]: struct.RecvError.html
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (sender, receiver) = channel();
///
/// // Spawn off an expensive computation
/// thread::spawn(move|| {
/// #   fn expensive_computation() {}
///     sender.send(expensive_computation()).unwrap();
/// });
///
/// // Do some useful work for a while
///
/// // Let's see what that answer was
/// println!("{:?}", receiver.recv().unwrap());
/// ```
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712 let a = Arc::new(oneshot::Packet::new());
713 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
/// Creates a new synchronous, bounded channel.
/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
/// in the same order as it was sent. Like asynchronous [`channel`]s, the
/// [`Receiver`] will block until a message becomes available. `sync_channel`
/// differs greatly in the semantics of the sender, however.
///
/// This channel has an internal buffer on which messages will be queued.
/// `bound` specifies the buffer size. When the internal buffer becomes full,
/// future sends will *block* waiting for the buffer to open up. Note that a
/// buffer size of 0 is valid, in which case this becomes a "rendezvous channel"
/// where each [`send`] will not return until a [`recv`] is paired with it.
///
/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
/// times, but only one [`Receiver`] is supported.
///
/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
/// [`SendError`]. Similarly, if the [`SyncSender`] is disconnected while trying
/// to [`recv`], the [`recv`] method will return a [`RecvError`].
///
/// [`channel`]: fn.channel.html
/// [`send`]: struct.SyncSender.html#method.send
/// [`recv`]: struct.Receiver.html#method.recv
/// [`SyncSender`]: struct.SyncSender.html
/// [`Receiver`]: struct.Receiver.html
/// [`SendError`]: struct.SendError.html
/// [`RecvError`]: struct.RecvError.html
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// let (sender, receiver) = sync_channel(1);
///
/// // this returns immediately
/// sender.send(1).unwrap();
///
/// thread::spawn(move|| {
///     // this will block until the previous message has been received
///     sender.send(2).unwrap();
/// });
///
/// assert_eq!(receiver.recv().unwrap(), 1);
/// assert_eq!(receiver.recv().unwrap(), 2);
/// ```
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765 let a = Arc::new(sync::Packet::new(bound));
766 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
769 ////////////////////////////////////////////////////////////////////////////////
771 ////////////////////////////////////////////////////////////////////////////////
774 fn new(inner: Flavor<T>) -> Sender<T> {
776 inner: UnsafeCell::new(inner),
    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent.
    ///
    /// A successful send occurs when it is determined that the other end of
    /// the channel has not hung up already. An unsuccessful send would be one
    /// where the corresponding receiver has already been deallocated. Note
    /// that a return value of [`Err`] means that the data will never be
    /// received, but a return value of [`Ok`] does *not* mean that the data
    /// will be received. It is possible for the corresponding receiver to
    /// hang up immediately after this function returns [`Ok`].
    ///
    /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
    /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
    ///
    /// This method will never block the current thread.
    ///
    /// ```
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    ///
    /// // This send is always successful
    /// tx.send(1).unwrap();
    ///
    /// // This send will fail because the receiver is gone
    /// drop(rx);
    /// assert_eq!(tx.send(1).unwrap_err().0, 1);
    /// ```
810 #[stable(feature = "rust1", since = "1.0.0")]
811 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
812 let (new_inner, ret) = match *unsafe { self.inner() } {
813 Flavor::Oneshot(ref p) => {
815 return p.send(t).map_err(SendError);
817 let a = Arc::new(stream::Packet::new());
818 let rx = Receiver::new(Flavor::Stream(a.clone()));
819 match p.upgrade(rx) {
820 oneshot::UpSuccess => {
824 oneshot::UpDisconnected => (a, Err(t)),
825 oneshot::UpWoke(token) => {
826 // This send cannot panic because the thread is
827 // asleep (we're looking at it), so the receiver
829 a.send(t).ok().unwrap();
836 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
837 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
838 Flavor::Sync(..) => unreachable!(),
842 let tmp = Sender::new(Flavor::Stream(new_inner));
843 mem::swap(self.inner_mut(), tmp.inner_mut());
845 ret.map_err(SendError)
849 #[stable(feature = "rust1", since = "1.0.0")]
850 impl<T> Clone for Sender<T> {
851 fn clone(&self) -> Sender<T> {
852 let packet = match *unsafe { self.inner() } {
853 Flavor::Oneshot(ref p) => {
854 let a = Arc::new(shared::Packet::new());
856 let guard = a.postinit_lock();
857 let rx = Receiver::new(Flavor::Shared(a.clone()));
858 let sleeper = match p.upgrade(rx) {
860 oneshot::UpDisconnected => None,
861 oneshot::UpWoke(task) => Some(task),
863 a.inherit_blocker(sleeper, guard);
867 Flavor::Stream(ref p) => {
868 let a = Arc::new(shared::Packet::new());
870 let guard = a.postinit_lock();
871 let rx = Receiver::new(Flavor::Shared(a.clone()));
872 let sleeper = match p.upgrade(rx) {
874 stream::UpDisconnected => None,
875 stream::UpWoke(task) => Some(task),
877 a.inherit_blocker(sleeper, guard);
881 Flavor::Shared(ref p) => {
883 return Sender::new(Flavor::Shared(p.clone()));
885 Flavor::Sync(..) => unreachable!(),
889 let tmp = Sender::new(Flavor::Shared(packet.clone()));
890 mem::swap(self.inner_mut(), tmp.inner_mut());
892 Sender::new(Flavor::Shared(packet))
896 #[stable(feature = "rust1", since = "1.0.0")]
897 impl<T> Drop for Sender<T> {
899 match *unsafe { self.inner() } {
900 Flavor::Oneshot(ref p) => p.drop_chan(),
901 Flavor::Stream(ref p) => p.drop_chan(),
902 Flavor::Shared(ref p) => p.drop_chan(),
903 Flavor::Sync(..) => unreachable!(),
908 #[stable(feature = "mpsc_debug", since = "1.8.0")]
909 impl<T> fmt::Debug for Sender<T> {
910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911 f.debug_struct("Sender").finish()
915 ////////////////////////////////////////////////////////////////////////////////
917 ////////////////////////////////////////////////////////////////////////////////
919 impl<T> SyncSender<T> {
920 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
    /// Sends a value on this synchronous channel.
    ///
    /// This function will *block* until space in the internal buffer becomes
    /// available or a receiver is available to hand off the message to.
    ///
    /// Note that a successful send does *not* guarantee that the receiver will
    /// ever see the data if there is a buffer on this channel. Items may be
    /// enqueued in the internal buffer for the receiver to receive at a later
    /// time. If the buffer size is 0, however, the channel becomes a rendezvous
    /// channel and it guarantees that the receiver has indeed received
    /// the data if this function returns success.
    ///
    /// This function will never panic, but it may return [`Err`] if the
    /// [`Receiver`] has disconnected and is no longer able to receive
    /// information.
    ///
    /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
    /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
    ///
    /// ```rust
    /// use std::sync::mpsc::sync_channel;
    /// use std::thread;
    ///
    /// // Create a rendezvous sync_channel with buffer size 0
    /// let (sync_sender, receiver) = sync_channel(0);
    ///
    /// thread::spawn(move || {
    ///     println!("sending message...");
    ///     // This call blocks until the message is received
    ///     sync_sender.send(1).unwrap();
    ///
    ///     println!("...message received!");
    /// });
    ///
    /// let msg = receiver.recv().unwrap();
    /// assert_eq!(1, msg);
    /// ```
963 #[stable(feature = "rust1", since = "1.0.0")]
964 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
965 self.inner.send(t).map_err(SendError)
968 /// Attempts to send a value on this channel without blocking.
970 /// This method differs from [`send`] by returning immediately if the
971 /// channel's buffer is full or no receiver is waiting to acquire some
972 /// data. Compared with [`send`], this function has two failure cases
973 /// instead of one (one for disconnection, one for a full buffer).
975 /// See [`send`] for notes about guarantees of whether the
976 /// receiver has received the data or not if this function is successful.
978 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
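///
/// The two failure cases described above can be told apart by matching on the
/// returned error. The following is a minimal sketch; the names `tx` and `rx`
/// and the buffer size of 1 are illustrative assumptions. Both variants hand
/// the unsent value back to the caller.
///
/// ```rust
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// let (tx, rx) = sync_channel::<i32>(1);
///
/// // The single buffer slot is free, so this succeeds without blocking.
/// assert_eq!(tx.try_send(1), Ok(()));
///
/// // The buffer is now full; the value comes back inside the error.
/// assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
///
/// // Once the receiver is gone, disconnection is reported instead.
/// drop(rx);
/// assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
/// ```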
983 /// use std::sync::mpsc::sync_channel;
986 /// // Create a sync_channel with buffer size 1
987 /// let (sync_sender, receiver) = sync_channel(1);
988 /// let sync_sender2 = sync_sender.clone();
990 /// // First thread owns sync_sender
991 /// thread::spawn(move || {
992 /// sync_sender.send(1).unwrap();
993 /// sync_sender.send(2).unwrap();
994 /// // Thread blocked
997 /// // Second thread owns sync_sender2
998 /// thread::spawn(move || {
999 /// // This will return an error and send
1000 /// // no message if the buffer is full
1001 /// let _ = sync_sender2.try_send(3);
1005 /// msg = receiver.recv().unwrap();
1006 /// println!("message {} received", msg);
1008 /// msg = receiver.recv().unwrap();
1009 /// println!("message {} received", msg);
/// // The third message may never have been sent
1012 /// match receiver.try_recv() {
1013 /// Ok(msg) => println!("message {} received", msg),
1014 /// Err(_) => println!("the third message was never sent"),
1017 #[stable(feature = "rust1", since = "1.0.0")]
1018 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1019 self.inner.try_send(t)
1023 #[stable(feature = "rust1", since = "1.0.0")]
1024 impl<T> Clone for SyncSender<T> {
1025 fn clone(&self) -> SyncSender<T> {
1026 self.inner.clone_chan();
1027 SyncSender::new(self.inner.clone())
1031 #[stable(feature = "rust1", since = "1.0.0")]
1032 impl<T> Drop for SyncSender<T> {
1033 fn drop(&mut self) {
1034 self.inner.drop_chan();
1038 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1039 impl<T> fmt::Debug for SyncSender<T> {
1040 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1041 f.debug_struct("SyncSender").finish()
1045 ////////////////////////////////////////////////////////////////////////////////
1047 ////////////////////////////////////////////////////////////////////////////////
1049 impl<T> Receiver<T> {
1050 fn new(inner: Flavor<T>) -> Receiver<T> {
1051 Receiver { inner: UnsafeCell::new(inner) }
1054 /// Attempts to return a pending value on this receiver without blocking.
/// This method never blocks the caller to wait for data to become
/// available. Instead, it always returns immediately, yielding a pending
/// value if one is already on the channel.
1060 /// This is useful for a flavor of "optimistic check" before deciding to
1061 /// block on a receiver.
1063 /// Compared with [`recv`], this function has two failure cases instead of one
1064 /// (one for disconnection, one for an empty buffer).
1066 /// [`recv`]: struct.Receiver.html#method.recv
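///
/// As a hedged sketch of how the two failure cases differ in practice (the
/// names `tx` and `rx` are illustrative, not part of the API): an empty
/// channel with a live sender reports `Empty`, while an empty channel whose
/// last sender has been dropped reports `Disconnected`.
///
/// ```rust
/// use std::sync::mpsc::{channel, TryRecvError};
///
/// let (tx, rx) = channel::<i32>();
///
/// // The sender is alive but nothing has been queued yet.
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// // A queued value is returned immediately.
/// tx.send(7).unwrap();
/// assert_eq!(rx.try_recv(), Ok(7));
///
/// // With the last sender dropped and the buffer drained, the channel
/// // is reported as disconnected.
/// drop(tx);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
/// ```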
1071 /// use std::sync::mpsc::{Receiver, channel};
1073 /// let (_, receiver): (_, Receiver<i32>) = channel();
1075 /// assert!(receiver.try_recv().is_err());
1077 #[stable(feature = "rust1", since = "1.0.0")]
1078 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1080 let new_port = match *unsafe { self.inner() } {
1081 Flavor::Oneshot(ref p) => {
1082 match p.try_recv() {
1083 Ok(t) => return Ok(t),
1084 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1085 Err(oneshot::Disconnected) => {
1086 return Err(TryRecvError::Disconnected)
1088 Err(oneshot::Upgraded(rx)) => rx,
1091 Flavor::Stream(ref p) => {
1092 match p.try_recv() {
1093 Ok(t) => return Ok(t),
1094 Err(stream::Empty) => return Err(TryRecvError::Empty),
1095 Err(stream::Disconnected) => {
1096 return Err(TryRecvError::Disconnected)
1098 Err(stream::Upgraded(rx)) => rx,
1101 Flavor::Shared(ref p) => {
1102 match p.try_recv() {
1103 Ok(t) => return Ok(t),
1104 Err(shared::Empty) => return Err(TryRecvError::Empty),
1105 Err(shared::Disconnected) => {
1106 return Err(TryRecvError::Disconnected)
1110 Flavor::Sync(ref p) => {
1111 match p.try_recv() {
1112 Ok(t) => return Ok(t),
1113 Err(sync::Empty) => return Err(TryRecvError::Empty),
1114 Err(sync::Disconnected) => {
1115 return Err(TryRecvError::Disconnected)
1121 mem::swap(self.inner_mut(),
1122 new_port.inner_mut());
1127 /// Attempts to wait for a value on this receiver, returning an error if the
1128 /// corresponding channel has hung up.
1130 /// This function will always block the current thread if there is no data
1131 /// available and it's possible for more data to be sent. Once a message is
/// sent by the corresponding [`Sender`] (or [`SyncSender`]), this
1133 /// receiver will wake up and return that message.
1135 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1136 /// this call is blocking, this call will wake up and return [`Err`] to
1137 /// indicate that no more messages can ever be received on this channel.
1138 /// However, since channels are buffered, messages sent before the disconnect
1139 /// will still be properly received.
1141 /// [`Sender`]: struct.Sender.html
1142 /// [`SyncSender`]: struct.SyncSender.html
1143 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1148 /// use std::sync::mpsc;
1149 /// use std::thread;
1151 /// let (send, recv) = mpsc::channel();
1152 /// let handle = thread::spawn(move || {
1153 /// send.send(1u8).unwrap();
1156 /// handle.join().unwrap();
1158 /// assert_eq!(Ok(1), recv.recv());
1161 /// Buffering behavior:
1164 /// use std::sync::mpsc;
1165 /// use std::thread;
1166 /// use std::sync::mpsc::RecvError;
1168 /// let (send, recv) = mpsc::channel();
1169 /// let handle = thread::spawn(move || {
1170 /// send.send(1u8).unwrap();
1171 /// send.send(2).unwrap();
1172 /// send.send(3).unwrap();
1176 /// // wait for the thread to join so we ensure the sender is dropped
1177 /// handle.join().unwrap();
1179 /// assert_eq!(Ok(1), recv.recv());
1180 /// assert_eq!(Ok(2), recv.recv());
1181 /// assert_eq!(Ok(3), recv.recv());
1182 /// assert_eq!(Err(RecvError), recv.recv());
1184 #[stable(feature = "rust1", since = "1.0.0")]
1185 pub fn recv(&self) -> Result<T, RecvError> {
1187 let new_port = match *unsafe { self.inner() } {
1188 Flavor::Oneshot(ref p) => {
1189 match p.recv(None) {
1190 Ok(t) => return Ok(t),
1191 Err(oneshot::Disconnected) => return Err(RecvError),
1192 Err(oneshot::Upgraded(rx)) => rx,
1193 Err(oneshot::Empty) => unreachable!(),
1196 Flavor::Stream(ref p) => {
1197 match p.recv(None) {
1198 Ok(t) => return Ok(t),
1199 Err(stream::Disconnected) => return Err(RecvError),
1200 Err(stream::Upgraded(rx)) => rx,
1201 Err(stream::Empty) => unreachable!(),
1204 Flavor::Shared(ref p) => {
1205 match p.recv(None) {
1206 Ok(t) => return Ok(t),
1207 Err(shared::Disconnected) => return Err(RecvError),
1208 Err(shared::Empty) => unreachable!(),
1211 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1214 mem::swap(self.inner_mut(), new_port.inner_mut());
1219 /// Attempts to wait for a value on this receiver, returning an error if the
1220 /// corresponding channel has hung up, or if it waits more than `timeout`.
1222 /// This function will always block the current thread if there is no data
1223 /// available and it's possible for more data to be sent. Once a message is
/// sent by the corresponding [`Sender`] (or [`SyncSender`]), this
1225 /// receiver will wake up and return that message.
1227 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1228 /// this call is blocking, this call will wake up and return [`Err`] to
1229 /// indicate that no more messages can ever be received on this channel.
1230 /// However, since channels are buffered, messages sent before the disconnect
1231 /// will still be properly received.
1233 /// [`Sender`]: struct.Sender.html
1234 /// [`SyncSender`]: struct.SyncSender.html
1235 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
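///
/// The buffering guarantee above can be sketched as follows. The 50 ms
/// timeout and the names `tx` and `rx` are arbitrary choices made for
/// illustration. A message sent before the sender is dropped is still
/// delivered, and only the next call reports the disconnection.
///
/// ```rust
/// use std::sync::mpsc::{channel, RecvTimeoutError};
/// use std::time::Duration;
///
/// let (tx, rx) = channel::<u8>();
/// tx.send(1).unwrap();
/// drop(tx);
///
/// let timeout = Duration::from_millis(50);
///
/// // The message sent before the disconnect is still received.
/// assert_eq!(rx.recv_timeout(timeout), Ok(1));
///
/// // The channel is now empty and closed: this is a disconnect, not a timeout.
/// assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Disconnected));
/// ```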
1239 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1240 /// to panic unexpectedly with the following example:
1243 /// use std::sync::mpsc::channel;
1244 /// use std::thread;
1245 /// use std::time::Duration;
1247 /// let (tx, rx) = channel::<String>();
1249 /// thread::spawn(move || {
1250 /// let d = Duration::from_millis(10);
1252 /// println!("recv");
1253 /// let _r = rx.recv_timeout(d);
1257 /// thread::sleep(Duration::from_millis(100));
1258 /// let _c1 = tx.clone();
1260 /// thread::sleep(Duration::from_secs(1));
1263 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1267 /// Successfully receiving value before encountering timeout:
1270 /// use std::thread;
1271 /// use std::time::Duration;
1272 /// use std::sync::mpsc;
1274 /// let (send, recv) = mpsc::channel();
1276 /// thread::spawn(move || {
1277 /// send.send('a').unwrap();
1281 /// recv.recv_timeout(Duration::from_millis(400)),
1286 /// Receiving an error upon reaching timeout:
1289 /// use std::thread;
1290 /// use std::time::Duration;
1291 /// use std::sync::mpsc;
1293 /// let (send, recv) = mpsc::channel();
1295 /// thread::spawn(move || {
1296 /// thread::sleep(Duration::from_millis(800));
1297 /// send.send('a').unwrap();
1301 /// recv.recv_timeout(Duration::from_millis(400)),
1302 /// Err(mpsc::RecvTimeoutError::Timeout)
1305 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1306 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1307 // Do an optimistic try_recv to avoid the performance impact of
1308 // Instant::now() in the full-channel case.
1309 match self.try_recv() {
1310 Ok(result) => Ok(result),
1311 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1312 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1313 Some(deadline) => self.recv_deadline(deadline),
1314 // So far in the future that it's practically the same as waiting indefinitely.
1315 None => self.recv().map_err(RecvTimeoutError::from),
1320 /// Attempts to wait for a value on this receiver, returning an error if the
1321 /// corresponding channel has hung up, or if `deadline` is reached.
1323 /// This function will always block the current thread if there is no data
1324 /// available and it's possible for more data to be sent. Once a message is
/// sent by the corresponding [`Sender`] (or [`SyncSender`]), this
1326 /// receiver will wake up and return that message.
1328 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1329 /// this call is blocking, this call will wake up and return [`Err`] to
1330 /// indicate that no more messages can ever be received on this channel.
1331 /// However, since channels are buffered, messages sent before the disconnect
1332 /// will still be properly received.
1334 /// [`Sender`]: struct.Sender.html
1335 /// [`SyncSender`]: struct.SyncSender.html
1336 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1340 /// Successfully receiving value before reaching deadline:
1343 /// #![feature(deadline_api)]
1344 /// use std::thread;
1345 /// use std::time::{Duration, Instant};
1346 /// use std::sync::mpsc;
1348 /// let (send, recv) = mpsc::channel();
1350 /// thread::spawn(move || {
1351 /// send.send('a').unwrap();
1355 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1360 /// Receiving an error upon reaching deadline:
1363 /// #![feature(deadline_api)]
1364 /// use std::thread;
1365 /// use std::time::{Duration, Instant};
1366 /// use std::sync::mpsc;
1368 /// let (send, recv) = mpsc::channel();
1370 /// thread::spawn(move || {
1371 /// thread::sleep(Duration::from_millis(800));
1372 /// send.send('a').unwrap();
1376 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1377 /// Err(mpsc::RecvTimeoutError::Timeout)
1380 #[unstable(feature = "deadline_api", issue = "46316")]
1381 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1382 use self::RecvTimeoutError::*;
1385 let port_or_empty = match *unsafe { self.inner() } {
1386 Flavor::Oneshot(ref p) => {
1387 match p.recv(Some(deadline)) {
1388 Ok(t) => return Ok(t),
1389 Err(oneshot::Disconnected) => return Err(Disconnected),
1390 Err(oneshot::Upgraded(rx)) => Some(rx),
1391 Err(oneshot::Empty) => None,
1394 Flavor::Stream(ref p) => {
1395 match p.recv(Some(deadline)) {
1396 Ok(t) => return Ok(t),
1397 Err(stream::Disconnected) => return Err(Disconnected),
1398 Err(stream::Upgraded(rx)) => Some(rx),
1399 Err(stream::Empty) => None,
1402 Flavor::Shared(ref p) => {
1403 match p.recv(Some(deadline)) {
1404 Ok(t) => return Ok(t),
1405 Err(shared::Disconnected) => return Err(Disconnected),
1406 Err(shared::Empty) => None,
1409 Flavor::Sync(ref p) => {
1410 match p.recv(Some(deadline)) {
1411 Ok(t) => return Ok(t),
1412 Err(sync::Disconnected) => return Err(Disconnected),
1413 Err(sync::Empty) => None,
1418 if let Some(new_port) = port_or_empty {
1420 mem::swap(self.inner_mut(), new_port.inner_mut());
// If we're already past the deadline, and we're here without
1425 // data, return a timeout, else try again.
1426 if Instant::now() >= deadline {
1427 return Err(Timeout);
1432 /// Returns an iterator that will block waiting for messages, but never
1433 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1435 /// [`panic!`]: ../../../std/macro.panic.html
1436 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1441 /// use std::sync::mpsc::channel;
1442 /// use std::thread;
1444 /// let (send, recv) = channel();
1446 /// thread::spawn(move || {
1447 /// send.send(1).unwrap();
1448 /// send.send(2).unwrap();
1449 /// send.send(3).unwrap();
1452 /// let mut iter = recv.iter();
1453 /// assert_eq!(iter.next(), Some(1));
1454 /// assert_eq!(iter.next(), Some(2));
1455 /// assert_eq!(iter.next(), Some(3));
1456 /// assert_eq!(iter.next(), None);
1458 #[stable(feature = "rust1", since = "1.0.0")]
1459 pub fn iter(&self) -> Iter<'_, T> {
1463 /// Returns an iterator that will attempt to yield all pending values.
1464 /// It will return `None` if there are no more pending values or if the
1465 /// channel has hung up. The iterator will never [`panic!`] or block the
1466 /// user by waiting for values.
1468 /// [`panic!`]: ../../../std/macro.panic.html
1473 /// use std::sync::mpsc::channel;
1474 /// use std::thread;
1475 /// use std::time::Duration;
1477 /// let (sender, receiver) = channel();
1479 /// // nothing is in the buffer yet
1480 /// assert!(receiver.try_iter().next().is_none());
1482 /// thread::spawn(move || {
1483 /// thread::sleep(Duration::from_secs(1));
1484 /// sender.send(1).unwrap();
1485 /// sender.send(2).unwrap();
1486 /// sender.send(3).unwrap();
1489 /// // nothing is in the buffer yet
1490 /// assert!(receiver.try_iter().next().is_none());
/// // sleep for two seconds so the spawned thread has sent its messages
1493 /// thread::sleep(Duration::from_secs(2));
1495 /// let mut iter = receiver.try_iter();
1496 /// assert_eq!(iter.next(), Some(1));
1497 /// assert_eq!(iter.next(), Some(2));
1498 /// assert_eq!(iter.next(), Some(3));
1499 /// assert_eq!(iter.next(), None);
1501 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1502 pub fn try_iter(&self) -> TryIter<'_, T> {
1503 TryIter { rx: self }
1508 #[stable(feature = "rust1", since = "1.0.0")]
1509 impl<'a, T> Iterator for Iter<'a, T> {
1512 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1515 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1516 impl<'a, T> Iterator for TryIter<'a, T> {
1519 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1522 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1523 impl<'a, T> IntoIterator for &'a Receiver<T> {
1525 type IntoIter = Iter<'a, T>;
1527 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1530 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1531 impl<T> Iterator for IntoIter<T> {
1533 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1536 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
impl<T> IntoIterator for Receiver<T> {
1539 type IntoIter = IntoIter<T>;
1541 fn into_iter(self) -> IntoIter<T> {
1542 IntoIter { rx: self }
1546 #[stable(feature = "rust1", since = "1.0.0")]
1547 impl<T> Drop for Receiver<T> {
1548 fn drop(&mut self) {
1549 match *unsafe { self.inner() } {
1550 Flavor::Oneshot(ref p) => p.drop_port(),
1551 Flavor::Stream(ref p) => p.drop_port(),
1552 Flavor::Shared(ref p) => p.drop_port(),
1553 Flavor::Sync(ref p) => p.drop_port(),
1558 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1559 impl<T> fmt::Debug for Receiver<T> {
1560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561 f.debug_struct("Receiver").finish()
1565 #[stable(feature = "rust1", since = "1.0.0")]
1566 impl<T> fmt::Debug for SendError<T> {
1567 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1568 "SendError(..)".fmt(f)
1572 #[stable(feature = "rust1", since = "1.0.0")]
1573 impl<T> fmt::Display for SendError<T> {
1574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1575 "sending on a closed channel".fmt(f)
1579 #[stable(feature = "rust1", since = "1.0.0")]
1580 impl<T: Send> error::Error for SendError<T> {
1581 fn description(&self) -> &str {
1582 "sending on a closed channel"
1586 #[stable(feature = "rust1", since = "1.0.0")]
1587 impl<T> fmt::Debug for TrySendError<T> {
1588 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1590 TrySendError::Full(..) => "Full(..)".fmt(f),
1591 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1596 #[stable(feature = "rust1", since = "1.0.0")]
1597 impl<T> fmt::Display for TrySendError<T> {
1598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1600 TrySendError::Full(..) => {
1601 "sending on a full channel".fmt(f)
1603 TrySendError::Disconnected(..) => {
1604 "sending on a closed channel".fmt(f)
1610 #[stable(feature = "rust1", since = "1.0.0")]
1611 impl<T: Send> error::Error for TrySendError<T> {
1613 fn description(&self) -> &str {
1615 TrySendError::Full(..) => {
1616 "sending on a full channel"
1618 TrySendError::Disconnected(..) => {
1619 "sending on a closed channel"
1625 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1626 impl<T> From<SendError<T>> for TrySendError<T> {
1627 fn from(err: SendError<T>) -> TrySendError<T> {
1629 SendError(t) => TrySendError::Disconnected(t),
1634 #[stable(feature = "rust1", since = "1.0.0")]
1635 impl fmt::Display for RecvError {
1636 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1637 "receiving on a closed channel".fmt(f)
1641 #[stable(feature = "rust1", since = "1.0.0")]
1642 impl error::Error for RecvError {
1644 fn description(&self) -> &str {
1645 "receiving on a closed channel"
1649 #[stable(feature = "rust1", since = "1.0.0")]
1650 impl fmt::Display for TryRecvError {
1651 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1653 TryRecvError::Empty => {
1654 "receiving on an empty channel".fmt(f)
1656 TryRecvError::Disconnected => {
1657 "receiving on a closed channel".fmt(f)
1663 #[stable(feature = "rust1", since = "1.0.0")]
1664 impl error::Error for TryRecvError {
1666 fn description(&self) -> &str {
1668 TryRecvError::Empty => {
1669 "receiving on an empty channel"
1671 TryRecvError::Disconnected => {
1672 "receiving on a closed channel"
1678 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1679 impl From<RecvError> for TryRecvError {
1680 fn from(err: RecvError) -> TryRecvError {
1682 RecvError => TryRecvError::Disconnected,
1687 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1688 impl fmt::Display for RecvTimeoutError {
1689 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1691 RecvTimeoutError::Timeout => {
1692 "timed out waiting on channel".fmt(f)
1694 RecvTimeoutError::Disconnected => {
1695 "channel is empty and sending half is closed".fmt(f)
1701 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1702 impl error::Error for RecvTimeoutError {
1703 fn description(&self) -> &str {
1705 RecvTimeoutError::Timeout => {
1706 "timed out waiting on channel"
1708 RecvTimeoutError::Disconnected => {
1709 "channel is empty and sending half is closed"
1715 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1716 impl From<RecvError> for RecvTimeoutError {
1717 fn from(err: RecvError) -> RecvTimeoutError {
1719 RecvError => RecvTimeoutError::Disconnected,
1724 #[cfg(all(test, not(target_os = "emscripten")))]
1729 use crate::time::{Duration, Instant};
1731 pub fn stress_factor() -> usize {
1732 match env::var("RUST_TEST_STRESS") {
1733 Ok(val) => val.parse().unwrap(),
1740 let (tx, rx) = channel::<i32>();
1741 tx.send(1).unwrap();
1742 assert_eq!(rx.recv().unwrap(), 1);
1747 let (tx, _rx) = channel::<Box<isize>>();
1748 tx.send(box 1).unwrap();
1752 fn drop_full_shared() {
1753 let (tx, _rx) = channel::<Box<isize>>();
1756 tx.send(box 1).unwrap();
1761 let (tx, rx) = channel::<i32>();
1762 tx.send(1).unwrap();
1763 assert_eq!(rx.recv().unwrap(), 1);
1764 let tx = tx.clone();
1765 tx.send(1).unwrap();
1766 assert_eq!(rx.recv().unwrap(), 1);
1770 fn smoke_threads() {
1771 let (tx, rx) = channel::<i32>();
1772 let _t = thread::spawn(move|| {
1773 tx.send(1).unwrap();
1775 assert_eq!(rx.recv().unwrap(), 1);
1779 fn smoke_port_gone() {
1780 let (tx, rx) = channel::<i32>();
1782 assert!(tx.send(1).is_err());
1786 fn smoke_shared_port_gone() {
1787 let (tx, rx) = channel::<i32>();
1789 assert!(tx.send(1).is_err())
1793 fn smoke_shared_port_gone2() {
1794 let (tx, rx) = channel::<i32>();
1796 let tx2 = tx.clone();
1798 assert!(tx2.send(1).is_err());
1802 fn port_gone_concurrent() {
1803 let (tx, rx) = channel::<i32>();
1804 let _t = thread::spawn(move|| {
1807 while tx.send(1).is_ok() {}
1811 fn port_gone_concurrent_shared() {
1812 let (tx, rx) = channel::<i32>();
1813 let tx2 = tx.clone();
1814 let _t = thread::spawn(move|| {
1817 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1821 fn smoke_chan_gone() {
1822 let (tx, rx) = channel::<i32>();
1824 assert!(rx.recv().is_err());
1828 fn smoke_chan_gone_shared() {
1829 let (tx, rx) = channel::<()>();
1830 let tx2 = tx.clone();
1833 assert!(rx.recv().is_err());
1837 fn chan_gone_concurrent() {
1838 let (tx, rx) = channel::<i32>();
1839 let _t = thread::spawn(move|| {
1840 tx.send(1).unwrap();
1841 tx.send(1).unwrap();
1843 while rx.recv().is_ok() {}
1848 let (tx, rx) = channel::<i32>();
1849 let t = thread::spawn(move|| {
1850 for _ in 0..10000 { tx.send(1).unwrap(); }
1853 assert_eq!(rx.recv().unwrap(), 1);
1855 t.join().ok().expect("thread panicked");
1859 fn stress_shared() {
1860 const AMT: u32 = 10000;
1861 const NTHREADS: u32 = 8;
1862 let (tx, rx) = channel::<i32>();
1864 let t = thread::spawn(move|| {
1865 for _ in 0..AMT * NTHREADS {
1866 assert_eq!(rx.recv().unwrap(), 1);
1868 match rx.try_recv() {
1874 for _ in 0..NTHREADS {
1875 let tx = tx.clone();
1876 thread::spawn(move|| {
1877 for _ in 0..AMT { tx.send(1).unwrap(); }
1881 t.join().ok().expect("thread panicked");
1885 fn send_from_outside_runtime() {
1886 let (tx1, rx1) = channel::<()>();
1887 let (tx2, rx2) = channel::<i32>();
1888 let t1 = thread::spawn(move|| {
1889 tx1.send(()).unwrap();
1891 assert_eq!(rx2.recv().unwrap(), 1);
1894 rx1.recv().unwrap();
1895 let t2 = thread::spawn(move|| {
1897 tx2.send(1).unwrap();
1900 t1.join().ok().expect("thread panicked");
1901 t2.join().ok().expect("thread panicked");
1905 fn recv_from_outside_runtime() {
1906 let (tx, rx) = channel::<i32>();
1907 let t = thread::spawn(move|| {
1909 assert_eq!(rx.recv().unwrap(), 1);
1913 tx.send(1).unwrap();
1915 t.join().ok().expect("thread panicked");
1920 let (tx1, rx1) = channel::<i32>();
1921 let (tx2, rx2) = channel::<i32>();
1922 let t1 = thread::spawn(move|| {
1923 assert_eq!(rx1.recv().unwrap(), 1);
1924 tx2.send(2).unwrap();
1926 let t2 = thread::spawn(move|| {
1927 tx1.send(1).unwrap();
1928 assert_eq!(rx2.recv().unwrap(), 2);
1930 t1.join().ok().expect("thread panicked");
1931 t2.join().ok().expect("thread panicked");
1935 fn oneshot_single_thread_close_port_first() {
1936 // Simple test of closing without sending
1937 let (_tx, rx) = channel::<i32>();
1942 fn oneshot_single_thread_close_chan_first() {
1943 // Simple test of closing without sending
1944 let (tx, _rx) = channel::<i32>();
1949 fn oneshot_single_thread_send_port_close() {
1950 // Testing that the sender cleans up the payload if receiver is closed
1951 let (tx, rx) = channel::<Box<i32>>();
1953 assert!(tx.send(box 0).is_err());
1957 fn oneshot_single_thread_recv_chan_close() {
1958 // Receiving on a closed chan will panic
1959 let res = thread::spawn(move|| {
1960 let (tx, rx) = channel::<i32>();
1965 assert!(res.is_err());
1969 fn oneshot_single_thread_send_then_recv() {
1970 let (tx, rx) = channel::<Box<i32>>();
1971 tx.send(box 10).unwrap();
1972 assert!(*rx.recv().unwrap() == 10);
1976 fn oneshot_single_thread_try_send_open() {
1977 let (tx, rx) = channel::<i32>();
1978 assert!(tx.send(10).is_ok());
1979 assert!(rx.recv().unwrap() == 10);
1983 fn oneshot_single_thread_try_send_closed() {
1984 let (tx, rx) = channel::<i32>();
1986 assert!(tx.send(10).is_err());
1990 fn oneshot_single_thread_try_recv_open() {
1991 let (tx, rx) = channel::<i32>();
1992 tx.send(10).unwrap();
1993 assert!(rx.recv() == Ok(10));
1997 fn oneshot_single_thread_try_recv_closed() {
1998 let (tx, rx) = channel::<i32>();
2000 assert!(rx.recv().is_err());
2004 fn oneshot_single_thread_peek_data() {
2005 let (tx, rx) = channel::<i32>();
2006 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2007 tx.send(10).unwrap();
2008 assert_eq!(rx.try_recv(), Ok(10));
2012 fn oneshot_single_thread_peek_close() {
2013 let (tx, rx) = channel::<i32>();
2015 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2016 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2020 fn oneshot_single_thread_peek_open() {
2021 let (_tx, rx) = channel::<i32>();
2022 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2026 fn oneshot_multi_task_recv_then_send() {
2027 let (tx, rx) = channel::<Box<i32>>();
2028 let _t = thread::spawn(move|| {
2029 assert!(*rx.recv().unwrap() == 10);
2032 tx.send(box 10).unwrap();
2036 fn oneshot_multi_task_recv_then_close() {
2037 let (tx, rx) = channel::<Box<i32>>();
2038 let _t = thread::spawn(move|| {
2041 let res = thread::spawn(move|| {
2042 assert!(*rx.recv().unwrap() == 10);
2044 assert!(res.is_err());
2048 fn oneshot_multi_thread_close_stress() {
2049 for _ in 0..stress_factor() {
2050 let (tx, rx) = channel::<i32>();
2051 let _t = thread::spawn(move|| {
2059 fn oneshot_multi_thread_send_close_stress() {
2060 for _ in 0..stress_factor() {
2061 let (tx, rx) = channel::<i32>();
2062 let _t = thread::spawn(move|| {
2065 let _ = thread::spawn(move|| {
2066 tx.send(1).unwrap();
2072 fn oneshot_multi_thread_recv_close_stress() {
2073 for _ in 0..stress_factor() {
2074 let (tx, rx) = channel::<i32>();
2075 thread::spawn(move|| {
2076 let res = thread::spawn(move|| {
2079 assert!(res.is_err());
2081 let _t = thread::spawn(move|| {
2082 thread::spawn(move|| {
2090 fn oneshot_multi_thread_send_recv_stress() {
2091 for _ in 0..stress_factor() {
2092 let (tx, rx) = channel::<Box<isize>>();
2093 let _t = thread::spawn(move|| {
2094 tx.send(box 10).unwrap();
2096 assert!(*rx.recv().unwrap() == 10);
2101 fn stream_send_recv_stress() {
2102 for _ in 0..stress_factor() {
2103 let (tx, rx) = channel();
2108 fn send(tx: Sender<Box<i32>>, i: i32) {
2109 if i == 10 { return }
2111 thread::spawn(move|| {
2112 tx.send(box i).unwrap();
2117 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2118 if i == 10 { return }
2120 thread::spawn(move|| {
2121 assert!(*rx.recv().unwrap() == i);
2129 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2130 fn oneshot_single_thread_recv_timeout() {
2131 let (tx, rx) = channel();
2132 tx.send(()).unwrap();
2133 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2134 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2135 tx.send(()).unwrap();
2136 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2140 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2141 fn stress_recv_timeout_two_threads() {
2142 let (tx, rx) = channel();
2143 let stress = stress_factor() + 100;
2144 let timeout = Duration::from_millis(100);
2146 thread::spawn(move || {
2147 for i in 0..stress {
2149 thread::sleep(timeout * 2);
2151 tx.send(1usize).unwrap();
2155 let mut recv_count = 0;
2157 match rx.recv_timeout(timeout) {
2159 assert_eq!(n, 1usize);
2162 Err(RecvTimeoutError::Timeout) => continue,
2163 Err(RecvTimeoutError::Disconnected) => break,
2167 assert_eq!(recv_count, stress);
2171 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2172 fn recv_timeout_upgrade() {
2173 let (tx, rx) = channel::<()>();
2174 let timeout = Duration::from_millis(1);
2175 let _tx_clone = tx.clone();
2177 let start = Instant::now();
2178 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2179 assert!(Instant::now() >= start + timeout);
2183 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2184 fn stress_recv_timeout_shared() {
2185 let (tx, rx) = channel();
2186 let stress = stress_factor() + 100;
2188 for i in 0..stress {
2189 let tx = tx.clone();
2190 thread::spawn(move || {
2191 thread::sleep(Duration::from_millis(i as u64 * 10));
2192 tx.send(1usize).unwrap();
2198 let mut recv_count = 0;
2200 match rx.recv_timeout(Duration::from_millis(10)) {
2202 assert_eq!(n, 1usize);
2205 Err(RecvTimeoutError::Timeout) => continue,
2206 Err(RecvTimeoutError::Disconnected) => break,
2210 assert_eq!(recv_count, stress);
2214 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2215 fn very_long_recv_timeout_wont_panic() {
2216 let (tx, rx) = channel::<()>();
2217 let join_handle = thread::spawn(move || {
2218 rx.recv_timeout(Duration::from_secs(u64::max_value()))
2220 thread::sleep(Duration::from_secs(1));
2221 assert!(tx.send(()).is_ok());
2222 assert_eq!(join_handle.join().unwrap(), Ok(()));
2227 // Regression test that we don't run out of stack in scheduler context
2228 let (tx, rx) = channel();
2229 for _ in 0..10000 { tx.send(()).unwrap(); }
2230 for _ in 0..10000 { rx.recv().unwrap(); }
2234 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2235 fn shared_recv_timeout() {
2236 let (tx, rx) = channel();
2239 let tx = tx.clone();
2240 thread::spawn(move|| {
2241 tx.send(()).unwrap();
2245 for _ in 0..total { rx.recv().unwrap(); }
2247 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2248 tx.send(()).unwrap();
2249 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2253 fn shared_chan_stress() {
2254 let (tx, rx) = channel();
2255 let total = stress_factor() + 100;
2257 let tx = tx.clone();
2258 thread::spawn(move|| {
2259 tx.send(()).unwrap();
2269 fn test_nested_recv_iter() {
2270 let (tx, rx) = channel::<i32>();
2271 let (total_tx, total_rx) = channel::<i32>();
2273 let _t = thread::spawn(move|| {
2275 for x in rx.iter() {
2278 total_tx.send(acc).unwrap();
2281 tx.send(3).unwrap();
2282 tx.send(1).unwrap();
2283 tx.send(2).unwrap();
2285 assert_eq!(total_rx.recv().unwrap(), 6);

    fn test_recv_iter_break() {
        let (tx, rx) = channel::<i32>();
        let (count_tx, count_rx) = channel();
        let _t = thread::spawn(move || {
            for x in rx.iter() {
            count_tx.send(count).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        assert_eq!(count_rx.recv().unwrap(), 4);

    fn test_recv_try_iter() {
        let (request_tx, request_rx) = channel();
        let (response_tx, response_rx) = channel();
        // Request `x`s until we have `6`.
        let t = thread::spawn(move || {
            for x in response_rx.try_iter() {
            request_tx.send(()).unwrap();
        for _ in request_rx.iter() {
            if response_tx.send(2).is_err() {
        assert_eq!(t.join().unwrap(), 6);

    fn test_recv_into_iter_owned() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());

    fn test_recv_into_iter_borrowed() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        let mut iter = (&rx).into_iter();
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());

    fn try_recv_states() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let _t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            tx3.send(()).unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));

    // This bug used to end up in a livelock inside the Receiver destructor
    // because the internal state of the Shared packet was corrupted.
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let _t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx); // destroy a shared
            tx2.send(()).unwrap();
        // make sure the other thread has gone to sleep
        for _ in 0..5000 { thread::yield_now(); }
        // upgrade to a shared chan and send a message
        t.send(()).unwrap();
        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();

        let (tx, _) = channel();
        let _ = tx.send(123);
        assert_eq!(tx.send(123), Err(SendError(123)));

#[cfg(all(test, not(target_os = "emscripten")))]
    use crate::time::Duration;

    pub fn stress_factor() -> usize {
        match env::var("RUST_TEST_STRESS") {
            Ok(val) => val.parse().unwrap(),

        let (tx, rx) = sync_channel::<i32>(1);
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);

        let (tx, _rx) = sync_channel::<Box<isize>>(1);
        tx.send(box 1).unwrap();

        let (tx, rx) = sync_channel::<i32>(1);
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        let tx = tx.clone();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);

    #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
        let (tx, rx) = sync_channel::<i32>(1);
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
        tx.send(1).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));

    fn smoke_threads() {
        let (tx, rx) = sync_channel::<i32>(0);
        let _t = thread::spawn(move || {
            tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);

    fn smoke_port_gone() {
        let (tx, rx) = sync_channel::<i32>(0);
        assert!(tx.send(1).is_err());

    fn smoke_shared_port_gone2() {
        let (tx, rx) = sync_channel::<i32>(0);
        let tx2 = tx.clone();
        assert!(tx2.send(1).is_err());

    fn port_gone_concurrent() {
        let (tx, rx) = sync_channel::<i32>(0);
        let _t = thread::spawn(move || {
        while tx.send(1).is_ok() {}

    fn port_gone_concurrent_shared() {
        let (tx, rx) = sync_channel::<i32>(0);
        let tx2 = tx.clone();
        let _t = thread::spawn(move || {
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}

    fn smoke_chan_gone() {
        let (tx, rx) = sync_channel::<i32>(0);
        assert!(rx.recv().is_err());

    fn smoke_chan_gone_shared() {
        let (tx, rx) = sync_channel::<()>(0);
        let tx2 = tx.clone();
        assert!(rx.recv().is_err());

    fn chan_gone_concurrent() {
        let (tx, rx) = sync_channel::<i32>(0);
        thread::spawn(move || {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
        while rx.recv().is_ok() {}

        let (tx, rx) = sync_channel::<i32>(0);
        thread::spawn(move || {
            for _ in 0..10000 { tx.send(1).unwrap(); }
        assert_eq!(rx.recv().unwrap(), 1);

    #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
    fn stress_recv_timeout_two_threads() {
        let (tx, rx) = sync_channel::<i32>(0);
        thread::spawn(move || {
            for _ in 0..10000 { tx.send(1).unwrap(); }
        let mut recv_count = 0;
        match rx.recv_timeout(Duration::from_millis(1)) {
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        assert_eq!(recv_count, 10000);

    #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
    fn stress_recv_timeout_shared() {
        const AMT: u32 = 1000;
        const NTHREADS: u32 = 8;
        let (tx, rx) = sync_channel::<i32>(0);
        let (dtx, drx) = sync_channel::<()>(0);
        thread::spawn(move || {
            let mut recv_count = 0;
            match rx.recv_timeout(Duration::from_millis(10)) {
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            assert_eq!(recv_count, AMT * NTHREADS);
            assert!(rx.try_recv().is_err());
            dtx.send(()).unwrap();
        for _ in 0..NTHREADS {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..AMT { tx.send(1).unwrap(); }
        drx.recv().unwrap();

    fn stress_shared() {
        const AMT: u32 = 1000;
        const NTHREADS: u32 = 8;
        let (tx, rx) = sync_channel::<i32>(0);
        let (dtx, drx) = sync_channel::<()>(0);
        thread::spawn(move || {
            for _ in 0..AMT * NTHREADS {
                assert_eq!(rx.recv().unwrap(), 1);
            match rx.try_recv() {
            dtx.send(()).unwrap();
        for _ in 0..NTHREADS {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..AMT { tx.send(1).unwrap(); }
        drx.recv().unwrap();

    fn oneshot_single_thread_close_port_first() {
        // Simple test of closing without sending
        let (_tx, rx) = sync_channel::<i32>(0);

    fn oneshot_single_thread_close_chan_first() {
        // Simple test of closing without sending
        let (tx, _rx) = sync_channel::<i32>(0);

    fn oneshot_single_thread_send_port_close() {
        // Test that the sender cleans up the payload if the receiver is closed
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        assert!(tx.send(box 0).is_err());

    fn oneshot_single_thread_recv_chan_close() {
        // Unwrapping the result of a receive on a closed chan will panic
        let res = thread::spawn(move || {
            let (tx, rx) = sync_channel::<i32>(0);
        assert!(res.is_err());

    fn oneshot_single_thread_send_then_recv() {
        let (tx, rx) = sync_channel::<Box<i32>>(1);
        tx.send(box 10).unwrap();
        assert!(*rx.recv().unwrap() == 10);

    fn oneshot_single_thread_try_send_open() {
        let (tx, rx) = sync_channel::<i32>(1);
        assert_eq!(tx.try_send(10), Ok(()));
        assert!(rx.recv().unwrap() == 10);

    fn oneshot_single_thread_try_send_closed() {
        let (tx, rx) = sync_channel::<i32>(0);
        assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));

    fn oneshot_single_thread_try_send_closed2() {
        let (tx, _rx) = sync_channel::<i32>(0);
        assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));

    fn oneshot_single_thread_try_recv_open() {
        let (tx, rx) = sync_channel::<i32>(1);
        tx.send(10).unwrap();
        assert!(rx.recv() == Ok(10));

    fn oneshot_single_thread_try_recv_closed() {
        let (tx, rx) = sync_channel::<i32>(0);
        assert!(rx.recv().is_err());

    fn oneshot_single_thread_try_recv_closed_with_data() {
        let (tx, rx) = sync_channel::<i32>(1);
        tx.send(10).unwrap();
        assert_eq!(rx.try_recv(), Ok(10));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

    fn oneshot_single_thread_peek_data() {
        let (tx, rx) = sync_channel::<i32>(1);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
        assert_eq!(rx.try_recv(), Ok(10));

    fn oneshot_single_thread_peek_close() {
        let (tx, rx) = sync_channel::<i32>(0);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));

    fn oneshot_single_thread_peek_open() {
        let (_tx, rx) = sync_channel::<i32>(0);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    fn oneshot_multi_task_recv_then_send() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        tx.send(box 10).unwrap();

    fn oneshot_multi_task_recv_then_close() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
        let res = thread::spawn(move || {
            assert!(*rx.recv().unwrap() == 10);
        assert!(res.is_err());

    fn oneshot_multi_thread_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = sync_channel::<i32>(0);
            let _t = thread::spawn(move || {

    fn oneshot_multi_thread_send_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = sync_channel::<i32>(0);
            let _t = thread::spawn(move || {
            let _ = thread::spawn(move || {
                tx.send(1).unwrap();

    fn oneshot_multi_thread_recv_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = sync_channel::<i32>(0);
            let _t = thread::spawn(move || {
                let res = thread::spawn(move || {
                assert!(res.is_err());
            let _t = thread::spawn(move || {
                thread::spawn(move || {

    fn oneshot_multi_thread_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = sync_channel::<Box<i32>>(0);
            let _t = thread::spawn(move || {
                tx.send(box 10).unwrap();
            assert!(*rx.recv().unwrap() == 10);

    fn stream_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = sync_channel::<Box<i32>>(0);
            fn send(tx: SyncSender<Box<i32>>, i: i32) {
                if i == 10 { return }
                thread::spawn(move || {
                    tx.send(box i).unwrap();
            fn recv(rx: Receiver<Box<i32>>, i: i32) {
                if i == 10 { return }
                thread::spawn(move || {
                    assert!(*rx.recv().unwrap() == i);

        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = sync_channel(10000);
        for _ in 0..10000 { tx.send(()).unwrap(); }
        for _ in 0..10000 { rx.recv().unwrap(); }

    fn shared_chan_stress() {
        let (tx, rx) = sync_channel(0);
        let total = stress_factor() + 100;
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(()).unwrap();

    fn test_nested_recv_iter() {
        let (tx, rx) = sync_channel::<i32>(0);
        let (total_tx, total_rx) = sync_channel::<i32>(0);
        let _t = thread::spawn(move || {
            for x in rx.iter() {
            total_tx.send(acc).unwrap();
        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(total_rx.recv().unwrap(), 6);

    fn test_recv_iter_break() {
        let (tx, rx) = sync_channel::<i32>(0);
        let (count_tx, count_rx) = sync_channel(0);
        let _t = thread::spawn(move || {
            for x in rx.iter() {
            count_tx.send(count).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        let _ = tx.try_send(2);
        assert_eq!(count_rx.recv().unwrap(), 4);

    fn try_recv_states() {
        let (tx1, rx1) = sync_channel::<i32>(1);
        let (tx2, rx2) = sync_channel::<()>(1);
        let (tx3, rx3) = sync_channel::<()>(1);
        let _t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            tx3.send(()).unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));

    // This bug used to end up in a livelock inside the Receiver destructor
    // because the internal state of the Shared packet was corrupted.
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = sync_channel::<()>(0);
        let (tx2, rx2) = sync_channel::<()>(0);
        let _t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx); // destroy a shared
            tx2.send(()).unwrap();
        // make sure the other thread has gone to sleep
        for _ in 0..5000 { thread::yield_now(); }
        // upgrade to a shared chan and send a message
        t.send(()).unwrap();
        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();

        let (tx, rx) = sync_channel::<i32>(0);
        let _t = thread::spawn(move || { rx.recv().unwrap(); });
        assert_eq!(tx.send(1), Ok(()));

        let (tx, rx) = sync_channel::<i32>(0);
        let _t = thread::spawn(move || { drop(rx); });
        assert!(tx.send(1).is_err());

        let (tx, rx) = sync_channel::<i32>(1);
        assert_eq!(tx.send(1), Ok(()));
        let _t = thread::spawn(move || { drop(rx); });
        assert!(tx.send(1).is_err());

        let (tx, rx) = sync_channel::<i32>(0);
        let tx2 = tx.clone();
        let (done, donerx) = channel();
        let done2 = done.clone();
        let _t = thread::spawn(move || {
            assert!(tx.send(1).is_err());
            done.send(()).unwrap();
        let _t = thread::spawn(move || {
            assert!(tx2.send(2).is_err());
            done2.send(()).unwrap();
        donerx.recv().unwrap();
        donerx.recv().unwrap();

        let (tx, _rx) = sync_channel::<i32>(0);
        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));

        let (tx, _rx) = sync_channel::<i32>(1);
        assert_eq!(tx.try_send(1), Ok(()));
        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));

        let (tx, rx) = sync_channel::<i32>(1);
        assert_eq!(tx.try_send(1), Ok(()));
        assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));

        let (tx1, rx1) = sync_channel::<()>(3);
        let (tx2, rx2) = sync_channel::<()>(3);
        let _t = thread::spawn(move || {
            rx1.recv().unwrap();
            tx2.try_send(()).unwrap();
        tx1.try_send(()).unwrap();
        rx2.recv().unwrap();