1 // ignore-tidy-filelength
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
16 //! These channels come in two flavors:
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //! will return a `(Sender, Receiver)` tuple where all sends will be
20 //! **asynchronous** (they never block). The channel conceptually has an
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //! messages is a pre-allocated buffer of a fixed size. All sends will be
26 //! **synchronous** by blocking until there is buffer space available. Note
27 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //! channel where each sender atomically hands off a message to a receiver.
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
59 //! use std::sync::mpsc::channel;
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //! tx.send(10).unwrap();
66 //! assert_eq!(rx.recv().unwrap(), 10);
73 //! use std::sync::mpsc::channel;
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
80 //! let tx = tx.clone();
81 //! thread::spawn(move|| {
82 //! tx.send(i).unwrap();
87 //! let j = rx.recv().unwrap();
88 //! assert!(0 <= j && j < 10);
92 //! Propagating panics:
95 //! use std::sync::mpsc::channel;
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
101 //! assert!(rx.recv().is_err());
104 //! Synchronous channels:
108 //! use std::sync::mpsc::sync_channel;
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //! // This will wait for the parent thread to start receiving
113 //! tx.send(53).unwrap();
115 //! rx.recv().unwrap();
118 #![stable(feature = "rust1", since = "1.0.0")]
119 #![allow(deprecated)] // for mpsc_select
121 // A description of how Rust's channel implementation works
123 // Channels are supposed to be the basic building block for all other
124 // concurrent primitives that are used in Rust. As a result, the channel type
125 // needs to be highly optimized, flexible, and broad enough for use everywhere.
127 // The choice of implementation of all channels is to be built on lock-free data
128 // structures. The channels themselves are then consequently also lock-free data
129 // structures. As always with lock-free code, this is a very "here be dragons"
130 // territory, especially because I'm unaware of any academic papers that have
131 // gone into great length about channels of these flavors.
133 // ## Flavors of channels
135 // From the perspective of a consumer of this library, there is only one flavor
136 // of channel. This channel can be used as a stream and cloned to allow multiple
137 // senders. Under the hood, however, there are actually three flavors of
140 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
141 // case. They contain as few atomics as possible and
142 // involve one and exactly one allocation.
143 // * Streams - these channels are optimized for the non-shared use case. They
144 // use a different concurrent queue that is more tailored for this
145 // use case. The initial allocation of this flavor of channel is not
147 // * Shared - this is the most general form of channel that this module offers,
148 // a channel with multiple senders. This type is as optimized as it
149 // can be, but the previous two types mentioned are much faster for
152 // ## Concurrent queues
154 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
155 // but recv() obviously blocks. This means that under the hood there must be
156 // some shared and concurrent queue holding all of the actual data.
158 // With two flavors of channels, two flavors of queues are also used. We have
159 // chosen to use queues from a well-known author that are abbreviated as SPSC
160 // and MPSC (single producer, single consumer and multiple producer, single
161 // consumer). SPSC queues are used for streams while MPSC queues are used for
164 // ### SPSC optimizations
166 // The SPSC queue found online is essentially a linked list of nodes where one
167 // half of the nodes are the "queue of data" and the other half of nodes are a
168 // cache of unused nodes. The unused nodes are used such that an allocation is
169 // not required on every push() and a free doesn't need to happen on every
172 // As found online, however, the cache of nodes is of an infinite size. This
173 // means that if a channel at one point in its life had 50k items in the queue,
174 // then the queue will always have the capacity for 50k items. I believed that
175 // this was an unnecessary limitation of the implementation, so I have altered
176 // the queue to optionally have a bound on the cache size.
178 // By default, streams will have an unbounded SPSC queue with a small-ish cache
179 // size. The hope is that the cache is still large enough to have very fast
180 // send() operations while not too large such that millions of channels can
183 // ### MPSC optimizations
185 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
186 // a linked list under the hood to earn its unboundedness, but I have not put
187 // forth much effort into having a cache of nodes similar to the SPSC queue.
189 // For now, I believe that this is "ok" because shared channels are not the most
190 // common type, but soon we may wish to revisit this queue choice and determine
191 // another candidate for backend storage of shared channels.
193 // ## Overview of the Implementation
195 // Now that there's a little background on the concurrent queues used, it's
196 // worth going into much more detail about the channels themselves. The basic
197 // pseudocode for a send/recv are:
201 // queue.push(t) return if queue.pop()
202 // if increment() == -1 deschedule {
203 // wakeup() if decrement() > 0
204 // cancel_deschedule()
208 // As mentioned before, there are no locks in this implementation, only atomic
209 // instructions are used.
211 // ### The internal atomic counter
213 // Every channel has a shared counter with each half to keep track of the size
214 // of the queue. This counter is used to abort descheduling by the receiver and
215 // to know when to wake up on the sending side.
217 // As seen in the pseudocode, senders will increment this count and receivers
218 // will decrement the count. The theory behind this is that if a sender sees a
219 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
220 // then it doesn't need to block.
222 // The recv() method has a beginning call to pop(), and if successful, it needs
223 // to decrement the count. It is a crucial implementation detail that this
224 // decrement does *not* happen to the shared counter. If this were the case,
225 // then it would be possible for the counter to be very negative when there were
226 // no receivers waiting, in which case the senders would have to determine when
227 // it was actually appropriate to wake up a receiver.
229 // Instead, the "steal count" is kept track of separately (not atomically
230 // because it's only used by receivers), and then the decrement() call when
231 // descheduling will lump in all of the recent steals into one large decrement.
233 // The implication of this is that if a sender sees a -1 count, then there's
234 // guaranteed to be a waiter waiting!
236 // ## Native Implementation
238 // A major goal of these channels is to work seamlessly on and off the runtime.
239 // All of the previous race conditions have been worded in terms of
240 // scheduler-isms (which is obviously not available without the runtime).
242 // For now, native usage of channels (off the runtime) will fall back onto
243 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
244 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
245 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
246 // condition variable.
250 // Being able to support selection over channels has greatly influenced this
251 // design, and not only does selection need to work inside the runtime, but also
252 // outside the runtime.
254 // The implementation is fairly straightforward. The goal of select() is not to
255 // return some data, but only to return which channel can receive data without
256 // blocking. The implementation is essentially the entire blocking procedure
257 // followed by an increment as soon as its woken up. The cancellation procedure
258 // involves an increment and swapping out of to_wake to acquire ownership of the
259 // thread to unblock.
261 // Sadly this current implementation requires multiple allocations, so I have
262 // seen the throughput of select() be much worse than it should be. I do not
263 // believe that there is anything fundamental that needs to change about these
264 // channels, however, in order to support a more efficient select().
268 // And now that you've seen all the races that I found and attempted to fix,
269 // here's the code for you to find some more!
271 use crate::sync::Arc;
275 use crate::cell::UnsafeCell;
276 use crate::time::{Duration, Instant};
278 #[unstable(feature = "mpsc_select", issue = "27800")]
279 pub use self::select::{Select, Handle};
280 use self::select::StartResult;
281 use self::select::StartResult::*;
282 use self::blocking::SignalToken;
284 #[cfg(all(test, not(target_os = "emscripten")))]
298 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
299 /// This half can only be owned by one thread.
301 /// Messages sent to the channel can be retrieved using [`recv`].
303 /// [`channel`]: fn.channel.html
304 /// [`sync_channel`]: fn.sync_channel.html
305 /// [`recv`]: struct.Receiver.html#method.recv
310 /// use std::sync::mpsc::channel;
312 /// use std::time::Duration;
314 /// let (send, recv) = channel();
316 /// thread::spawn(move || {
317 /// send.send("Hello world!").unwrap();
318 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
319 /// send.send("Delayed for 2 seconds").unwrap();
322 /// println!("{}", recv.recv().unwrap()); // Received immediately
323 /// println!("Waiting...");
324 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
326 #[stable(feature = "rust1", since = "1.0.0")]
327 pub struct Receiver<T> {
328 inner: UnsafeCell<Flavor<T>>,
331 // The receiver port can be sent from place to place, so long as it
332 // is not used to receive non-sendable things.
333 #[stable(feature = "rust1", since = "1.0.0")]
334 unsafe impl<T: Send> Send for Receiver<T> { }
336 #[stable(feature = "rust1", since = "1.0.0")]
337 impl<T> !Sync for Receiver<T> { }
339 /// An iterator over messages on a [`Receiver`], created by [`iter`].
341 /// This iterator will block whenever [`next`] is called,
342 /// waiting for a new message, and [`None`] will be returned
343 /// when the corresponding channel has hung up.
345 /// [`iter`]: struct.Receiver.html#method.iter
346 /// [`Receiver`]: struct.Receiver.html
347 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
348 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
353 /// use std::sync::mpsc::channel;
356 /// let (send, recv) = channel();
358 /// thread::spawn(move || {
359 /// send.send(1u8).unwrap();
360 /// send.send(2u8).unwrap();
361 /// send.send(3u8).unwrap();
364 /// for x in recv.iter() {
365 /// println!("Got: {}", x);
368 #[stable(feature = "rust1", since = "1.0.0")]
370 pub struct Iter<'a, T: 'a> {
374 /// An iterator that attempts to yield all pending values for a [`Receiver`],
375 /// created by [`try_iter`].
377 /// [`None`] will be returned when there are no pending values remaining or
378 /// if the corresponding channel has hung up.
380 /// This iterator will never block the caller in order to wait for data to
381 /// become available. Instead, it will return [`None`].
383 /// [`Receiver`]: struct.Receiver.html
384 /// [`try_iter`]: struct.Receiver.html#method.try_iter
385 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
390 /// use std::sync::mpsc::channel;
392 /// use std::time::Duration;
394 /// let (sender, receiver) = channel();
396 /// // Nothing is in the buffer yet
397 /// assert!(receiver.try_iter().next().is_none());
398 /// println!("Nothing in the buffer...");
400 /// thread::spawn(move || {
401 /// sender.send(1).unwrap();
402 /// sender.send(2).unwrap();
403 /// sender.send(3).unwrap();
406 /// println!("Going to sleep...");
407 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
409 /// for x in receiver.try_iter() {
410 /// println!("Got: {}", x);
413 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
415 pub struct TryIter<'a, T: 'a> {
419 /// An owning iterator over messages on a [`Receiver`],
420 /// created by **Receiver::into_iter**.
422 /// This iterator will block whenever [`next`]
423 /// is called, waiting for a new message, and [`None`] will be
424 /// returned if the corresponding channel has hung up.
426 /// [`Receiver`]: struct.Receiver.html
427 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
428 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
433 /// use std::sync::mpsc::channel;
436 /// let (send, recv) = channel();
438 /// thread::spawn(move || {
439 /// send.send(1u8).unwrap();
440 /// send.send(2u8).unwrap();
441 /// send.send(3u8).unwrap();
444 /// for x in recv.into_iter() {
445 /// println!("Got: {}", x);
448 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
450 pub struct IntoIter<T> {
454 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
455 /// owned by one thread, but it can be cloned to send to other threads.
457 /// Messages can be sent through this channel with [`send`].
459 /// [`channel`]: fn.channel.html
460 /// [`send`]: struct.Sender.html#method.send
465 /// use std::sync::mpsc::channel;
468 /// let (sender, receiver) = channel();
469 /// let sender2 = sender.clone();
471 /// // First thread owns sender
472 /// thread::spawn(move || {
473 /// sender.send(1).unwrap();
476 /// // Second thread owns sender2
477 /// thread::spawn(move || {
478 /// sender2.send(2).unwrap();
481 /// let msg = receiver.recv().unwrap();
482 /// let msg2 = receiver.recv().unwrap();
484 /// assert_eq!(3, msg + msg2);
486 #[stable(feature = "rust1", since = "1.0.0")]
487 pub struct Sender<T> {
488 inner: UnsafeCell<Flavor<T>>,
491 // The send port can be sent from place to place, so long as it
492 // is not used to send non-sendable things.
493 #[stable(feature = "rust1", since = "1.0.0")]
494 unsafe impl<T: Send> Send for Sender<T> { }
496 #[stable(feature = "rust1", since = "1.0.0")]
497 impl<T> !Sync for Sender<T> { }
499 /// The sending-half of Rust's synchronous [`sync_channel`] type.
501 /// Messages can be sent through this channel with [`send`] or [`try_send`].
503 /// [`send`] will block if there is no space in the internal buffer.
505 /// [`sync_channel`]: fn.sync_channel.html
506 /// [`send`]: struct.SyncSender.html#method.send
507 /// [`try_send`]: struct.SyncSender.html#method.try_send
512 /// use std::sync::mpsc::sync_channel;
515 /// // Create a sync_channel with buffer size 2
516 /// let (sync_sender, receiver) = sync_channel(2);
517 /// let sync_sender2 = sync_sender.clone();
519 /// // First thread owns sync_sender
520 /// thread::spawn(move || {
521 /// sync_sender.send(1).unwrap();
522 /// sync_sender.send(2).unwrap();
525 /// // Second thread owns sync_sender2
526 /// thread::spawn(move || {
527 /// sync_sender2.send(3).unwrap();
528 /// // thread will now block since the buffer is full
529 /// println!("Thread unblocked!");
534 /// msg = receiver.recv().unwrap();
535 /// println!("message {} received", msg);
537 /// // "Thread unblocked!" will be printed now
539 /// msg = receiver.recv().unwrap();
540 /// println!("message {} received", msg);
542 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
546 #[stable(feature = "rust1", since = "1.0.0")]
547 pub struct SyncSender<T> {
548 inner: Arc<sync::Packet<T>>,
551 #[stable(feature = "rust1", since = "1.0.0")]
552 unsafe impl<T: Send> Send for SyncSender<T> {}
554 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
555 /// function on **channel**s.
557 /// A **send** operation can only fail if the receiving end of a channel is
558 /// disconnected, implying that the data could never be received. The error
559 /// contains the data being sent as a payload so it can be recovered.
561 /// [`Sender::send`]: struct.Sender.html#method.send
562 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
563 #[stable(feature = "rust1", since = "1.0.0")]
564 #[derive(PartialEq, Eq, Clone, Copy)]
565 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
567 /// An error returned from the [`recv`] function on a [`Receiver`].
569 /// The [`recv`] operation can only fail if the sending half of a
570 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
571 /// messages will ever be received.
573 /// [`recv`]: struct.Receiver.html#method.recv
574 /// [`Receiver`]: struct.Receiver.html
575 /// [`channel`]: fn.channel.html
576 /// [`sync_channel`]: fn.sync_channel.html
577 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
578 #[stable(feature = "rust1", since = "1.0.0")]
579 pub struct RecvError;
581 /// This enumeration is the list of the possible reasons that [`try_recv`] could
582 /// not return data when called. This can occur with both a [`channel`] and
583 /// a [`sync_channel`].
585 /// [`try_recv`]: struct.Receiver.html#method.try_recv
586 /// [`channel`]: fn.channel.html
587 /// [`sync_channel`]: fn.sync_channel.html
588 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
589 #[stable(feature = "rust1", since = "1.0.0")]
590 pub enum TryRecvError {
591 /// This **channel** is currently empty, but the **Sender**(s) have not yet
592 /// disconnected, so data may yet become available.
593 #[stable(feature = "rust1", since = "1.0.0")]
596 /// The **channel**'s sending half has become disconnected, and there will
597 /// never be any more data received on it.
598 #[stable(feature = "rust1", since = "1.0.0")]
602 /// This enumeration is the list of possible errors that made [`recv_timeout`]
603 /// unable to return data when called. This can occur with both a [`channel`] and
604 /// a [`sync_channel`].
606 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
607 /// [`channel`]: fn.channel.html
608 /// [`sync_channel`]: fn.sync_channel.html
609 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
610 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
611 pub enum RecvTimeoutError {
612 /// This **channel** is currently empty, but the **Sender**(s) have not yet
613 /// disconnected, so data may yet become available.
614 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
616 /// The **channel**'s sending half has become disconnected, and there will
617 /// never be any more data received on it.
618 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
622 /// This enumeration is the list of the possible error outcomes for the
623 /// [`try_send`] method.
625 /// [`try_send`]: struct.SyncSender.html#method.try_send
626 #[stable(feature = "rust1", since = "1.0.0")]
627 #[derive(PartialEq, Eq, Clone, Copy)]
628 pub enum TrySendError<T> {
629 /// The data could not be sent on the [`sync_channel`] because it would require that
630 /// the callee block to send the data.
632 /// If this is a buffered channel, then the buffer is full at this time. If
633 /// this is not a buffered channel, then there is no [`Receiver`] available to
634 /// acquire the data.
636 /// [`sync_channel`]: fn.sync_channel.html
637 /// [`Receiver`]: struct.Receiver.html
638 #[stable(feature = "rust1", since = "1.0.0")]
639 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
641 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
642 /// sent. The data is returned back to the callee in this case.
644 /// [`sync_channel`]: fn.sync_channel.html
645 #[stable(feature = "rust1", since = "1.0.0")]
646 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
650 Oneshot(Arc<oneshot::Packet<T>>),
651 Stream(Arc<stream::Packet<T>>),
652 Shared(Arc<shared::Packet<T>>),
653 Sync(Arc<sync::Packet<T>>),
657 trait UnsafeFlavor<T> {
658 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
659 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
660 &mut *self.inner_unsafe().get()
662 unsafe fn inner(&self) -> &Flavor<T> {
663 &*self.inner_unsafe().get()
666 impl<T> UnsafeFlavor<T> for Sender<T> {
667 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
671 impl<T> UnsafeFlavor<T> for Receiver<T> {
672 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
677 /// Creates a new asynchronous channel, returning the sender/receiver halves.
678 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
679 /// the same order as it was sent, and no [`send`] will block the calling thread
680 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
681 /// block after its buffer limit is reached). [`recv`] will block until a message
684 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
685 /// only one [`Receiver`] is supported.
687 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
688 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
689 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
690 /// return a [`RecvError`].
692 /// [`send`]: struct.Sender.html#method.send
693 /// [`recv`]: struct.Receiver.html#method.recv
694 /// [`Sender`]: struct.Sender.html
695 /// [`Receiver`]: struct.Receiver.html
696 /// [`sync_channel`]: fn.sync_channel.html
697 /// [`SendError`]: struct.SendError.html
698 /// [`RecvError`]: struct.RecvError.html
703 /// use std::sync::mpsc::channel;
706 /// let (sender, receiver) = channel();
708 /// // Spawn off an expensive computation
709 /// thread::spawn(move|| {
710 /// # fn expensive_computation() {}
711 /// sender.send(expensive_computation()).unwrap();
714 /// // Do some useful work for awhile
716 /// // Let's see what that answer was
717 /// println!("{:?}", receiver.recv().unwrap());
719 #[stable(feature = "rust1", since = "1.0.0")]
720 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
721 let a = Arc::new(oneshot::Packet::new());
722 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
725 /// Creates a new synchronous, bounded channel.
726 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
727 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
728 /// [`Receiver`] will block until a message becomes available. `sync_channel`
729 /// differs greatly in the semantics of the sender, however.
731 /// This channel has an internal buffer on which messages will be queued.
732 /// `bound` specifies the buffer size. When the internal buffer becomes full,
733 /// future sends will *block* waiting for the buffer to open up. Note that a
734 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
735 /// where each [`send`] will not return until a [`recv`] is paired with it.
737 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
738 /// times, but only one [`Receiver`] is supported.
740 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
741 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
742 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
743 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
745 /// [`channel`]: fn.channel.html
746 /// [`send`]: struct.SyncSender.html#method.send
747 /// [`recv`]: struct.Receiver.html#method.recv
748 /// [`SyncSender`]: struct.SyncSender.html
749 /// [`Receiver`]: struct.Receiver.html
750 /// [`SendError`]: struct.SendError.html
751 /// [`RecvError`]: struct.RecvError.html
756 /// use std::sync::mpsc::sync_channel;
759 /// let (sender, receiver) = sync_channel(1);
761 /// // this returns immediately
762 /// sender.send(1).unwrap();
764 /// thread::spawn(move|| {
765 /// // this will block until the previous message has been received
766 /// sender.send(2).unwrap();
769 /// assert_eq!(receiver.recv().unwrap(), 1);
770 /// assert_eq!(receiver.recv().unwrap(), 2);
772 #[stable(feature = "rust1", since = "1.0.0")]
773 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
774 let a = Arc::new(sync::Packet::new(bound));
775 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
778 ////////////////////////////////////////////////////////////////////////////////
780 ////////////////////////////////////////////////////////////////////////////////
783 fn new(inner: Flavor<T>) -> Sender<T> {
785 inner: UnsafeCell::new(inner),
789 /// Attempts to send a value on this channel, returning it back if it could
792 /// A successful send occurs when it is determined that the other end of
793 /// the channel has not hung up already. An unsuccessful send would be one
794 /// where the corresponding receiver has already been deallocated. Note
795 /// that a return value of [`Err`] means that the data will never be
796 /// received, but a return value of [`Ok`] does *not* mean that the data
797 /// will be received. It is possible for the corresponding receiver to
798 /// hang up immediately after this function returns [`Ok`].
800 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
801 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
803 /// This method will never block the current thread.
808 /// use std::sync::mpsc::channel;
810 /// let (tx, rx) = channel();
812 /// // This send is always successful
813 /// tx.send(1).unwrap();
815 /// // This send will fail because the receiver is gone
817 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
819 #[stable(feature = "rust1", since = "1.0.0")]
820 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
821 let (new_inner, ret) = match *unsafe { self.inner() } {
822 Flavor::Oneshot(ref p) => {
824 return p.send(t).map_err(SendError);
826 let a = Arc::new(stream::Packet::new());
827 let rx = Receiver::new(Flavor::Stream(a.clone()));
828 match p.upgrade(rx) {
829 oneshot::UpSuccess => {
833 oneshot::UpDisconnected => (a, Err(t)),
834 oneshot::UpWoke(token) => {
835 // This send cannot panic because the thread is
836 // asleep (we're looking at it), so the receiver
838 a.send(t).ok().unwrap();
845 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
846 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
847 Flavor::Sync(..) => unreachable!(),
851 let tmp = Sender::new(Flavor::Stream(new_inner));
852 mem::swap(self.inner_mut(), tmp.inner_mut());
854 ret.map_err(SendError)
858 #[stable(feature = "rust1", since = "1.0.0")]
859 impl<T> Clone for Sender<T> {
860 fn clone(&self) -> Sender<T> {
861 let packet = match *unsafe { self.inner() } {
862 Flavor::Oneshot(ref p) => {
863 let a = Arc::new(shared::Packet::new());
865 let guard = a.postinit_lock();
866 let rx = Receiver::new(Flavor::Shared(a.clone()));
867 let sleeper = match p.upgrade(rx) {
869 oneshot::UpDisconnected => None,
870 oneshot::UpWoke(task) => Some(task),
872 a.inherit_blocker(sleeper, guard);
876 Flavor::Stream(ref p) => {
877 let a = Arc::new(shared::Packet::new());
879 let guard = a.postinit_lock();
880 let rx = Receiver::new(Flavor::Shared(a.clone()));
881 let sleeper = match p.upgrade(rx) {
883 stream::UpDisconnected => None,
884 stream::UpWoke(task) => Some(task),
886 a.inherit_blocker(sleeper, guard);
890 Flavor::Shared(ref p) => {
892 return Sender::new(Flavor::Shared(p.clone()));
894 Flavor::Sync(..) => unreachable!(),
898 let tmp = Sender::new(Flavor::Shared(packet.clone()));
899 mem::swap(self.inner_mut(), tmp.inner_mut());
901 Sender::new(Flavor::Shared(packet))
905 #[stable(feature = "rust1", since = "1.0.0")]
906 impl<T> Drop for Sender<T> {
908 match *unsafe { self.inner() } {
909 Flavor::Oneshot(ref p) => p.drop_chan(),
910 Flavor::Stream(ref p) => p.drop_chan(),
911 Flavor::Shared(ref p) => p.drop_chan(),
912 Flavor::Sync(..) => unreachable!(),
917 #[stable(feature = "mpsc_debug", since = "1.8.0")]
918 impl<T> fmt::Debug for Sender<T> {
919 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920 f.debug_struct("Sender").finish()
924 ////////////////////////////////////////////////////////////////////////////////
926 ////////////////////////////////////////////////////////////////////////////////
928 impl<T> SyncSender<T> {
929 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
933 /// Sends a value on this synchronous channel.
935 /// This function will *block* until space in the internal buffer becomes
936 /// available or a receiver is available to hand off the message to.
938 /// Note that a successful send does *not* guarantee that the receiver will
939 /// ever see the data if there is a buffer on this channel. Items may be
940 /// enqueued in the internal buffer for the receiver to receive at a later
941 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
942 /// channel and it guarantees that the receiver has indeed received
943 /// the data if this function returns success.
945 /// This function will never panic, but it may return [`Err`] if the
946 /// [`Receiver`] has disconnected and is no longer able to receive
949 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
950 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
955 /// use std::sync::mpsc::sync_channel;
958 /// // Create a rendezvous sync_channel with buffer size 0
959 /// let (sync_sender, receiver) = sync_channel(0);
961 /// thread::spawn(move || {
962 /// println!("sending message...");
963 /// sync_sender.send(1).unwrap();
964 /// // Thread is now blocked until the message is received
966 /// println!("...message received!");
969 /// let msg = receiver.recv().unwrap();
970 /// assert_eq!(1, msg);
972 #[stable(feature = "rust1", since = "1.0.0")]
973 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
974 self.inner.send(t).map_err(SendError)
977 /// Attempts to send a value on this channel without blocking.
979 /// This method differs from [`send`] by returning immediately if the
980 /// channel's buffer is full or no receiver is waiting to acquire some
981 /// data. Compared with [`send`], this function has two failure cases
982 /// instead of one (one for disconnection, one for a full buffer).
984 /// See [`send`] for notes about guarantees of whether the
985 /// receiver has received the data or not if this function is successful.
987 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
992 /// use std::sync::mpsc::sync_channel;
995 /// // Create a sync_channel with buffer size 1
996 /// let (sync_sender, receiver) = sync_channel(1);
997 /// let sync_sender2 = sync_sender.clone();
999 /// // First thread owns sync_sender
1000 /// thread::spawn(move || {
1001 /// sync_sender.send(1).unwrap();
1002 /// sync_sender.send(2).unwrap();
1003 /// // Thread blocked
1006 /// // Second thread owns sync_sender2
1007 /// thread::spawn(move || {
1008 /// // This will return an error and send
1009 /// // no message if the buffer is full
1010 /// let _ = sync_sender2.try_send(3);
1014 /// msg = receiver.recv().unwrap();
1015 /// println!("message {} received", msg);
1017 /// msg = receiver.recv().unwrap();
1018 /// println!("message {} received", msg);
1020 /// // Third message may have never been sent
1021 /// match receiver.try_recv() {
1022 /// Ok(msg) => println!("message {} received", msg),
1023 /// Err(_) => println!("the third message was never sent"),
1026 #[stable(feature = "rust1", since = "1.0.0")]
1027 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1028 self.inner.try_send(t)
1032 #[stable(feature = "rust1", since = "1.0.0")]
1033 impl<T> Clone for SyncSender<T> {
1034 fn clone(&self) -> SyncSender<T> {
1035 self.inner.clone_chan();
1036 SyncSender::new(self.inner.clone())
1040 #[stable(feature = "rust1", since = "1.0.0")]
1041 impl<T> Drop for SyncSender<T> {
1042 fn drop(&mut self) {
1043 self.inner.drop_chan();
1047 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1048 impl<T> fmt::Debug for SyncSender<T> {
1049 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1050 f.debug_struct("SyncSender").finish()
1054 ////////////////////////////////////////////////////////////////////////////////
1056 ////////////////////////////////////////////////////////////////////////////////
1058 impl<T> Receiver<T> {
1059 fn new(inner: Flavor<T>) -> Receiver<T> {
1060 Receiver { inner: UnsafeCell::new(inner) }
1063 /// Attempts to return a pending value on this receiver without blocking.
1065 /// This method will never block the caller in order to wait for data to
1066 /// become available. Instead, this will always return immediately with a
1067 /// possible option of pending data on the channel.
1069 /// This is useful for a flavor of "optimistic check" before deciding to
1070 /// block on a receiver.
1072 /// Compared with [`recv`], this function has two failure cases instead of one
1073 /// (one for disconnection, one for an empty buffer).
1075 /// [`recv`]: struct.Receiver.html#method.recv
1080 /// use std::sync::mpsc::{Receiver, channel};
1082 /// let (_, receiver): (_, Receiver<i32>) = channel();
1084 /// assert!(receiver.try_recv().is_err());
1086 #[stable(feature = "rust1", since = "1.0.0")]
1087 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1089 let new_port = match *unsafe { self.inner() } {
1090 Flavor::Oneshot(ref p) => {
1091 match p.try_recv() {
1092 Ok(t) => return Ok(t),
1093 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1094 Err(oneshot::Disconnected) => {
1095 return Err(TryRecvError::Disconnected)
1097 Err(oneshot::Upgraded(rx)) => rx,
1100 Flavor::Stream(ref p) => {
1101 match p.try_recv() {
1102 Ok(t) => return Ok(t),
1103 Err(stream::Empty) => return Err(TryRecvError::Empty),
1104 Err(stream::Disconnected) => {
1105 return Err(TryRecvError::Disconnected)
1107 Err(stream::Upgraded(rx)) => rx,
1110 Flavor::Shared(ref p) => {
1111 match p.try_recv() {
1112 Ok(t) => return Ok(t),
1113 Err(shared::Empty) => return Err(TryRecvError::Empty),
1114 Err(shared::Disconnected) => {
1115 return Err(TryRecvError::Disconnected)
1119 Flavor::Sync(ref p) => {
1120 match p.try_recv() {
1121 Ok(t) => return Ok(t),
1122 Err(sync::Empty) => return Err(TryRecvError::Empty),
1123 Err(sync::Disconnected) => {
1124 return Err(TryRecvError::Disconnected)
1130 mem::swap(self.inner_mut(),
1131 new_port.inner_mut());
1136 /// Attempts to wait for a value on this receiver, returning an error if the
1137 /// corresponding channel has hung up.
1139 /// This function will always block the current thread if there is no data
1140 /// available and it's possible for more data to be sent. Once a message is
1141 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1142 /// receiver will wake up and return that message.
1144 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1145 /// this call is blocking, this call will wake up and return [`Err`] to
1146 /// indicate that no more messages can ever be received on this channel.
1147 /// However, since channels are buffered, messages sent before the disconnect
1148 /// will still be properly received.
1150 /// [`Sender`]: struct.Sender.html
1151 /// [`SyncSender`]: struct.SyncSender.html
1152 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1157 /// use std::sync::mpsc;
1158 /// use std::thread;
1160 /// let (send, recv) = mpsc::channel();
1161 /// let handle = thread::spawn(move || {
1162 /// send.send(1u8).unwrap();
1165 /// handle.join().unwrap();
1167 /// assert_eq!(Ok(1), recv.recv());
1170 /// Buffering behavior:
1173 /// use std::sync::mpsc;
1174 /// use std::thread;
1175 /// use std::sync::mpsc::RecvError;
1177 /// let (send, recv) = mpsc::channel();
1178 /// let handle = thread::spawn(move || {
1179 /// send.send(1u8).unwrap();
1180 /// send.send(2).unwrap();
1181 /// send.send(3).unwrap();
1185 /// // wait for the thread to join so we ensure the sender is dropped
1186 /// handle.join().unwrap();
1188 /// assert_eq!(Ok(1), recv.recv());
1189 /// assert_eq!(Ok(2), recv.recv());
1190 /// assert_eq!(Ok(3), recv.recv());
1191 /// assert_eq!(Err(RecvError), recv.recv());
1193 #[stable(feature = "rust1", since = "1.0.0")]
1194 pub fn recv(&self) -> Result<T, RecvError> {
1196 let new_port = match *unsafe { self.inner() } {
1197 Flavor::Oneshot(ref p) => {
1198 match p.recv(None) {
1199 Ok(t) => return Ok(t),
1200 Err(oneshot::Disconnected) => return Err(RecvError),
1201 Err(oneshot::Upgraded(rx)) => rx,
1202 Err(oneshot::Empty) => unreachable!(),
1205 Flavor::Stream(ref p) => {
1206 match p.recv(None) {
1207 Ok(t) => return Ok(t),
1208 Err(stream::Disconnected) => return Err(RecvError),
1209 Err(stream::Upgraded(rx)) => rx,
1210 Err(stream::Empty) => unreachable!(),
1213 Flavor::Shared(ref p) => {
1214 match p.recv(None) {
1215 Ok(t) => return Ok(t),
1216 Err(shared::Disconnected) => return Err(RecvError),
1217 Err(shared::Empty) => unreachable!(),
1220 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1223 mem::swap(self.inner_mut(), new_port.inner_mut());
1228 /// Attempts to wait for a value on this receiver, returning an error if the
1229 /// corresponding channel has hung up, or if it waits more than `timeout`.
1231 /// This function will always block the current thread if there is no data
1232 /// available and it's possible for more data to be sent. Once a message is
1233 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1234 /// receiver will wake up and return that message.
1236 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1237 /// this call is blocking, this call will wake up and return [`Err`] to
1238 /// indicate that no more messages can ever be received on this channel.
1239 /// However, since channels are buffered, messages sent before the disconnect
1240 /// will still be properly received.
1242 /// [`Sender`]: struct.Sender.html
1243 /// [`SyncSender`]: struct.SyncSender.html
1244 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1248 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1249 /// to panic unexpectedly with the following example:
1252 /// use std::sync::mpsc::channel;
1253 /// use std::thread;
1254 /// use std::time::Duration;
1256 /// let (tx, rx) = channel::<String>();
1258 /// thread::spawn(move || {
1259 /// let d = Duration::from_millis(10);
1261 /// println!("recv");
1262 /// let _r = rx.recv_timeout(d);
1266 /// thread::sleep(Duration::from_millis(100));
1267 /// let _c1 = tx.clone();
1269 /// thread::sleep(Duration::from_secs(1));
1272 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1276 /// Successfully receiving value before encountering timeout:
1279 /// use std::thread;
1280 /// use std::time::Duration;
1281 /// use std::sync::mpsc;
1283 /// let (send, recv) = mpsc::channel();
1285 /// thread::spawn(move || {
1286 /// send.send('a').unwrap();
1290 /// recv.recv_timeout(Duration::from_millis(400)),
1295 /// Receiving an error upon reaching timeout:
1298 /// use std::thread;
1299 /// use std::time::Duration;
1300 /// use std::sync::mpsc;
1302 /// let (send, recv) = mpsc::channel();
1304 /// thread::spawn(move || {
1305 /// thread::sleep(Duration::from_millis(800));
1306 /// send.send('a').unwrap();
1310 /// recv.recv_timeout(Duration::from_millis(400)),
1311 /// Err(mpsc::RecvTimeoutError::Timeout)
1314 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1315 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1316 // Do an optimistic try_recv to avoid the performance impact of
1317 // Instant::now() in the full-channel case.
1318 match self.try_recv() {
1319 Ok(result) => Ok(result),
1320 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1321 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1322 Some(deadline) => self.recv_deadline(deadline),
1323 // So far in the future that it's practically the same as waiting indefinitely.
1324 None => self.recv().map_err(RecvTimeoutError::from),
1329 /// Attempts to wait for a value on this receiver, returning an error if the
1330 /// corresponding channel has hung up, or if `deadline` is reached.
1332 /// This function will always block the current thread if there is no data
1333 /// available and it's possible for more data to be sent. Once a message is
1334 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1335 /// receiver will wake up and return that message.
1337 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1338 /// this call is blocking, this call will wake up and return [`Err`] to
1339 /// indicate that no more messages can ever be received on this channel.
1340 /// However, since channels are buffered, messages sent before the disconnect
1341 /// will still be properly received.
1343 /// [`Sender`]: struct.Sender.html
1344 /// [`SyncSender`]: struct.SyncSender.html
1345 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1349 /// Successfully receiving value before reaching deadline:
1352 /// #![feature(deadline_api)]
1353 /// use std::thread;
1354 /// use std::time::{Duration, Instant};
1355 /// use std::sync::mpsc;
1357 /// let (send, recv) = mpsc::channel();
1359 /// thread::spawn(move || {
1360 /// send.send('a').unwrap();
1364 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1369 /// Receiving an error upon reaching deadline:
1372 /// #![feature(deadline_api)]
1373 /// use std::thread;
1374 /// use std::time::{Duration, Instant};
1375 /// use std::sync::mpsc;
1377 /// let (send, recv) = mpsc::channel();
1379 /// thread::spawn(move || {
1380 /// thread::sleep(Duration::from_millis(800));
1381 /// send.send('a').unwrap();
1385 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1386 /// Err(mpsc::RecvTimeoutError::Timeout)
1389 #[unstable(feature = "deadline_api", issue = "46316")]
1390 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1391 use self::RecvTimeoutError::*;
1394 let port_or_empty = match *unsafe { self.inner() } {
1395 Flavor::Oneshot(ref p) => {
1396 match p.recv(Some(deadline)) {
1397 Ok(t) => return Ok(t),
1398 Err(oneshot::Disconnected) => return Err(Disconnected),
1399 Err(oneshot::Upgraded(rx)) => Some(rx),
1400 Err(oneshot::Empty) => None,
1403 Flavor::Stream(ref p) => {
1404 match p.recv(Some(deadline)) {
1405 Ok(t) => return Ok(t),
1406 Err(stream::Disconnected) => return Err(Disconnected),
1407 Err(stream::Upgraded(rx)) => Some(rx),
1408 Err(stream::Empty) => None,
1411 Flavor::Shared(ref p) => {
1412 match p.recv(Some(deadline)) {
1413 Ok(t) => return Ok(t),
1414 Err(shared::Disconnected) => return Err(Disconnected),
1415 Err(shared::Empty) => None,
1418 Flavor::Sync(ref p) => {
1419 match p.recv(Some(deadline)) {
1420 Ok(t) => return Ok(t),
1421 Err(sync::Disconnected) => return Err(Disconnected),
1422 Err(sync::Empty) => None,
1427 if let Some(new_port) = port_or_empty {
1429 mem::swap(self.inner_mut(), new_port.inner_mut());
1433 // If we're already passed the deadline, and we're here without
1434 // data, return a timeout, else try again.
1435 if Instant::now() >= deadline {
1436 return Err(Timeout);
1441 /// Returns an iterator that will block waiting for messages, but never
1442 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1444 /// [`panic!`]: ../../../std/macro.panic.html
1445 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1450 /// use std::sync::mpsc::channel;
1451 /// use std::thread;
1453 /// let (send, recv) = channel();
1455 /// thread::spawn(move || {
1456 /// send.send(1).unwrap();
1457 /// send.send(2).unwrap();
1458 /// send.send(3).unwrap();
1461 /// let mut iter = recv.iter();
1462 /// assert_eq!(iter.next(), Some(1));
1463 /// assert_eq!(iter.next(), Some(2));
1464 /// assert_eq!(iter.next(), Some(3));
1465 /// assert_eq!(iter.next(), None);
1467 #[stable(feature = "rust1", since = "1.0.0")]
1468 pub fn iter(&self) -> Iter<'_, T> {
1472 /// Returns an iterator that will attempt to yield all pending values.
1473 /// It will return `None` if there are no more pending values or if the
1474 /// channel has hung up. The iterator will never [`panic!`] or block the
1475 /// user by waiting for values.
1477 /// [`panic!`]: ../../../std/macro.panic.html
1482 /// use std::sync::mpsc::channel;
1483 /// use std::thread;
1484 /// use std::time::Duration;
1486 /// let (sender, receiver) = channel();
1488 /// // nothing is in the buffer yet
1489 /// assert!(receiver.try_iter().next().is_none());
1491 /// thread::spawn(move || {
1492 /// thread::sleep(Duration::from_secs(1));
1493 /// sender.send(1).unwrap();
1494 /// sender.send(2).unwrap();
1495 /// sender.send(3).unwrap();
1498 /// // nothing is in the buffer yet
1499 /// assert!(receiver.try_iter().next().is_none());
1501 /// // block for two seconds
1502 /// thread::sleep(Duration::from_secs(2));
1504 /// let mut iter = receiver.try_iter();
1505 /// assert_eq!(iter.next(), Some(1));
1506 /// assert_eq!(iter.next(), Some(2));
1507 /// assert_eq!(iter.next(), Some(3));
1508 /// assert_eq!(iter.next(), None);
1510 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1511 pub fn try_iter(&self) -> TryIter<'_, T> {
1512 TryIter { rx: self }
1517 impl<T> select::Packet for Receiver<T> {
1518 fn can_recv(&self) -> bool {
1520 let new_port = match *unsafe { self.inner() } {
1521 Flavor::Oneshot(ref p) => {
1522 match p.can_recv() {
1523 Ok(ret) => return ret,
1524 Err(upgrade) => upgrade,
1527 Flavor::Stream(ref p) => {
1528 match p.can_recv() {
1529 Ok(ret) => return ret,
1530 Err(upgrade) => upgrade,
1533 Flavor::Shared(ref p) => return p.can_recv(),
1534 Flavor::Sync(ref p) => return p.can_recv(),
1537 mem::swap(self.inner_mut(),
1538 new_port.inner_mut());
1543 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1545 let (t, new_port) = match *unsafe { self.inner() } {
1546 Flavor::Oneshot(ref p) => {
1547 match p.start_selection(token) {
1548 oneshot::SelSuccess => return Installed,
1549 oneshot::SelCanceled => return Abort,
1550 oneshot::SelUpgraded(t, rx) => (t, rx),
1553 Flavor::Stream(ref p) => {
1554 match p.start_selection(token) {
1555 stream::SelSuccess => return Installed,
1556 stream::SelCanceled => return Abort,
1557 stream::SelUpgraded(t, rx) => (t, rx),
1560 Flavor::Shared(ref p) => return p.start_selection(token),
1561 Flavor::Sync(ref p) => return p.start_selection(token),
1565 mem::swap(self.inner_mut(), new_port.inner_mut());
1570 fn abort_selection(&self) -> bool {
1571 let mut was_upgrade = false;
1573 let result = match *unsafe { self.inner() } {
1574 Flavor::Oneshot(ref p) => p.abort_selection(),
1575 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1576 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1577 Flavor::Sync(ref p) => return p.abort_selection(),
1579 let new_port = match result { Ok(b) => return b, Err(p) => p };
1582 mem::swap(self.inner_mut(),
1583 new_port.inner_mut());
1589 #[stable(feature = "rust1", since = "1.0.0")]
1590 impl<'a, T> Iterator for Iter<'a, T> {
1593 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1596 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1597 impl<'a, T> Iterator for TryIter<'a, T> {
1600 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1603 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1604 impl<'a, T> IntoIterator for &'a Receiver<T> {
1606 type IntoIter = Iter<'a, T>;
1608 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1611 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1612 impl<T> Iterator for IntoIter<T> {
1614 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1617 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1618 impl <T> IntoIterator for Receiver<T> {
1620 type IntoIter = IntoIter<T>;
1622 fn into_iter(self) -> IntoIter<T> {
1623 IntoIter { rx: self }
1627 #[stable(feature = "rust1", since = "1.0.0")]
1628 impl<T> Drop for Receiver<T> {
1629 fn drop(&mut self) {
1630 match *unsafe { self.inner() } {
1631 Flavor::Oneshot(ref p) => p.drop_port(),
1632 Flavor::Stream(ref p) => p.drop_port(),
1633 Flavor::Shared(ref p) => p.drop_port(),
1634 Flavor::Sync(ref p) => p.drop_port(),
1639 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1640 impl<T> fmt::Debug for Receiver<T> {
1641 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1642 f.debug_struct("Receiver").finish()
1646 #[stable(feature = "rust1", since = "1.0.0")]
1647 impl<T> fmt::Debug for SendError<T> {
1648 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1649 "SendError(..)".fmt(f)
1653 #[stable(feature = "rust1", since = "1.0.0")]
1654 impl<T> fmt::Display for SendError<T> {
1655 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1656 "sending on a closed channel".fmt(f)
1660 #[stable(feature = "rust1", since = "1.0.0")]
1661 impl<T: Send> error::Error for SendError<T> {
1662 fn description(&self) -> &str {
1663 "sending on a closed channel"
1666 fn cause(&self) -> Option<&dyn error::Error> {
1671 #[stable(feature = "rust1", since = "1.0.0")]
1672 impl<T> fmt::Debug for TrySendError<T> {
1673 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1675 TrySendError::Full(..) => "Full(..)".fmt(f),
1676 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1681 #[stable(feature = "rust1", since = "1.0.0")]
1682 impl<T> fmt::Display for TrySendError<T> {
1683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1685 TrySendError::Full(..) => {
1686 "sending on a full channel".fmt(f)
1688 TrySendError::Disconnected(..) => {
1689 "sending on a closed channel".fmt(f)
1695 #[stable(feature = "rust1", since = "1.0.0")]
1696 impl<T: Send> error::Error for TrySendError<T> {
1698 fn description(&self) -> &str {
1700 TrySendError::Full(..) => {
1701 "sending on a full channel"
1703 TrySendError::Disconnected(..) => {
1704 "sending on a closed channel"
1709 fn cause(&self) -> Option<&dyn error::Error> {
1714 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1715 impl<T> From<SendError<T>> for TrySendError<T> {
1716 fn from(err: SendError<T>) -> TrySendError<T> {
1718 SendError(t) => TrySendError::Disconnected(t),
1723 #[stable(feature = "rust1", since = "1.0.0")]
1724 impl fmt::Display for RecvError {
1725 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1726 "receiving on a closed channel".fmt(f)
1730 #[stable(feature = "rust1", since = "1.0.0")]
1731 impl error::Error for RecvError {
1733 fn description(&self) -> &str {
1734 "receiving on a closed channel"
1737 fn cause(&self) -> Option<&dyn error::Error> {
1742 #[stable(feature = "rust1", since = "1.0.0")]
1743 impl fmt::Display for TryRecvError {
1744 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1746 TryRecvError::Empty => {
1747 "receiving on an empty channel".fmt(f)
1749 TryRecvError::Disconnected => {
1750 "receiving on a closed channel".fmt(f)
1756 #[stable(feature = "rust1", since = "1.0.0")]
1757 impl error::Error for TryRecvError {
1759 fn description(&self) -> &str {
1761 TryRecvError::Empty => {
1762 "receiving on an empty channel"
1764 TryRecvError::Disconnected => {
1765 "receiving on a closed channel"
1770 fn cause(&self) -> Option<&dyn error::Error> {
1775 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1776 impl From<RecvError> for TryRecvError {
1777 fn from(err: RecvError) -> TryRecvError {
1779 RecvError => TryRecvError::Disconnected,
1784 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1785 impl fmt::Display for RecvTimeoutError {
1786 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1788 RecvTimeoutError::Timeout => {
1789 "timed out waiting on channel".fmt(f)
1791 RecvTimeoutError::Disconnected => {
1792 "channel is empty and sending half is closed".fmt(f)
1798 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1799 impl error::Error for RecvTimeoutError {
1800 fn description(&self) -> &str {
1802 RecvTimeoutError::Timeout => {
1803 "timed out waiting on channel"
1805 RecvTimeoutError::Disconnected => {
1806 "channel is empty and sending half is closed"
1811 fn cause(&self) -> Option<&dyn error::Error> {
1816 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1817 impl From<RecvError> for RecvTimeoutError {
1818 fn from(err: RecvError) -> RecvTimeoutError {
1820 RecvError => RecvTimeoutError::Disconnected,
1825 #[cfg(all(test, not(target_os = "emscripten")))]
1830 use crate::time::{Duration, Instant};
1832 pub fn stress_factor() -> usize {
1833 match env::var("RUST_TEST_STRESS") {
1834 Ok(val) => val.parse().unwrap(),
1841 let (tx, rx) = channel::<i32>();
1842 tx.send(1).unwrap();
1843 assert_eq!(rx.recv().unwrap(), 1);
1848 let (tx, _rx) = channel::<Box<isize>>();
1849 tx.send(box 1).unwrap();
1853 fn drop_full_shared() {
1854 let (tx, _rx) = channel::<Box<isize>>();
1857 tx.send(box 1).unwrap();
1862 let (tx, rx) = channel::<i32>();
1863 tx.send(1).unwrap();
1864 assert_eq!(rx.recv().unwrap(), 1);
1865 let tx = tx.clone();
1866 tx.send(1).unwrap();
1867 assert_eq!(rx.recv().unwrap(), 1);
1871 fn smoke_threads() {
1872 let (tx, rx) = channel::<i32>();
1873 let _t = thread::spawn(move|| {
1874 tx.send(1).unwrap();
1876 assert_eq!(rx.recv().unwrap(), 1);
1880 fn smoke_port_gone() {
1881 let (tx, rx) = channel::<i32>();
1883 assert!(tx.send(1).is_err());
1887 fn smoke_shared_port_gone() {
1888 let (tx, rx) = channel::<i32>();
1890 assert!(tx.send(1).is_err())
1894 fn smoke_shared_port_gone2() {
1895 let (tx, rx) = channel::<i32>();
1897 let tx2 = tx.clone();
1899 assert!(tx2.send(1).is_err());
1903 fn port_gone_concurrent() {
1904 let (tx, rx) = channel::<i32>();
1905 let _t = thread::spawn(move|| {
1908 while tx.send(1).is_ok() {}
1912 fn port_gone_concurrent_shared() {
1913 let (tx, rx) = channel::<i32>();
1914 let tx2 = tx.clone();
1915 let _t = thread::spawn(move|| {
1918 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1922 fn smoke_chan_gone() {
1923 let (tx, rx) = channel::<i32>();
1925 assert!(rx.recv().is_err());
1929 fn smoke_chan_gone_shared() {
1930 let (tx, rx) = channel::<()>();
1931 let tx2 = tx.clone();
1934 assert!(rx.recv().is_err());
1938 fn chan_gone_concurrent() {
1939 let (tx, rx) = channel::<i32>();
1940 let _t = thread::spawn(move|| {
1941 tx.send(1).unwrap();
1942 tx.send(1).unwrap();
1944 while rx.recv().is_ok() {}
1949 let (tx, rx) = channel::<i32>();
1950 let t = thread::spawn(move|| {
1951 for _ in 0..10000 { tx.send(1).unwrap(); }
1954 assert_eq!(rx.recv().unwrap(), 1);
1956 t.join().ok().expect("thread panicked");
1960 fn stress_shared() {
1961 const AMT: u32 = 10000;
1962 const NTHREADS: u32 = 8;
1963 let (tx, rx) = channel::<i32>();
1965 let t = thread::spawn(move|| {
1966 for _ in 0..AMT * NTHREADS {
1967 assert_eq!(rx.recv().unwrap(), 1);
1969 match rx.try_recv() {
1975 for _ in 0..NTHREADS {
1976 let tx = tx.clone();
1977 thread::spawn(move|| {
1978 for _ in 0..AMT { tx.send(1).unwrap(); }
1982 t.join().ok().expect("thread panicked");
1986 fn send_from_outside_runtime() {
1987 let (tx1, rx1) = channel::<()>();
1988 let (tx2, rx2) = channel::<i32>();
1989 let t1 = thread::spawn(move|| {
1990 tx1.send(()).unwrap();
1992 assert_eq!(rx2.recv().unwrap(), 1);
1995 rx1.recv().unwrap();
1996 let t2 = thread::spawn(move|| {
1998 tx2.send(1).unwrap();
2001 t1.join().ok().expect("thread panicked");
2002 t2.join().ok().expect("thread panicked");
2006 fn recv_from_outside_runtime() {
2007 let (tx, rx) = channel::<i32>();
2008 let t = thread::spawn(move|| {
2010 assert_eq!(rx.recv().unwrap(), 1);
2014 tx.send(1).unwrap();
2016 t.join().ok().expect("thread panicked");
2021 let (tx1, rx1) = channel::<i32>();
2022 let (tx2, rx2) = channel::<i32>();
2023 let t1 = thread::spawn(move|| {
2024 assert_eq!(rx1.recv().unwrap(), 1);
2025 tx2.send(2).unwrap();
2027 let t2 = thread::spawn(move|| {
2028 tx1.send(1).unwrap();
2029 assert_eq!(rx2.recv().unwrap(), 2);
2031 t1.join().ok().expect("thread panicked");
2032 t2.join().ok().expect("thread panicked");
2036 fn oneshot_single_thread_close_port_first() {
2037 // Simple test of closing without sending
2038 let (_tx, rx) = channel::<i32>();
2043 fn oneshot_single_thread_close_chan_first() {
2044 // Simple test of closing without sending
2045 let (tx, _rx) = channel::<i32>();
2050 fn oneshot_single_thread_send_port_close() {
2051 // Testing that the sender cleans up the payload if receiver is closed
2052 let (tx, rx) = channel::<Box<i32>>();
2054 assert!(tx.send(box 0).is_err());
2058 fn oneshot_single_thread_recv_chan_close() {
2059 // Receiving on a closed chan will panic
2060 let res = thread::spawn(move|| {
2061 let (tx, rx) = channel::<i32>();
2066 assert!(res.is_err());
2070 fn oneshot_single_thread_send_then_recv() {
2071 let (tx, rx) = channel::<Box<i32>>();
2072 tx.send(box 10).unwrap();
2073 assert!(*rx.recv().unwrap() == 10);
2077 fn oneshot_single_thread_try_send_open() {
2078 let (tx, rx) = channel::<i32>();
2079 assert!(tx.send(10).is_ok());
2080 assert!(rx.recv().unwrap() == 10);
2084 fn oneshot_single_thread_try_send_closed() {
2085 let (tx, rx) = channel::<i32>();
2087 assert!(tx.send(10).is_err());
2091 fn oneshot_single_thread_try_recv_open() {
2092 let (tx, rx) = channel::<i32>();
2093 tx.send(10).unwrap();
2094 assert!(rx.recv() == Ok(10));
2098 fn oneshot_single_thread_try_recv_closed() {
2099 let (tx, rx) = channel::<i32>();
2101 assert!(rx.recv().is_err());
2105 fn oneshot_single_thread_peek_data() {
2106 let (tx, rx) = channel::<i32>();
2107 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2108 tx.send(10).unwrap();
2109 assert_eq!(rx.try_recv(), Ok(10));
2113 fn oneshot_single_thread_peek_close() {
2114 let (tx, rx) = channel::<i32>();
2116 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2117 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2121 fn oneshot_single_thread_peek_open() {
2122 let (_tx, rx) = channel::<i32>();
2123 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2127 fn oneshot_multi_task_recv_then_send() {
2128 let (tx, rx) = channel::<Box<i32>>();
2129 let _t = thread::spawn(move|| {
2130 assert!(*rx.recv().unwrap() == 10);
2133 tx.send(box 10).unwrap();
2137 fn oneshot_multi_task_recv_then_close() {
2138 let (tx, rx) = channel::<Box<i32>>();
2139 let _t = thread::spawn(move|| {
2142 let res = thread::spawn(move|| {
2143 assert!(*rx.recv().unwrap() == 10);
2145 assert!(res.is_err());
2149 fn oneshot_multi_thread_close_stress() {
2150 for _ in 0..stress_factor() {
2151 let (tx, rx) = channel::<i32>();
2152 let _t = thread::spawn(move|| {
2160 fn oneshot_multi_thread_send_close_stress() {
2161 for _ in 0..stress_factor() {
2162 let (tx, rx) = channel::<i32>();
2163 let _t = thread::spawn(move|| {
2166 let _ = thread::spawn(move|| {
2167 tx.send(1).unwrap();
2173 fn oneshot_multi_thread_recv_close_stress() {
2174 for _ in 0..stress_factor() {
2175 let (tx, rx) = channel::<i32>();
2176 thread::spawn(move|| {
2177 let res = thread::spawn(move|| {
2180 assert!(res.is_err());
2182 let _t = thread::spawn(move|| {
2183 thread::spawn(move|| {
2191 fn oneshot_multi_thread_send_recv_stress() {
2192 for _ in 0..stress_factor() {
2193 let (tx, rx) = channel::<Box<isize>>();
2194 let _t = thread::spawn(move|| {
2195 tx.send(box 10).unwrap();
2197 assert!(*rx.recv().unwrap() == 10);
2202 fn stream_send_recv_stress() {
2203 for _ in 0..stress_factor() {
2204 let (tx, rx) = channel();
2209 fn send(tx: Sender<Box<i32>>, i: i32) {
2210 if i == 10 { return }
2212 thread::spawn(move|| {
2213 tx.send(box i).unwrap();
2218 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2219 if i == 10 { return }
2221 thread::spawn(move|| {
2222 assert!(*rx.recv().unwrap() == i);
2230 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2231 fn oneshot_single_thread_recv_timeout() {
2232 let (tx, rx) = channel();
2233 tx.send(()).unwrap();
2234 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2235 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2236 tx.send(()).unwrap();
2237 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2241 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2242 fn stress_recv_timeout_two_threads() {
2243 let (tx, rx) = channel();
2244 let stress = stress_factor() + 100;
2245 let timeout = Duration::from_millis(100);
2247 thread::spawn(move || {
2248 for i in 0..stress {
2250 thread::sleep(timeout * 2);
2252 tx.send(1usize).unwrap();
2256 let mut recv_count = 0;
2258 match rx.recv_timeout(timeout) {
2260 assert_eq!(n, 1usize);
2263 Err(RecvTimeoutError::Timeout) => continue,
2264 Err(RecvTimeoutError::Disconnected) => break,
2268 assert_eq!(recv_count, stress);
2272 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2273 fn recv_timeout_upgrade() {
2274 let (tx, rx) = channel::<()>();
2275 let timeout = Duration::from_millis(1);
2276 let _tx_clone = tx.clone();
2278 let start = Instant::now();
2279 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2280 assert!(Instant::now() >= start + timeout);
2284 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2285 fn stress_recv_timeout_shared() {
2286 let (tx, rx) = channel();
2287 let stress = stress_factor() + 100;
2289 for i in 0..stress {
2290 let tx = tx.clone();
2291 thread::spawn(move || {
2292 thread::sleep(Duration::from_millis(i as u64 * 10));
2293 tx.send(1usize).unwrap();
2299 let mut recv_count = 0;
2301 match rx.recv_timeout(Duration::from_millis(10)) {
2303 assert_eq!(n, 1usize);
2306 Err(RecvTimeoutError::Timeout) => continue,
2307 Err(RecvTimeoutError::Disconnected) => break,
2311 assert_eq!(recv_count, stress);
2315 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2316 fn very_long_recv_timeout_wont_panic() {
2317 let (tx, rx) = channel::<()>();
2318 let join_handle = thread::spawn(move || {
2319 rx.recv_timeout(Duration::from_secs(u64::max_value()))
2321 thread::sleep(Duration::from_secs(1));
2322 assert!(tx.send(()).is_ok());
2323 assert_eq!(join_handle.join().unwrap(), Ok(()));
2328 // Regression test that we don't run out of stack in scheduler context
2329 let (tx, rx) = channel();
2330 for _ in 0..10000 { tx.send(()).unwrap(); }
2331 for _ in 0..10000 { rx.recv().unwrap(); }
2335 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2336 fn shared_recv_timeout() {
2337 let (tx, rx) = channel();
2340 let tx = tx.clone();
2341 thread::spawn(move|| {
2342 tx.send(()).unwrap();
2346 for _ in 0..total { rx.recv().unwrap(); }
2348 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2349 tx.send(()).unwrap();
2350 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2354 fn shared_chan_stress() {
2355 let (tx, rx) = channel();
2356 let total = stress_factor() + 100;
2358 let tx = tx.clone();
2359 thread::spawn(move|| {
2360 tx.send(()).unwrap();
2370 fn test_nested_recv_iter() {
2371 let (tx, rx) = channel::<i32>();
2372 let (total_tx, total_rx) = channel::<i32>();
2374 let _t = thread::spawn(move|| {
2376 for x in rx.iter() {
2379 total_tx.send(acc).unwrap();
2382 tx.send(3).unwrap();
2383 tx.send(1).unwrap();
2384 tx.send(2).unwrap();
2386 assert_eq!(total_rx.recv().unwrap(), 6);
2390 fn test_recv_iter_break() {
2391 let (tx, rx) = channel::<i32>();
2392 let (count_tx, count_rx) = channel();
2394 let _t = thread::spawn(move|| {
2396 for x in rx.iter() {
2403 count_tx.send(count).unwrap();
2406 tx.send(2).unwrap();
2407 tx.send(2).unwrap();
2408 tx.send(2).unwrap();
2411 assert_eq!(count_rx.recv().unwrap(), 4);
2415 fn test_recv_try_iter() {
2416 let (request_tx, request_rx) = channel();
2417 let (response_tx, response_rx) = channel();
2419 // Request `x`s until we have `6`.
2420 let t = thread::spawn(move|| {
2423 for x in response_rx.try_iter() {
2429 request_tx.send(()).unwrap();
2433 for _ in request_rx.iter() {
2434 if response_tx.send(2).is_err() {
2439 assert_eq!(t.join().unwrap(), 6);
2443 fn test_recv_into_iter_owned() {
2445 let (tx, rx) = channel::<i32>();
2446 tx.send(1).unwrap();
2447 tx.send(2).unwrap();
2451 assert_eq!(iter.next().unwrap(), 1);
2452 assert_eq!(iter.next().unwrap(), 2);
2453 assert_eq!(iter.next().is_none(), true);
2457 fn test_recv_into_iter_borrowed() {
2458 let (tx, rx) = channel::<i32>();
2459 tx.send(1).unwrap();
2460 tx.send(2).unwrap();
2462 let mut iter = (&rx).into_iter();
2463 assert_eq!(iter.next().unwrap(), 1);
2464 assert_eq!(iter.next().unwrap(), 2);
2465 assert_eq!(iter.next().is_none(), true);
2469 fn try_recv_states() {
2470 let (tx1, rx1) = channel::<i32>();
2471 let (tx2, rx2) = channel::<()>();
2472 let (tx3, rx3) = channel::<()>();
2473 let _t = thread::spawn(move|| {
2474 rx2.recv().unwrap();
2475 tx1.send(1).unwrap();
2476 tx3.send(()).unwrap();
2477 rx2.recv().unwrap();
2479 tx3.send(()).unwrap();
2482 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2483 tx2.send(()).unwrap();
2484 rx3.recv().unwrap();
2485 assert_eq!(rx1.try_recv(), Ok(1));
2486 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2487 tx2.send(()).unwrap();
2488 rx3.recv().unwrap();
2489 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2492 // This bug used to end up in a livelock inside of the Receiver destructor
2493 // because the internal state of the Shared packet was corrupted
2495 fn destroy_upgraded_shared_port_when_sender_still_active() {
2496 let (tx, rx) = channel();
2497 let (tx2, rx2) = channel();
2498 let _t = thread::spawn(move|| {
2499 rx.recv().unwrap(); // wait on a oneshot
2500 drop(rx); // destroy a shared
2501 tx2.send(()).unwrap();
2503 // make sure the other thread has gone to sleep
2504 for _ in 0..5000 { thread::yield_now(); }
2506 // upgrade to a shared chan and send a message
2509 t.send(()).unwrap();
2511 // wait for the child thread to exit before we exit
2512 rx2.recv().unwrap();
2517 let (tx, _) = channel();
2518 let _ = tx.send(123);
2519 assert_eq!(tx.send(123), Err(SendError(123)));
2523 #[cfg(all(test, not(target_os = "emscripten")))]
2528 use crate::time::Duration;
2530 pub fn stress_factor() -> usize {
2531 match env::var("RUST_TEST_STRESS") {
2532 Ok(val) => val.parse().unwrap(),
2539 let (tx, rx) = sync_channel::<i32>(1);
2540 tx.send(1).unwrap();
2541 assert_eq!(rx.recv().unwrap(), 1);
2546 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2547 tx.send(box 1).unwrap();
2552 let (tx, rx) = sync_channel::<i32>(1);
2553 tx.send(1).unwrap();
2554 assert_eq!(rx.recv().unwrap(), 1);
2555 let tx = tx.clone();
2556 tx.send(1).unwrap();
2557 assert_eq!(rx.recv().unwrap(), 1);
2561 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2563 let (tx, rx) = sync_channel::<i32>(1);
2564 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2565 tx.send(1).unwrap();
2566 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2570 fn smoke_threads() {
2571 let (tx, rx) = sync_channel::<i32>(0);
2572 let _t = thread::spawn(move|| {
2573 tx.send(1).unwrap();
2575 assert_eq!(rx.recv().unwrap(), 1);
2579 fn smoke_port_gone() {
2580 let (tx, rx) = sync_channel::<i32>(0);
2582 assert!(tx.send(1).is_err());
2586 fn smoke_shared_port_gone2() {
2587 let (tx, rx) = sync_channel::<i32>(0);
2589 let tx2 = tx.clone();
2591 assert!(tx2.send(1).is_err());
2595 fn port_gone_concurrent() {
2596 let (tx, rx) = sync_channel::<i32>(0);
2597 let _t = thread::spawn(move|| {
2600 while tx.send(1).is_ok() {}
2604 fn port_gone_concurrent_shared() {
2605 let (tx, rx) = sync_channel::<i32>(0);
2606 let tx2 = tx.clone();
2607 let _t = thread::spawn(move|| {
2610 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2614 fn smoke_chan_gone() {
2615 let (tx, rx) = sync_channel::<i32>(0);
2617 assert!(rx.recv().is_err());
2621 fn smoke_chan_gone_shared() {
2622 let (tx, rx) = sync_channel::<()>(0);
2623 let tx2 = tx.clone();
2626 assert!(rx.recv().is_err());
2630 fn chan_gone_concurrent() {
2631 let (tx, rx) = sync_channel::<i32>(0);
2632 thread::spawn(move|| {
2633 tx.send(1).unwrap();
2634 tx.send(1).unwrap();
2636 while rx.recv().is_ok() {}
2641 let (tx, rx) = sync_channel::<i32>(0);
2642 thread::spawn(move|| {
2643 for _ in 0..10000 { tx.send(1).unwrap(); }
2646 assert_eq!(rx.recv().unwrap(), 1);
2651 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2652 fn stress_recv_timeout_two_threads() {
2653 let (tx, rx) = sync_channel::<i32>(0);
2655 thread::spawn(move|| {
2656 for _ in 0..10000 { tx.send(1).unwrap(); }
2659 let mut recv_count = 0;
2661 match rx.recv_timeout(Duration::from_millis(1)) {
2666 Err(RecvTimeoutError::Timeout) => continue,
2667 Err(RecvTimeoutError::Disconnected) => break,
2671 assert_eq!(recv_count, 10000);
2675 #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2676 fn stress_recv_timeout_shared() {
2677 const AMT: u32 = 1000;
2678 const NTHREADS: u32 = 8;
2679 let (tx, rx) = sync_channel::<i32>(0);
2680 let (dtx, drx) = sync_channel::<()>(0);
2682 thread::spawn(move|| {
2683 let mut recv_count = 0;
2685 match rx.recv_timeout(Duration::from_millis(10)) {
2690 Err(RecvTimeoutError::Timeout) => continue,
2691 Err(RecvTimeoutError::Disconnected) => break,
2695 assert_eq!(recv_count, AMT * NTHREADS);
2696 assert!(rx.try_recv().is_err());
2698 dtx.send(()).unwrap();
2701 for _ in 0..NTHREADS {
2702 let tx = tx.clone();
2703 thread::spawn(move|| {
2704 for _ in 0..AMT { tx.send(1).unwrap(); }
2710 drx.recv().unwrap();
2714 fn stress_shared() {
2715 const AMT: u32 = 1000;
2716 const NTHREADS: u32 = 8;
2717 let (tx, rx) = sync_channel::<i32>(0);
2718 let (dtx, drx) = sync_channel::<()>(0);
2720 thread::spawn(move|| {
2721 for _ in 0..AMT * NTHREADS {
2722 assert_eq!(rx.recv().unwrap(), 1);
2724 match rx.try_recv() {
2728 dtx.send(()).unwrap();
2731 for _ in 0..NTHREADS {
2732 let tx = tx.clone();
2733 thread::spawn(move|| {
2734 for _ in 0..AMT { tx.send(1).unwrap(); }
2738 drx.recv().unwrap();
2742 fn oneshot_single_thread_close_port_first() {
2743 // Simple test of closing without sending
2744 let (_tx, rx) = sync_channel::<i32>(0);
2749 fn oneshot_single_thread_close_chan_first() {
2750 // Simple test of closing without sending
2751 let (tx, _rx) = sync_channel::<i32>(0);
2756 fn oneshot_single_thread_send_port_close() {
2757 // Testing that the sender cleans up the payload if receiver is closed
2758 let (tx, rx) = sync_channel::<Box<i32>>(0);
2760 assert!(tx.send(box 0).is_err());
2764 fn oneshot_single_thread_recv_chan_close() {
2765 // Receiving on a closed chan will panic
2766 let res = thread::spawn(move|| {
2767 let (tx, rx) = sync_channel::<i32>(0);
2772 assert!(res.is_err());
2776 fn oneshot_single_thread_send_then_recv() {
2777 let (tx, rx) = sync_channel::<Box<i32>>(1);
2778 tx.send(box 10).unwrap();
2779 assert!(*rx.recv().unwrap() == 10);
2783 fn oneshot_single_thread_try_send_open() {
2784 let (tx, rx) = sync_channel::<i32>(1);
2785 assert_eq!(tx.try_send(10), Ok(()));
2786 assert!(rx.recv().unwrap() == 10);
2790 fn oneshot_single_thread_try_send_closed() {
2791 let (tx, rx) = sync_channel::<i32>(0);
2793 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2797 fn oneshot_single_thread_try_send_closed2() {
2798 let (tx, _rx) = sync_channel::<i32>(0);
2799 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2803 fn oneshot_single_thread_try_recv_open() {
2804 let (tx, rx) = sync_channel::<i32>(1);
2805 tx.send(10).unwrap();
2806 assert!(rx.recv() == Ok(10));
2810 fn oneshot_single_thread_try_recv_closed() {
2811 let (tx, rx) = sync_channel::<i32>(0);
2813 assert!(rx.recv().is_err());
2817 fn oneshot_single_thread_try_recv_closed_with_data() {
2818 let (tx, rx) = sync_channel::<i32>(1);
2819 tx.send(10).unwrap();
2821 assert_eq!(rx.try_recv(), Ok(10));
2822 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2826 fn oneshot_single_thread_peek_data() {
2827 let (tx, rx) = sync_channel::<i32>(1);
2828 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2829 tx.send(10).unwrap();
2830 assert_eq!(rx.try_recv(), Ok(10));
2834 fn oneshot_single_thread_peek_close() {
2835 let (tx, rx) = sync_channel::<i32>(0);
2837 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2838 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2842 fn oneshot_single_thread_peek_open() {
2843 let (_tx, rx) = sync_channel::<i32>(0);
2844 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2848 fn oneshot_multi_task_recv_then_send() {
2849 let (tx, rx) = sync_channel::<Box<i32>>(0);
2850 let _t = thread::spawn(move|| {
2851 assert!(*rx.recv().unwrap() == 10);
2854 tx.send(box 10).unwrap();
2858 fn oneshot_multi_task_recv_then_close() {
2859 let (tx, rx) = sync_channel::<Box<i32>>(0);
2860 let _t = thread::spawn(move|| {
2863 let res = thread::spawn(move|| {
2864 assert!(*rx.recv().unwrap() == 10);
2866 assert!(res.is_err());
2870 fn oneshot_multi_thread_close_stress() {
2871 for _ in 0..stress_factor() {
2872 let (tx, rx) = sync_channel::<i32>(0);
2873 let _t = thread::spawn(move|| {
2881 fn oneshot_multi_thread_send_close_stress() {
2882 for _ in 0..stress_factor() {
2883 let (tx, rx) = sync_channel::<i32>(0);
2884 let _t = thread::spawn(move|| {
2887 let _ = thread::spawn(move || {
2888 tx.send(1).unwrap();
2894 fn oneshot_multi_thread_recv_close_stress() {
2895 for _ in 0..stress_factor() {
2896 let (tx, rx) = sync_channel::<i32>(0);
2897 let _t = thread::spawn(move|| {
2898 let res = thread::spawn(move|| {
2901 assert!(res.is_err());
2903 let _t = thread::spawn(move|| {
2904 thread::spawn(move|| {
2912 fn oneshot_multi_thread_send_recv_stress() {
2913 for _ in 0..stress_factor() {
2914 let (tx, rx) = sync_channel::<Box<i32>>(0);
2915 let _t = thread::spawn(move|| {
2916 tx.send(box 10).unwrap();
2918 assert!(*rx.recv().unwrap() == 10);
2923 fn stream_send_recv_stress() {
2924 for _ in 0..stress_factor() {
2925 let (tx, rx) = sync_channel::<Box<i32>>(0);
2930 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2931 if i == 10 { return }
2933 thread::spawn(move|| {
2934 tx.send(box i).unwrap();
2939 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2940 if i == 10 { return }
2942 thread::spawn(move|| {
2943 assert!(*rx.recv().unwrap() == i);
2952 // Regression test that we don't run out of stack in scheduler context
2953 let (tx, rx) = sync_channel(10000);
2954 for _ in 0..10000 { tx.send(()).unwrap(); }
2955 for _ in 0..10000 { rx.recv().unwrap(); }
2959 fn shared_chan_stress() {
2960 let (tx, rx) = sync_channel(0);
2961 let total = stress_factor() + 100;
2963 let tx = tx.clone();
2964 thread::spawn(move|| {
2965 tx.send(()).unwrap();
2975 fn test_nested_recv_iter() {
2976 let (tx, rx) = sync_channel::<i32>(0);
2977 let (total_tx, total_rx) = sync_channel::<i32>(0);
2979 let _t = thread::spawn(move|| {
2981 for x in rx.iter() {
2984 total_tx.send(acc).unwrap();
2987 tx.send(3).unwrap();
2988 tx.send(1).unwrap();
2989 tx.send(2).unwrap();
2991 assert_eq!(total_rx.recv().unwrap(), 6);
2995 fn test_recv_iter_break() {
2996 let (tx, rx) = sync_channel::<i32>(0);
2997 let (count_tx, count_rx) = sync_channel(0);
2999 let _t = thread::spawn(move|| {
3001 for x in rx.iter() {
3008 count_tx.send(count).unwrap();
3011 tx.send(2).unwrap();
3012 tx.send(2).unwrap();
3013 tx.send(2).unwrap();
3014 let _ = tx.try_send(2);
3016 assert_eq!(count_rx.recv().unwrap(), 4);
3020 fn try_recv_states() {
3021 let (tx1, rx1) = sync_channel::<i32>(1);
3022 let (tx2, rx2) = sync_channel::<()>(1);
3023 let (tx3, rx3) = sync_channel::<()>(1);
3024 let _t = thread::spawn(move|| {
3025 rx2.recv().unwrap();
3026 tx1.send(1).unwrap();
3027 tx3.send(()).unwrap();
3028 rx2.recv().unwrap();
3030 tx3.send(()).unwrap();
3033 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3034 tx2.send(()).unwrap();
3035 rx3.recv().unwrap();
3036 assert_eq!(rx1.try_recv(), Ok(1));
3037 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3038 tx2.send(()).unwrap();
3039 rx3.recv().unwrap();
3040 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
3043 // This bug used to end up in a livelock inside of the Receiver destructor
3044 // because the internal state of the Shared packet was corrupted
3046 fn destroy_upgraded_shared_port_when_sender_still_active() {
3047 let (tx, rx) = sync_channel::<()>(0);
3048 let (tx2, rx2) = sync_channel::<()>(0);
3049 let _t = thread::spawn(move|| {
3050 rx.recv().unwrap(); // wait on a oneshot
3051 drop(rx); // destroy a shared
3052 tx2.send(()).unwrap();
3054 // make sure the other thread has gone to sleep
3055 for _ in 0..5000 { thread::yield_now(); }
3057 // upgrade to a shared chan and send a message
3060 t.send(()).unwrap();
3062 // wait for the child thread to exit before we exit
3063 rx2.recv().unwrap();
3068 let (tx, rx) = sync_channel::<i32>(0);
3069 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3070 assert_eq!(tx.send(1), Ok(()));
3075 let (tx, rx) = sync_channel::<i32>(0);
3076 let _t = thread::spawn(move|| { drop(rx); });
3077 assert!(tx.send(1).is_err());
3082 let (tx, rx) = sync_channel::<i32>(1);
3083 assert_eq!(tx.send(1), Ok(()));
3084 let _t =thread::spawn(move|| { drop(rx); });
3085 assert!(tx.send(1).is_err());
3090 let (tx, rx) = sync_channel::<i32>(0);
3091 let tx2 = tx.clone();
3092 let (done, donerx) = channel();
3093 let done2 = done.clone();
3094 let _t = thread::spawn(move|| {
3095 assert!(tx.send(1).is_err());
3096 done.send(()).unwrap();
3098 let _t = thread::spawn(move|| {
3099 assert!(tx2.send(2).is_err());
3100 done2.send(()).unwrap();
3103 donerx.recv().unwrap();
3104 donerx.recv().unwrap();
3109 let (tx, _rx) = sync_channel::<i32>(0);
3110 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3115 let (tx, _rx) = sync_channel::<i32>(1);
3116 assert_eq!(tx.try_send(1), Ok(()));
3117 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3122 let (tx, rx) = sync_channel::<i32>(1);
3123 assert_eq!(tx.try_send(1), Ok(()));
3125 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3131 let (tx1, rx1) = sync_channel::<()>(3);
3132 let (tx2, rx2) = sync_channel::<()>(3);
3134 let _t = thread::spawn(move|| {
3135 rx1.recv().unwrap();
3136 tx2.try_send(()).unwrap();
3139 tx1.try_send(()).unwrap();
3140 rx2.recv().unwrap();