1 //! Multi-producer, single-consumer FIFO queue communication primitives.
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
14 //! These channels come in two flavors:
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //! will return a `(Sender, Receiver)` tuple where all sends will be
18 //! **asynchronous** (they never block). The channel conceptually has an
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //! messages is a pre-allocated buffer of a fixed size. All sends will be
24 //! **synchronous** by blocking until there is buffer space available. Note
25 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //! channel where each sender atomically hands off a message to a receiver.
28 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
29 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
30 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
31 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
32 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
33 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
37 //! The send and receive operations on channels will all return a [`Result`]
38 //! indicating whether the operation succeeded or not. An unsuccessful operation
39 //! is normally indicative of the other half of a channel having "hung up" by
40 //! being dropped in its corresponding thread.
42 //! Once half of a channel has been deallocated, most operations can no longer
43 //! continue to make progress, so [`Err`] will be returned. Many applications
44 //! will continue to [`unwrap`] the results returned from this module,
45 //! instigating a propagation of failure among threads if one unexpectedly dies.
47 //! [`Result`]: ../../../std/result/enum.Result.html
48 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
49 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10);
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
116 #![stable(feature = "rust1", since = "1.0.0")]
117 #![allow(deprecated)] // for mpsc_select
119 // A description of how Rust's channel implementation works
121 // Channels are supposed to be the basic building block for all other
122 // concurrent primitives that are used in Rust. As a result, the channel type
123 // needs to be highly optimized, flexible, and broad enough for use everywhere.
125 // The choice of implementation of all channels is to be built on lock-free data
126 // structures. The channels themselves are then consequently also lock-free data
127 // structures. As always with lock-free code, this is a very "here be dragons"
128 // territory, especially because I'm unaware of any academic papers that have
129 // gone into great length about channels of these flavors.
131 // ## Flavors of channels
133 // From the perspective of a consumer of this library, there is only one flavor
134 // of channel. This channel can be used as a stream and cloned to allow multiple
135 // senders. Under the hood, however, there are actually three flavors of
138 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
139 // case. They contain as few atomics as possible and
140 // involve one and exactly one allocation.
141 // * Streams - these channels are optimized for the non-shared use case. They
142 // use a different concurrent queue that is more tailored for this
143 // use case. The initial allocation of this flavor of channel is not
145 // * Shared - this is the most general form of channel that this module offers,
146 // a channel with multiple senders. This type is as optimized as it
147 // can be, but the previous two types mentioned are much faster for
150 // ## Concurrent queues
152 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
153 // but recv() obviously blocks. This means that under the hood there must be
154 // some shared and concurrent queue holding all of the actual data.
156 // With two flavors of channels, two flavors of queues are also used. We have
157 // chosen to use queues from a well-known author that are abbreviated as SPSC
158 // and MPSC (single producer, single consumer and multiple producer, single
159 // consumer). SPSC queues are used for streams while MPSC queues are used for
162 // ### SPSC optimizations
164 // The SPSC queue found online is essentially a linked list of nodes where one
165 // half of the nodes are the "queue of data" and the other half of nodes are a
166 // cache of unused nodes. The unused nodes are used such that an allocation is
167 // not required on every push() and a free doesn't need to happen on every
170 // As found online, however, the cache of nodes is of an infinite size. This
171 // means that if a channel at one point in its life had 50k items in the queue,
172 // then the queue will always have the capacity for 50k items. I believed that
173 // this was an unnecessary limitation of the implementation, so I have altered
174 // the queue to optionally have a bound on the cache size.
176 // By default, streams will have an unbounded SPSC queue with a small-ish cache
177 // size. The hope is that the cache is still large enough to have very fast
178 // send() operations while not too large such that millions of channels can
181 // ### MPSC optimizations
183 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
184 // a linked list under the hood to earn its unboundedness, but I have not put
185 // forth much effort into having a cache of nodes similar to the SPSC queue.
187 // For now, I believe that this is "ok" because shared channels are not the most
188 // common type, but soon we may wish to revisit this queue choice and determine
189 // another candidate for backend storage of shared channels.
191 // ## Overview of the Implementation
193 // Now that there's a little background on the concurrent queues used, it's
194 // worth going into much more detail about the channels themselves. The basic
195 // pseudocode for a send/recv are:
199 // queue.push(t) return if queue.pop()
200 // if increment() == -1 deschedule {
201 // wakeup() if decrement() > 0
202 // cancel_deschedule()
206 // As mentioned before, there are no locks in this implementation, only atomic
207 // instructions are used.
209 // ### The internal atomic counter
211 // Every channel has a shared counter with each half to keep track of the size
212 // of the queue. This counter is used to abort descheduling by the receiver and
213 // to know when to wake up on the sending side.
215 // As seen in the pseudocode, senders will increment this count and receivers
216 // will decrement the count. The theory behind this is that if a sender sees a
217 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
218 // then it doesn't need to block.
220 // The recv() method has a beginning call to pop(), and if successful, it needs
221 // to decrement the count. It is a crucial implementation detail that this
222 // decrement does *not* happen to the shared counter. If this were the case,
223 // then it would be possible for the counter to be very negative when there were
224 // no receivers waiting, in which case the senders would have to determine when
225 // it was actually appropriate to wake up a receiver.
227 // Instead, the "steal count" is kept track of separately (not atomically
228 // because it's only used by receivers), and then the decrement() call when
229 // descheduling will lump in all of the recent steals into one large decrement.
231 // The implication of this is that if a sender sees a -1 count, then there's
232 // guaranteed to be a waiter waiting!
234 // ## Native Implementation
236 // A major goal of these channels is to work seamlessly on and off the runtime.
237 // All of the previous race conditions have been worded in terms of
238 // scheduler-isms (which is obviously not available without the runtime).
240 // For now, native usage of channels (off the runtime) will fall back onto
241 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
242 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
243 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
244 // condition variable.
248 // Being able to support selection over channels has greatly influenced this
249 // design, and not only does selection need to work inside the runtime, but also
250 // outside the runtime.
252 // The implementation is fairly straightforward. The goal of select() is not to
253 // return some data, but only to return which channel can receive data without
254 // blocking. The implementation is essentially the entire blocking procedure
255 // followed by an increment as soon as its woken up. The cancellation procedure
256 // involves an increment and swapping out of to_wake to acquire ownership of the
257 // thread to unblock.
259 // Sadly this current implementation requires multiple allocations, so I have
260 // seen the throughput of select() be much worse than it should be. I do not
261 // believe that there is anything fundamental that needs to change about these
262 // channels, however, in order to support a more efficient select().
266 // And now that you've seen all the races that I found and attempted to fix,
267 // here's the code for you to find some more!
269 use crate::sync::Arc;
273 use crate::cell::UnsafeCell;
274 use crate::time::{Duration, Instant};
276 #[unstable(feature = "mpsc_select", issue = "27800")]
277 pub use self::select::{Select, Handle};
278 use self::select::StartResult;
279 use self::select::StartResult::*;
280 use self::blocking::SignalToken;
282 #[cfg(all(test, not(target_os = "emscripten")))]
296 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
297 /// This half can only be owned by one thread.
299 /// Messages sent to the channel can be retrieved using [`recv`].
301 /// [`channel`]: fn.channel.html
302 /// [`sync_channel`]: fn.sync_channel.html
303 /// [`recv`]: struct.Receiver.html#method.recv
308 /// use std::sync::mpsc::channel;
310 /// use std::time::Duration;
312 /// let (send, recv) = channel();
314 /// thread::spawn(move || {
315 /// send.send("Hello world!").unwrap();
316 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
317 /// send.send("Delayed for 2 seconds").unwrap();
320 /// println!("{}", recv.recv().unwrap()); // Received immediately
321 /// println!("Waiting...");
322 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
324 #[stable(feature = "rust1", since = "1.0.0")]
325 pub struct Receiver<T> {
326 inner: UnsafeCell<Flavor<T>>,
329 // The receiver port can be sent from place to place, so long as it
330 // is not used to receive non-sendable things.
331 #[stable(feature = "rust1", since = "1.0.0")]
332 unsafe impl<T: Send> Send for Receiver<T> { }
334 #[stable(feature = "rust1", since = "1.0.0")]
335 impl<T> !Sync for Receiver<T> { }
337 /// An iterator over messages on a [`Receiver`], created by [`iter`].
339 /// This iterator will block whenever [`next`] is called,
340 /// waiting for a new message, and [`None`] will be returned
341 /// when the corresponding channel has hung up.
343 /// [`iter`]: struct.Receiver.html#method.iter
344 /// [`Receiver`]: struct.Receiver.html
345 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
346 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
351 /// use std::sync::mpsc::channel;
354 /// let (send, recv) = channel();
356 /// thread::spawn(move || {
357 /// send.send(1u8).unwrap();
358 /// send.send(2u8).unwrap();
359 /// send.send(3u8).unwrap();
362 /// for x in recv.iter() {
363 /// println!("Got: {}", x);
366 #[stable(feature = "rust1", since = "1.0.0")]
368 pub struct Iter<'a, T: 'a> {
372 /// An iterator that attempts to yield all pending values for a [`Receiver`],
373 /// created by [`try_iter`].
375 /// [`None`] will be returned when there are no pending values remaining or
376 /// if the corresponding channel has hung up.
378 /// This iterator will never block the caller in order to wait for data to
379 /// become available. Instead, it will return [`None`].
381 /// [`Receiver`]: struct.Receiver.html
382 /// [`try_iter`]: struct.Receiver.html#method.try_iter
383 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
388 /// use std::sync::mpsc::channel;
390 /// use std::time::Duration;
392 /// let (sender, receiver) = channel();
394 /// // Nothing is in the buffer yet
395 /// assert!(receiver.try_iter().next().is_none());
396 /// println!("Nothing in the buffer...");
398 /// thread::spawn(move || {
399 /// sender.send(1).unwrap();
400 /// sender.send(2).unwrap();
401 /// sender.send(3).unwrap();
404 /// println!("Going to sleep...");
405 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
407 /// for x in receiver.try_iter() {
408 /// println!("Got: {}", x);
411 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
413 pub struct TryIter<'a, T: 'a> {
417 /// An owning iterator over messages on a [`Receiver`],
418 /// created by **Receiver::into_iter**.
420 /// This iterator will block whenever [`next`]
421 /// is called, waiting for a new message, and [`None`] will be
422 /// returned if the corresponding channel has hung up.
424 /// [`Receiver`]: struct.Receiver.html
425 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
426 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
431 /// use std::sync::mpsc::channel;
434 /// let (send, recv) = channel();
436 /// thread::spawn(move || {
437 /// send.send(1u8).unwrap();
438 /// send.send(2u8).unwrap();
439 /// send.send(3u8).unwrap();
442 /// for x in recv.into_iter() {
443 /// println!("Got: {}", x);
446 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
448 pub struct IntoIter<T> {
452 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
453 /// owned by one thread, but it can be cloned to send to other threads.
455 /// Messages can be sent through this channel with [`send`].
457 /// [`channel`]: fn.channel.html
458 /// [`send`]: struct.Sender.html#method.send
463 /// use std::sync::mpsc::channel;
466 /// let (sender, receiver) = channel();
467 /// let sender2 = sender.clone();
469 /// // First thread owns sender
470 /// thread::spawn(move || {
471 /// sender.send(1).unwrap();
474 /// // Second thread owns sender2
475 /// thread::spawn(move || {
476 /// sender2.send(2).unwrap();
479 /// let msg = receiver.recv().unwrap();
480 /// let msg2 = receiver.recv().unwrap();
482 /// assert_eq!(3, msg + msg2);
484 #[stable(feature = "rust1", since = "1.0.0")]
485 pub struct Sender<T> {
486 inner: UnsafeCell<Flavor<T>>,
489 // The send port can be sent from place to place, so long as it
490 // is not used to send non-sendable things.
491 #[stable(feature = "rust1", since = "1.0.0")]
492 unsafe impl<T: Send> Send for Sender<T> { }
494 #[stable(feature = "rust1", since = "1.0.0")]
495 impl<T> !Sync for Sender<T> { }
497 /// The sending-half of Rust's synchronous [`sync_channel`] type.
499 /// Messages can be sent through this channel with [`send`] or [`try_send`].
501 /// [`send`] will block if there is no space in the internal buffer.
503 /// [`sync_channel`]: fn.sync_channel.html
504 /// [`send`]: struct.SyncSender.html#method.send
505 /// [`try_send`]: struct.SyncSender.html#method.try_send
510 /// use std::sync::mpsc::sync_channel;
513 /// // Create a sync_channel with buffer size 2
514 /// let (sync_sender, receiver) = sync_channel(2);
515 /// let sync_sender2 = sync_sender.clone();
517 /// // First thread owns sync_sender
518 /// thread::spawn(move || {
519 /// sync_sender.send(1).unwrap();
520 /// sync_sender.send(2).unwrap();
523 /// // Second thread owns sync_sender2
524 /// thread::spawn(move || {
525 /// sync_sender2.send(3).unwrap();
526 /// // thread will now block since the buffer is full
527 /// println!("Thread unblocked!");
532 /// msg = receiver.recv().unwrap();
533 /// println!("message {} received", msg);
535 /// // "Thread unblocked!" will be printed now
537 /// msg = receiver.recv().unwrap();
538 /// println!("message {} received", msg);
540 /// msg = receiver.recv().unwrap();
542 /// println!("message {} received", msg);
544 #[stable(feature = "rust1", since = "1.0.0")]
545 pub struct SyncSender<T> {
546 inner: Arc<sync::Packet<T>>,
549 #[stable(feature = "rust1", since = "1.0.0")]
550 unsafe impl<T: Send> Send for SyncSender<T> {}
552 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
553 /// function on **channel**s.
555 /// A **send** operation can only fail if the receiving end of a channel is
556 /// disconnected, implying that the data could never be received. The error
557 /// contains the data being sent as a payload so it can be recovered.
559 /// [`Sender::send`]: struct.Sender.html#method.send
560 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
561 #[stable(feature = "rust1", since = "1.0.0")]
562 #[derive(PartialEq, Eq, Clone, Copy)]
563 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
565 /// An error returned from the [`recv`] function on a [`Receiver`].
567 /// The [`recv`] operation can only fail if the sending half of a
568 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
569 /// messages will ever be received.
571 /// [`recv`]: struct.Receiver.html#method.recv
572 /// [`Receiver`]: struct.Receiver.html
573 /// [`channel`]: fn.channel.html
574 /// [`sync_channel`]: fn.sync_channel.html
575 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
576 #[stable(feature = "rust1", since = "1.0.0")]
577 pub struct RecvError;
579 /// This enumeration is the list of the possible reasons that [`try_recv`] could
580 /// not return data when called. This can occur with both a [`channel`] and
581 /// a [`sync_channel`].
583 /// [`try_recv`]: struct.Receiver.html#method.try_recv
584 /// [`channel`]: fn.channel.html
585 /// [`sync_channel`]: fn.sync_channel.html
586 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
587 #[stable(feature = "rust1", since = "1.0.0")]
588 pub enum TryRecvError {
589 /// This **channel** is currently empty, but the **Sender**(s) have not yet
590 /// disconnected, so data may yet become available.
591 #[stable(feature = "rust1", since = "1.0.0")]
594 /// The **channel**'s sending half has become disconnected, and there will
595 /// never be any more data received on it.
596 #[stable(feature = "rust1", since = "1.0.0")]
600 /// This enumeration is the list of possible errors that made [`recv_timeout`]
601 /// unable to return data when called. This can occur with both a [`channel`] and
602 /// a [`sync_channel`].
604 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
605 /// [`channel`]: fn.channel.html
606 /// [`sync_channel`]: fn.sync_channel.html
607 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
608 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
609 pub enum RecvTimeoutError {
610 /// This **channel** is currently empty, but the **Sender**(s) have not yet
611 /// disconnected, so data may yet become available.
612 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
614 /// The **channel**'s sending half has become disconnected, and there will
615 /// never be any more data received on it.
616 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
620 /// This enumeration is the list of the possible error outcomes for the
621 /// [`try_send`] method.
623 /// [`try_send`]: struct.SyncSender.html#method.try_send
624 #[stable(feature = "rust1", since = "1.0.0")]
625 #[derive(PartialEq, Eq, Clone, Copy)]
626 pub enum TrySendError<T> {
627 /// The data could not be sent on the [`sync_channel`] because it would require that
628 /// the callee block to send the data.
630 /// If this is a buffered channel, then the buffer is full at this time. If
631 /// this is not a buffered channel, then there is no [`Receiver`] available to
632 /// acquire the data.
634 /// [`sync_channel`]: fn.sync_channel.html
635 /// [`Receiver`]: struct.Receiver.html
636 #[stable(feature = "rust1", since = "1.0.0")]
637 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
639 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
640 /// sent. The data is returned back to the callee in this case.
642 /// [`sync_channel`]: fn.sync_channel.html
643 #[stable(feature = "rust1", since = "1.0.0")]
644 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
648 Oneshot(Arc<oneshot::Packet<T>>),
649 Stream(Arc<stream::Packet<T>>),
650 Shared(Arc<shared::Packet<T>>),
651 Sync(Arc<sync::Packet<T>>),
655 trait UnsafeFlavor<T> {
656 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
657 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
658 &mut *self.inner_unsafe().get()
660 unsafe fn inner(&self) -> &Flavor<T> {
661 &*self.inner_unsafe().get()
664 impl<T> UnsafeFlavor<T> for Sender<T> {
665 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
669 impl<T> UnsafeFlavor<T> for Receiver<T> {
670 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
675 /// Creates a new asynchronous channel, returning the sender/receiver halves.
676 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
677 /// the same order as it was sent, and no [`send`] will block the calling thread
678 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
679 /// block after its buffer limit is reached). [`recv`] will block until a message
682 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
683 /// only one [`Receiver`] is supported.
685 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
686 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
687 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
688 /// return a [`RecvError`].
690 /// [`send`]: struct.Sender.html#method.send
691 /// [`recv`]: struct.Receiver.html#method.recv
692 /// [`Sender`]: struct.Sender.html
693 /// [`Receiver`]: struct.Receiver.html
694 /// [`sync_channel`]: fn.sync_channel.html
695 /// [`SendError`]: struct.SendError.html
696 /// [`RecvError`]: struct.RecvError.html
701 /// use std::sync::mpsc::channel;
704 /// let (sender, receiver) = channel();
706 /// // Spawn off an expensive computation
707 /// thread::spawn(move|| {
708 /// # fn expensive_computation() {}
709 /// sender.send(expensive_computation()).unwrap();
712 /// // Do some useful work for awhile
714 /// // Let's see what that answer was
715 /// println!("{:?}", receiver.recv().unwrap());
717 #[stable(feature = "rust1", since = "1.0.0")]
718 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
719 let a = Arc::new(oneshot::Packet::new());
720 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
723 /// Creates a new synchronous, bounded channel.
724 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
725 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
726 /// [`Receiver`] will block until a message becomes available. `sync_channel`
727 /// differs greatly in the semantics of the sender, however.
729 /// This channel has an internal buffer on which messages will be queued.
730 /// `bound` specifies the buffer size. When the internal buffer becomes full,
731 /// future sends will *block* waiting for the buffer to open up. Note that a
732 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
733 /// where each [`send`] will not return until a [`recv`] is paired with it.
735 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
736 /// times, but only one [`Receiver`] is supported.
738 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
739 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
740 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
741 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
743 /// [`channel`]: fn.channel.html
744 /// [`send`]: struct.SyncSender.html#method.send
745 /// [`recv`]: struct.Receiver.html#method.recv
746 /// [`SyncSender`]: struct.SyncSender.html
747 /// [`Receiver`]: struct.Receiver.html
748 /// [`SendError`]: struct.SendError.html
749 /// [`RecvError`]: struct.RecvError.html
754 /// use std::sync::mpsc::sync_channel;
757 /// let (sender, receiver) = sync_channel(1);
759 /// // this returns immediately
760 /// sender.send(1).unwrap();
762 /// thread::spawn(move|| {
763 /// // this will block until the previous message has been received
764 /// sender.send(2).unwrap();
767 /// assert_eq!(receiver.recv().unwrap(), 1);
768 /// assert_eq!(receiver.recv().unwrap(), 2);
770 #[stable(feature = "rust1", since = "1.0.0")]
771 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
772 let a = Arc::new(sync::Packet::new(bound));
773 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
776 ////////////////////////////////////////////////////////////////////////////////
778 ////////////////////////////////////////////////////////////////////////////////
781 fn new(inner: Flavor<T>) -> Sender<T> {
783 inner: UnsafeCell::new(inner),
787 /// Attempts to send a value on this channel, returning it back if it could
790 /// A successful send occurs when it is determined that the other end of
791 /// the channel has not hung up already. An unsuccessful send would be one
792 /// where the corresponding receiver has already been deallocated. Note
793 /// that a return value of [`Err`] means that the data will never be
794 /// received, but a return value of [`Ok`] does *not* mean that the data
795 /// will be received. It is possible for the corresponding receiver to
796 /// hang up immediately after this function returns [`Ok`].
798 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
799 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
801 /// This method will never block the current thread.
806 /// use std::sync::mpsc::channel;
808 /// let (tx, rx) = channel();
810 /// // This send is always successful
811 /// tx.send(1).unwrap();
813 /// // This send will fail because the receiver is gone
815 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
817 #[stable(feature = "rust1", since = "1.0.0")]
818 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
819 let (new_inner, ret) = match *unsafe { self.inner() } {
820 Flavor::Oneshot(ref p) => {
822 return p.send(t).map_err(SendError);
824 let a = Arc::new(stream::Packet::new());
825 let rx = Receiver::new(Flavor::Stream(a.clone()));
826 match p.upgrade(rx) {
827 oneshot::UpSuccess => {
831 oneshot::UpDisconnected => (a, Err(t)),
832 oneshot::UpWoke(token) => {
833 // This send cannot panic because the thread is
834 // asleep (we're looking at it), so the receiver
836 a.send(t).ok().unwrap();
843 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
844 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
845 Flavor::Sync(..) => unreachable!(),
849 let tmp = Sender::new(Flavor::Stream(new_inner));
850 mem::swap(self.inner_mut(), tmp.inner_mut());
852 ret.map_err(SendError)
856 #[stable(feature = "rust1", since = "1.0.0")]
857 impl<T> Clone for Sender<T> {
858 fn clone(&self) -> Sender<T> {
859 let packet = match *unsafe { self.inner() } {
860 Flavor::Oneshot(ref p) => {
861 let a = Arc::new(shared::Packet::new());
863 let guard = a.postinit_lock();
864 let rx = Receiver::new(Flavor::Shared(a.clone()));
865 let sleeper = match p.upgrade(rx) {
867 oneshot::UpDisconnected => None,
868 oneshot::UpWoke(task) => Some(task),
870 a.inherit_blocker(sleeper, guard);
874 Flavor::Stream(ref p) => {
875 let a = Arc::new(shared::Packet::new());
877 let guard = a.postinit_lock();
878 let rx = Receiver::new(Flavor::Shared(a.clone()));
879 let sleeper = match p.upgrade(rx) {
881 stream::UpDisconnected => None,
882 stream::UpWoke(task) => Some(task),
884 a.inherit_blocker(sleeper, guard);
888 Flavor::Shared(ref p) => {
890 return Sender::new(Flavor::Shared(p.clone()));
892 Flavor::Sync(..) => unreachable!(),
896 let tmp = Sender::new(Flavor::Shared(packet.clone()));
897 mem::swap(self.inner_mut(), tmp.inner_mut());
899 Sender::new(Flavor::Shared(packet))
903 #[stable(feature = "rust1", since = "1.0.0")]
904 impl<T> Drop for Sender<T> {
906 match *unsafe { self.inner() } {
907 Flavor::Oneshot(ref p) => p.drop_chan(),
908 Flavor::Stream(ref p) => p.drop_chan(),
909 Flavor::Shared(ref p) => p.drop_chan(),
910 Flavor::Sync(..) => unreachable!(),
915 #[stable(feature = "mpsc_debug", since = "1.8.0")]
916 impl<T> fmt::Debug for Sender<T> {
917 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
918 f.debug_struct("Sender").finish()
922 ////////////////////////////////////////////////////////////////////////////////
924 ////////////////////////////////////////////////////////////////////////////////
926 impl<T> SyncSender<T> {
927 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
931 /// Sends a value on this synchronous channel.
933 /// This function will *block* until space in the internal buffer becomes
934 /// available or a receiver is available to hand off the message to.
936 /// Note that a successful send does *not* guarantee that the receiver will
937 /// ever see the data if there is a buffer on this channel. Items may be
938 /// enqueued in the internal buffer for the receiver to receive at a later
939 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
940 /// channel and it guarantees that the receiver has indeed received
941 /// the data if this function returns success.
943 /// This function will never panic, but it may return [`Err`] if the
944 /// [`Receiver`] has disconnected and is no longer able to receive
947 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
948 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
953 /// use std::sync::mpsc::sync_channel;
956 /// // Create a rendezvous sync_channel with buffer size 0
957 /// let (sync_sender, receiver) = sync_channel(0);
959 /// thread::spawn(move || {
960 /// println!("sending message...");
961 /// sync_sender.send(1).unwrap();
962 /// // Thread is now blocked until the message is received
964 /// println!("...message received!");
967 /// let msg = receiver.recv().unwrap();
968 /// assert_eq!(1, msg);
970 #[stable(feature = "rust1", since = "1.0.0")]
971 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
972 self.inner.send(t).map_err(SendError)
975 /// Attempts to send a value on this channel without blocking.
977 /// This method differs from [`send`] by returning immediately if the
978 /// channel's buffer is full or no receiver is waiting to acquire some
979 /// data. Compared with [`send`], this function has two failure cases
980 /// instead of one (one for disconnection, one for a full buffer).
982 /// See [`send`] for notes about guarantees of whether the
983 /// receiver has received the data or not if this function is successful.
985 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
990 /// use std::sync::mpsc::sync_channel;
993 /// // Create a sync_channel with buffer size 1
994 /// let (sync_sender, receiver) = sync_channel(1);
995 /// let sync_sender2 = sync_sender.clone();
997 /// // First thread owns sync_sender
998 /// thread::spawn(move || {
999 /// sync_sender.send(1).unwrap();
1000 /// sync_sender.send(2).unwrap();
1001 /// // Thread blocked
1004 /// // Second thread owns sync_sender2
1005 /// thread::spawn(move || {
1006 /// // This will return an error and send
1007 /// // no message if the buffer is full
1008 /// sync_sender2.try_send(3).is_err();
1012 /// msg = receiver.recv().unwrap();
1013 /// println!("message {} received", msg);
1015 /// msg = receiver.recv().unwrap();
1016 /// println!("message {} received", msg);
1018 /// // Third message may have never been sent
1019 /// match receiver.try_recv() {
1020 /// Ok(msg) => println!("message {} received", msg),
1021 /// Err(_) => println!("the third message was never sent"),
1024 #[stable(feature = "rust1", since = "1.0.0")]
1025 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1026 self.inner.try_send(t)
1030 #[stable(feature = "rust1", since = "1.0.0")]
1031 impl<T> Clone for SyncSender<T> {
1032 fn clone(&self) -> SyncSender<T> {
1033 self.inner.clone_chan();
1034 SyncSender::new(self.inner.clone())
1038 #[stable(feature = "rust1", since = "1.0.0")]
1039 impl<T> Drop for SyncSender<T> {
1040 fn drop(&mut self) {
1041 self.inner.drop_chan();
1045 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1046 impl<T> fmt::Debug for SyncSender<T> {
1047 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1048 f.debug_struct("SyncSender").finish()
1052 ////////////////////////////////////////////////////////////////////////////////
1054 ////////////////////////////////////////////////////////////////////////////////
1056 impl<T> Receiver<T> {
1057 fn new(inner: Flavor<T>) -> Receiver<T> {
1058 Receiver { inner: UnsafeCell::new(inner) }
1061 /// Attempts to return a pending value on this receiver without blocking.
1063 /// This method will never block the caller in order to wait for data to
1064 /// become available. Instead, this will always return immediately with a
1065 /// possible option of pending data on the channel.
1067 /// This is useful for a flavor of "optimistic check" before deciding to
1068 /// block on a receiver.
1070 /// Compared with [`recv`], this function has two failure cases instead of one
1071 /// (one for disconnection, one for an empty buffer).
1073 /// [`recv`]: struct.Receiver.html#method.recv
1078 /// use std::sync::mpsc::{Receiver, channel};
1080 /// let (_, receiver): (_, Receiver<i32>) = channel();
1082 /// assert!(receiver.try_recv().is_err());
1084 #[stable(feature = "rust1", since = "1.0.0")]
1085 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1087 let new_port = match *unsafe { self.inner() } {
1088 Flavor::Oneshot(ref p) => {
1089 match p.try_recv() {
1090 Ok(t) => return Ok(t),
1091 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1092 Err(oneshot::Disconnected) => {
1093 return Err(TryRecvError::Disconnected)
1095 Err(oneshot::Upgraded(rx)) => rx,
1098 Flavor::Stream(ref p) => {
1099 match p.try_recv() {
1100 Ok(t) => return Ok(t),
1101 Err(stream::Empty) => return Err(TryRecvError::Empty),
1102 Err(stream::Disconnected) => {
1103 return Err(TryRecvError::Disconnected)
1105 Err(stream::Upgraded(rx)) => rx,
1108 Flavor::Shared(ref p) => {
1109 match p.try_recv() {
1110 Ok(t) => return Ok(t),
1111 Err(shared::Empty) => return Err(TryRecvError::Empty),
1112 Err(shared::Disconnected) => {
1113 return Err(TryRecvError::Disconnected)
1117 Flavor::Sync(ref p) => {
1118 match p.try_recv() {
1119 Ok(t) => return Ok(t),
1120 Err(sync::Empty) => return Err(TryRecvError::Empty),
1121 Err(sync::Disconnected) => {
1122 return Err(TryRecvError::Disconnected)
1128 mem::swap(self.inner_mut(),
1129 new_port.inner_mut());
1134 /// Attempts to wait for a value on this receiver, returning an error if the
1135 /// corresponding channel has hung up.
1137 /// This function will always block the current thread if there is no data
1138 /// available and it's possible for more data to be sent. Once a message is
1139 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1140 /// receiver will wake up and return that message.
1142 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1143 /// this call is blocking, this call will wake up and return [`Err`] to
1144 /// indicate that no more messages can ever be received on this channel.
1145 /// However, since channels are buffered, messages sent before the disconnect
1146 /// will still be properly received.
1148 /// [`Sender`]: struct.Sender.html
1149 /// [`SyncSender`]: struct.SyncSender.html
1150 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1155 /// use std::sync::mpsc;
1156 /// use std::thread;
1158 /// let (send, recv) = mpsc::channel();
1159 /// let handle = thread::spawn(move || {
1160 /// send.send(1u8).unwrap();
1163 /// handle.join().unwrap();
1165 /// assert_eq!(Ok(1), recv.recv());
1168 /// Buffering behavior:
1171 /// use std::sync::mpsc;
1172 /// use std::thread;
1173 /// use std::sync::mpsc::RecvError;
1175 /// let (send, recv) = mpsc::channel();
1176 /// let handle = thread::spawn(move || {
1177 /// send.send(1u8).unwrap();
1178 /// send.send(2).unwrap();
1179 /// send.send(3).unwrap();
1183 /// // wait for the thread to join so we ensure the sender is dropped
1184 /// handle.join().unwrap();
1186 /// assert_eq!(Ok(1), recv.recv());
1187 /// assert_eq!(Ok(2), recv.recv());
1188 /// assert_eq!(Ok(3), recv.recv());
1189 /// assert_eq!(Err(RecvError), recv.recv());
1191 #[stable(feature = "rust1", since = "1.0.0")]
1192 pub fn recv(&self) -> Result<T, RecvError> {
1194 let new_port = match *unsafe { self.inner() } {
1195 Flavor::Oneshot(ref p) => {
1196 match p.recv(None) {
1197 Ok(t) => return Ok(t),
1198 Err(oneshot::Disconnected) => return Err(RecvError),
1199 Err(oneshot::Upgraded(rx)) => rx,
1200 Err(oneshot::Empty) => unreachable!(),
1203 Flavor::Stream(ref p) => {
1204 match p.recv(None) {
1205 Ok(t) => return Ok(t),
1206 Err(stream::Disconnected) => return Err(RecvError),
1207 Err(stream::Upgraded(rx)) => rx,
1208 Err(stream::Empty) => unreachable!(),
1211 Flavor::Shared(ref p) => {
1212 match p.recv(None) {
1213 Ok(t) => return Ok(t),
1214 Err(shared::Disconnected) => return Err(RecvError),
1215 Err(shared::Empty) => unreachable!(),
1218 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1221 mem::swap(self.inner_mut(), new_port.inner_mut());
1226 /// Attempts to wait for a value on this receiver, returning an error if the
1227 /// corresponding channel has hung up, or if it waits more than `timeout`.
1229 /// This function will always block the current thread if there is no data
1230 /// available and it's possible for more data to be sent. Once a message is
1231 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1232 /// receiver will wake up and return that message.
1234 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1235 /// this call is blocking, this call will wake up and return [`Err`] to
1236 /// indicate that no more messages can ever be received on this channel.
1237 /// However, since channels are buffered, messages sent before the disconnect
1238 /// will still be properly received.
1240 /// [`Sender`]: struct.Sender.html
1241 /// [`SyncSender`]: struct.SyncSender.html
1242 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1246 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1247 /// to panic unexpectedly with the following example:
1250 /// use std::sync::mpsc::channel;
1251 /// use std::thread;
1252 /// use std::time::Duration;
1254 /// let (tx, rx) = channel::<String>();
1256 /// thread::spawn(move || {
1257 /// let d = Duration::from_millis(10);
1259 /// println!("recv");
1260 /// let _r = rx.recv_timeout(d);
1264 /// thread::sleep(Duration::from_millis(100));
1265 /// let _c1 = tx.clone();
1267 /// thread::sleep(Duration::from_secs(1));
1270 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1274 /// Successfully receiving value before encountering timeout:
1277 /// use std::thread;
1278 /// use std::time::Duration;
1279 /// use std::sync::mpsc;
1281 /// let (send, recv) = mpsc::channel();
1283 /// thread::spawn(move || {
1284 /// send.send('a').unwrap();
1288 /// recv.recv_timeout(Duration::from_millis(400)),
1293 /// Receiving an error upon reaching timeout:
1296 /// use std::thread;
1297 /// use std::time::Duration;
1298 /// use std::sync::mpsc;
1300 /// let (send, recv) = mpsc::channel();
1302 /// thread::spawn(move || {
1303 /// thread::sleep(Duration::from_millis(800));
1304 /// send.send('a').unwrap();
1308 /// recv.recv_timeout(Duration::from_millis(400)),
1309 /// Err(mpsc::RecvTimeoutError::Timeout)
1312 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1313 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1314 // Do an optimistic try_recv to avoid the performance impact of
1315 // Instant::now() in the full-channel case.
1316 match self.try_recv() {
1317 Ok(result) => Ok(result),
1318 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1319 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1320 Some(deadline) => self.recv_deadline(deadline),
1321 // So far in the future that it's practically the same as waiting indefinitely.
1322 None => self.recv().map_err(RecvTimeoutError::from),
1327 /// Attempts to wait for a value on this receiver, returning an error if the
1328 /// corresponding channel has hung up, or if `deadline` is reached.
1330 /// This function will always block the current thread if there is no data
1331 /// available and it's possible for more data to be sent. Once a message is
1332 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1333 /// receiver will wake up and return that message.
1335 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1336 /// this call is blocking, this call will wake up and return [`Err`] to
1337 /// indicate that no more messages can ever be received on this channel.
1338 /// However, since channels are buffered, messages sent before the disconnect
1339 /// will still be properly received.
1341 /// [`Sender`]: struct.Sender.html
1342 /// [`SyncSender`]: struct.SyncSender.html
1343 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1347 /// Successfully receiving value before reaching deadline:
1350 /// #![feature(deadline_api)]
1351 /// use std::thread;
1352 /// use std::time::{Duration, Instant};
1353 /// use std::sync::mpsc;
1355 /// let (send, recv) = mpsc::channel();
1357 /// thread::spawn(move || {
1358 /// send.send('a').unwrap();
1362 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1367 /// Receiving an error upon reaching deadline:
1370 /// #![feature(deadline_api)]
1371 /// use std::thread;
1372 /// use std::time::{Duration, Instant};
1373 /// use std::sync::mpsc;
1375 /// let (send, recv) = mpsc::channel();
1377 /// thread::spawn(move || {
1378 /// thread::sleep(Duration::from_millis(800));
1379 /// send.send('a').unwrap();
1383 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1384 /// Err(mpsc::RecvTimeoutError::Timeout)
1387 #[unstable(feature = "deadline_api", issue = "46316")]
1388 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1389 use self::RecvTimeoutError::*;
1392 let port_or_empty = match *unsafe { self.inner() } {
1393 Flavor::Oneshot(ref p) => {
1394 match p.recv(Some(deadline)) {
1395 Ok(t) => return Ok(t),
1396 Err(oneshot::Disconnected) => return Err(Disconnected),
1397 Err(oneshot::Upgraded(rx)) => Some(rx),
1398 Err(oneshot::Empty) => None,
1401 Flavor::Stream(ref p) => {
1402 match p.recv(Some(deadline)) {
1403 Ok(t) => return Ok(t),
1404 Err(stream::Disconnected) => return Err(Disconnected),
1405 Err(stream::Upgraded(rx)) => Some(rx),
1406 Err(stream::Empty) => None,
1409 Flavor::Shared(ref p) => {
1410 match p.recv(Some(deadline)) {
1411 Ok(t) => return Ok(t),
1412 Err(shared::Disconnected) => return Err(Disconnected),
1413 Err(shared::Empty) => None,
1416 Flavor::Sync(ref p) => {
1417 match p.recv(Some(deadline)) {
1418 Ok(t) => return Ok(t),
1419 Err(sync::Disconnected) => return Err(Disconnected),
1420 Err(sync::Empty) => None,
1425 if let Some(new_port) = port_or_empty {
1427 mem::swap(self.inner_mut(), new_port.inner_mut());
1431 // If we're already passed the deadline, and we're here without
1432 // data, return a timeout, else try again.
1433 if Instant::now() >= deadline {
1434 return Err(Timeout);
1439 /// Returns an iterator that will block waiting for messages, but never
1440 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1442 /// [`panic!`]: ../../../std/macro.panic.html
1443 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1448 /// use std::sync::mpsc::channel;
1449 /// use std::thread;
1451 /// let (send, recv) = channel();
1453 /// thread::spawn(move || {
1454 /// send.send(1).unwrap();
1455 /// send.send(2).unwrap();
1456 /// send.send(3).unwrap();
1459 /// let mut iter = recv.iter();
1460 /// assert_eq!(iter.next(), Some(1));
1461 /// assert_eq!(iter.next(), Some(2));
1462 /// assert_eq!(iter.next(), Some(3));
1463 /// assert_eq!(iter.next(), None);
1465 #[stable(feature = "rust1", since = "1.0.0")]
1466 pub fn iter(&self) -> Iter<T> {
1470 /// Returns an iterator that will attempt to yield all pending values.
1471 /// It will return `None` if there are no more pending values or if the
1472 /// channel has hung up. The iterator will never [`panic!`] or block the
1473 /// user by waiting for values.
1475 /// [`panic!`]: ../../../std/macro.panic.html
1480 /// use std::sync::mpsc::channel;
1481 /// use std::thread;
1482 /// use std::time::Duration;
1484 /// let (sender, receiver) = channel();
1486 /// // nothing is in the buffer yet
1487 /// assert!(receiver.try_iter().next().is_none());
1489 /// thread::spawn(move || {
1490 /// thread::sleep(Duration::from_secs(1));
1491 /// sender.send(1).unwrap();
1492 /// sender.send(2).unwrap();
1493 /// sender.send(3).unwrap();
1496 /// // nothing is in the buffer yet
1497 /// assert!(receiver.try_iter().next().is_none());
1499 /// // block for two seconds
1500 /// thread::sleep(Duration::from_secs(2));
1502 /// let mut iter = receiver.try_iter();
1503 /// assert_eq!(iter.next(), Some(1));
1504 /// assert_eq!(iter.next(), Some(2));
1505 /// assert_eq!(iter.next(), Some(3));
1506 /// assert_eq!(iter.next(), None);
1508 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1509 pub fn try_iter(&self) -> TryIter<T> {
1510 TryIter { rx: self }
1515 impl<T> select::Packet for Receiver<T> {
1516 fn can_recv(&self) -> bool {
1518 let new_port = match *unsafe { self.inner() } {
1519 Flavor::Oneshot(ref p) => {
1520 match p.can_recv() {
1521 Ok(ret) => return ret,
1522 Err(upgrade) => upgrade,
1525 Flavor::Stream(ref p) => {
1526 match p.can_recv() {
1527 Ok(ret) => return ret,
1528 Err(upgrade) => upgrade,
1531 Flavor::Shared(ref p) => return p.can_recv(),
1532 Flavor::Sync(ref p) => return p.can_recv(),
1535 mem::swap(self.inner_mut(),
1536 new_port.inner_mut());
1541 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1543 let (t, new_port) = match *unsafe { self.inner() } {
1544 Flavor::Oneshot(ref p) => {
1545 match p.start_selection(token) {
1546 oneshot::SelSuccess => return Installed,
1547 oneshot::SelCanceled => return Abort,
1548 oneshot::SelUpgraded(t, rx) => (t, rx),
1551 Flavor::Stream(ref p) => {
1552 match p.start_selection(token) {
1553 stream::SelSuccess => return Installed,
1554 stream::SelCanceled => return Abort,
1555 stream::SelUpgraded(t, rx) => (t, rx),
1558 Flavor::Shared(ref p) => return p.start_selection(token),
1559 Flavor::Sync(ref p) => return p.start_selection(token),
1563 mem::swap(self.inner_mut(), new_port.inner_mut());
1568 fn abort_selection(&self) -> bool {
1569 let mut was_upgrade = false;
1571 let result = match *unsafe { self.inner() } {
1572 Flavor::Oneshot(ref p) => p.abort_selection(),
1573 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1574 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1575 Flavor::Sync(ref p) => return p.abort_selection(),
1577 let new_port = match result { Ok(b) => return b, Err(p) => p };
1580 mem::swap(self.inner_mut(),
1581 new_port.inner_mut());
1587 #[stable(feature = "rust1", since = "1.0.0")]
1588 impl<'a, T> Iterator for Iter<'a, T> {
1591 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1594 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1595 impl<'a, T> Iterator for TryIter<'a, T> {
1598 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1601 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1602 impl<'a, T> IntoIterator for &'a Receiver<T> {
1604 type IntoIter = Iter<'a, T>;
1606 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1609 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1610 impl<T> Iterator for IntoIter<T> {
1612 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1615 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1616 impl <T> IntoIterator for Receiver<T> {
1618 type IntoIter = IntoIter<T>;
1620 fn into_iter(self) -> IntoIter<T> {
1621 IntoIter { rx: self }
1625 #[stable(feature = "rust1", since = "1.0.0")]
1626 impl<T> Drop for Receiver<T> {
1627 fn drop(&mut self) {
1628 match *unsafe { self.inner() } {
1629 Flavor::Oneshot(ref p) => p.drop_port(),
1630 Flavor::Stream(ref p) => p.drop_port(),
1631 Flavor::Shared(ref p) => p.drop_port(),
1632 Flavor::Sync(ref p) => p.drop_port(),
1637 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1638 impl<T> fmt::Debug for Receiver<T> {
1639 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1640 f.debug_struct("Receiver").finish()
1644 #[stable(feature = "rust1", since = "1.0.0")]
1645 impl<T> fmt::Debug for SendError<T> {
1646 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1647 "SendError(..)".fmt(f)
1651 #[stable(feature = "rust1", since = "1.0.0")]
1652 impl<T> fmt::Display for SendError<T> {
1653 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654 "sending on a closed channel".fmt(f)
1658 #[stable(feature = "rust1", since = "1.0.0")]
1659 impl<T: Send> error::Error for SendError<T> {
1660 fn description(&self) -> &str {
1661 "sending on a closed channel"
1664 fn cause(&self) -> Option<&dyn error::Error> {
1669 #[stable(feature = "rust1", since = "1.0.0")]
1670 impl<T> fmt::Debug for TrySendError<T> {
1671 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1673 TrySendError::Full(..) => "Full(..)".fmt(f),
1674 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1679 #[stable(feature = "rust1", since = "1.0.0")]
1680 impl<T> fmt::Display for TrySendError<T> {
1681 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1683 TrySendError::Full(..) => {
1684 "sending on a full channel".fmt(f)
1686 TrySendError::Disconnected(..) => {
1687 "sending on a closed channel".fmt(f)
1693 #[stable(feature = "rust1", since = "1.0.0")]
1694 impl<T: Send> error::Error for TrySendError<T> {
1696 fn description(&self) -> &str {
1698 TrySendError::Full(..) => {
1699 "sending on a full channel"
1701 TrySendError::Disconnected(..) => {
1702 "sending on a closed channel"
1707 fn cause(&self) -> Option<&dyn error::Error> {
1712 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1713 impl<T> From<SendError<T>> for TrySendError<T> {
1714 fn from(err: SendError<T>) -> TrySendError<T> {
1716 SendError(t) => TrySendError::Disconnected(t),
1721 #[stable(feature = "rust1", since = "1.0.0")]
1722 impl fmt::Display for RecvError {
1723 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724 "receiving on a closed channel".fmt(f)
1728 #[stable(feature = "rust1", since = "1.0.0")]
1729 impl error::Error for RecvError {
1731 fn description(&self) -> &str {
1732 "receiving on a closed channel"
1735 fn cause(&self) -> Option<&dyn error::Error> {
1740 #[stable(feature = "rust1", since = "1.0.0")]
1741 impl fmt::Display for TryRecvError {
1742 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1744 TryRecvError::Empty => {
1745 "receiving on an empty channel".fmt(f)
1747 TryRecvError::Disconnected => {
1748 "receiving on a closed channel".fmt(f)
1754 #[stable(feature = "rust1", since = "1.0.0")]
1755 impl error::Error for TryRecvError {
1757 fn description(&self) -> &str {
1759 TryRecvError::Empty => {
1760 "receiving on an empty channel"
1762 TryRecvError::Disconnected => {
1763 "receiving on a closed channel"
1768 fn cause(&self) -> Option<&dyn error::Error> {
1773 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1774 impl From<RecvError> for TryRecvError {
1775 fn from(err: RecvError) -> TryRecvError {
1777 RecvError => TryRecvError::Disconnected,
1782 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1783 impl fmt::Display for RecvTimeoutError {
1784 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1786 RecvTimeoutError::Timeout => {
1787 "timed out waiting on channel".fmt(f)
1789 RecvTimeoutError::Disconnected => {
1790 "channel is empty and sending half is closed".fmt(f)
1796 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1797 impl error::Error for RecvTimeoutError {
1798 fn description(&self) -> &str {
1800 RecvTimeoutError::Timeout => {
1801 "timed out waiting on channel"
1803 RecvTimeoutError::Disconnected => {
1804 "channel is empty and sending half is closed"
1809 fn cause(&self) -> Option<&dyn error::Error> {
1814 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1815 impl From<RecvError> for RecvTimeoutError {
1816 fn from(err: RecvError) -> RecvTimeoutError {
1818 RecvError => RecvTimeoutError::Disconnected,
1823 #[cfg(all(test, not(target_os = "emscripten")))]
1828 use crate::time::{Duration, Instant};
1830 pub fn stress_factor() -> usize {
1831 match env::var("RUST_TEST_STRESS") {
1832 Ok(val) => val.parse().unwrap(),
1839 let (tx, rx) = channel::<i32>();
1840 tx.send(1).unwrap();
1841 assert_eq!(rx.recv().unwrap(), 1);
1846 let (tx, _rx) = channel::<Box<isize>>();
1847 tx.send(box 1).unwrap();
1851 fn drop_full_shared() {
1852 let (tx, _rx) = channel::<Box<isize>>();
1855 tx.send(box 1).unwrap();
1860 let (tx, rx) = channel::<i32>();
1861 tx.send(1).unwrap();
1862 assert_eq!(rx.recv().unwrap(), 1);
1863 let tx = tx.clone();
1864 tx.send(1).unwrap();
1865 assert_eq!(rx.recv().unwrap(), 1);
1869 fn smoke_threads() {
1870 let (tx, rx) = channel::<i32>();
1871 let _t = thread::spawn(move|| {
1872 tx.send(1).unwrap();
1874 assert_eq!(rx.recv().unwrap(), 1);
1878 fn smoke_port_gone() {
1879 let (tx, rx) = channel::<i32>();
1881 assert!(tx.send(1).is_err());
1885 fn smoke_shared_port_gone() {
1886 let (tx, rx) = channel::<i32>();
1888 assert!(tx.send(1).is_err())
1892 fn smoke_shared_port_gone2() {
1893 let (tx, rx) = channel::<i32>();
1895 let tx2 = tx.clone();
1897 assert!(tx2.send(1).is_err());
1901 fn port_gone_concurrent() {
1902 let (tx, rx) = channel::<i32>();
1903 let _t = thread::spawn(move|| {
1906 while tx.send(1).is_ok() {}
1910 fn port_gone_concurrent_shared() {
1911 let (tx, rx) = channel::<i32>();
1912 let tx2 = tx.clone();
1913 let _t = thread::spawn(move|| {
1916 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1920 fn smoke_chan_gone() {
1921 let (tx, rx) = channel::<i32>();
1923 assert!(rx.recv().is_err());
1927 fn smoke_chan_gone_shared() {
1928 let (tx, rx) = channel::<()>();
1929 let tx2 = tx.clone();
1932 assert!(rx.recv().is_err());
1936 fn chan_gone_concurrent() {
1937 let (tx, rx) = channel::<i32>();
1938 let _t = thread::spawn(move|| {
1939 tx.send(1).unwrap();
1940 tx.send(1).unwrap();
1942 while rx.recv().is_ok() {}
1947 let (tx, rx) = channel::<i32>();
1948 let t = thread::spawn(move|| {
1949 for _ in 0..10000 { tx.send(1).unwrap(); }
1952 assert_eq!(rx.recv().unwrap(), 1);
1954 t.join().ok().unwrap();
1958 fn stress_shared() {
1959 const AMT: u32 = 10000;
1960 const NTHREADS: u32 = 8;
1961 let (tx, rx) = channel::<i32>();
1963 let t = thread::spawn(move|| {
1964 for _ in 0..AMT * NTHREADS {
1965 assert_eq!(rx.recv().unwrap(), 1);
1967 match rx.try_recv() {
1973 for _ in 0..NTHREADS {
1974 let tx = tx.clone();
1975 thread::spawn(move|| {
1976 for _ in 0..AMT { tx.send(1).unwrap(); }
1980 t.join().ok().unwrap();
1984 fn send_from_outside_runtime() {
1985 let (tx1, rx1) = channel::<()>();
1986 let (tx2, rx2) = channel::<i32>();
1987 let t1 = thread::spawn(move|| {
1988 tx1.send(()).unwrap();
1990 assert_eq!(rx2.recv().unwrap(), 1);
1993 rx1.recv().unwrap();
1994 let t2 = thread::spawn(move|| {
1996 tx2.send(1).unwrap();
1999 t1.join().ok().unwrap();
2000 t2.join().ok().unwrap();
2004 fn recv_from_outside_runtime() {
2005 let (tx, rx) = channel::<i32>();
2006 let t = thread::spawn(move|| {
2008 assert_eq!(rx.recv().unwrap(), 1);
2012 tx.send(1).unwrap();
2014 t.join().ok().unwrap();
2019 let (tx1, rx1) = channel::<i32>();
2020 let (tx2, rx2) = channel::<i32>();
2021 let t1 = thread::spawn(move|| {
2022 assert_eq!(rx1.recv().unwrap(), 1);
2023 tx2.send(2).unwrap();
2025 let t2 = thread::spawn(move|| {
2026 tx1.send(1).unwrap();
2027 assert_eq!(rx2.recv().unwrap(), 2);
2029 t1.join().ok().unwrap();
2030 t2.join().ok().unwrap();
2034 fn oneshot_single_thread_close_port_first() {
2035 // Simple test of closing without sending
2036 let (_tx, rx) = channel::<i32>();
2041 fn oneshot_single_thread_close_chan_first() {
2042 // Simple test of closing without sending
2043 let (tx, _rx) = channel::<i32>();
2048 fn oneshot_single_thread_send_port_close() {
2049 // Testing that the sender cleans up the payload if receiver is closed
2050 let (tx, rx) = channel::<Box<i32>>();
2052 assert!(tx.send(box 0).is_err());
2056 fn oneshot_single_thread_recv_chan_close() {
2057 // Receiving on a closed chan will panic
2058 let res = thread::spawn(move|| {
2059 let (tx, rx) = channel::<i32>();
2064 assert!(res.is_err());
2068 fn oneshot_single_thread_send_then_recv() {
2069 let (tx, rx) = channel::<Box<i32>>();
2070 tx.send(box 10).unwrap();
2071 assert!(*rx.recv().unwrap() == 10);
2075 fn oneshot_single_thread_try_send_open() {
2076 let (tx, rx) = channel::<i32>();
2077 assert!(tx.send(10).is_ok());
2078 assert!(rx.recv().unwrap() == 10);
2082 fn oneshot_single_thread_try_send_closed() {
2083 let (tx, rx) = channel::<i32>();
2085 assert!(tx.send(10).is_err());
2089 fn oneshot_single_thread_try_recv_open() {
2090 let (tx, rx) = channel::<i32>();
2091 tx.send(10).unwrap();
2092 assert!(rx.recv() == Ok(10));
2096 fn oneshot_single_thread_try_recv_closed() {
2097 let (tx, rx) = channel::<i32>();
2099 assert!(rx.recv().is_err());
2103 fn oneshot_single_thread_peek_data() {
2104 let (tx, rx) = channel::<i32>();
2105 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2106 tx.send(10).unwrap();
2107 assert_eq!(rx.try_recv(), Ok(10));
2111 fn oneshot_single_thread_peek_close() {
2112 let (tx, rx) = channel::<i32>();
2114 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2115 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2119 fn oneshot_single_thread_peek_open() {
2120 let (_tx, rx) = channel::<i32>();
2121 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2125 fn oneshot_multi_task_recv_then_send() {
2126 let (tx, rx) = channel::<Box<i32>>();
2127 let _t = thread::spawn(move|| {
2128 assert!(*rx.recv().unwrap() == 10);
2131 tx.send(box 10).unwrap();
2135 fn oneshot_multi_task_recv_then_close() {
2136 let (tx, rx) = channel::<Box<i32>>();
2137 let _t = thread::spawn(move|| {
2140 let res = thread::spawn(move|| {
2141 assert!(*rx.recv().unwrap() == 10);
2143 assert!(res.is_err());
2147 fn oneshot_multi_thread_close_stress() {
2148 for _ in 0..stress_factor() {
2149 let (tx, rx) = channel::<i32>();
2150 let _t = thread::spawn(move|| {
2158 fn oneshot_multi_thread_send_close_stress() {
2159 for _ in 0..stress_factor() {
2160 let (tx, rx) = channel::<i32>();
2161 let _t = thread::spawn(move|| {
2164 let _ = thread::spawn(move|| {
2165 tx.send(1).unwrap();
2171 fn oneshot_multi_thread_recv_close_stress() {
2172 for _ in 0..stress_factor() {
2173 let (tx, rx) = channel::<i32>();
2174 thread::spawn(move|| {
2175 let res = thread::spawn(move|| {
2178 assert!(res.is_err());
2180 let _t = thread::spawn(move|| {
2181 thread::spawn(move|| {
2189 fn oneshot_multi_thread_send_recv_stress() {
2190 for _ in 0..stress_factor() {
2191 let (tx, rx) = channel::<Box<isize>>();
2192 let _t = thread::spawn(move|| {
2193 tx.send(box 10).unwrap();
2195 assert!(*rx.recv().unwrap() == 10);
2200 fn stream_send_recv_stress() {
2201 for _ in 0..stress_factor() {
2202 let (tx, rx) = channel();
2207 fn send(tx: Sender<Box<i32>>, i: i32) {
2208 if i == 10 { return }
2210 thread::spawn(move|| {
2211 tx.send(box i).unwrap();
2216 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2217 if i == 10 { return }
2219 thread::spawn(move|| {
2220 assert!(*rx.recv().unwrap() == i);
2228 fn oneshot_single_thread_recv_timeout() {
2229 let (tx, rx) = channel();
2230 tx.send(()).unwrap();
2231 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2232 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2233 tx.send(()).unwrap();
2234 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2238 fn stress_recv_timeout_two_threads() {
2239 let (tx, rx) = channel();
2240 let stress = stress_factor() + 100;
2241 let timeout = Duration::from_millis(100);
2243 thread::spawn(move || {
2244 for i in 0..stress {
2246 thread::sleep(timeout * 2);
2248 tx.send(1usize).unwrap();
2252 let mut recv_count = 0;
2254 match rx.recv_timeout(timeout) {
2256 assert_eq!(n, 1usize);
2259 Err(RecvTimeoutError::Timeout) => continue,
2260 Err(RecvTimeoutError::Disconnected) => break,
2264 assert_eq!(recv_count, stress);
2268 fn recv_timeout_upgrade() {
2269 let (tx, rx) = channel::<()>();
2270 let timeout = Duration::from_millis(1);
2271 let _tx_clone = tx.clone();
2273 let start = Instant::now();
2274 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2275 assert!(Instant::now() >= start + timeout);
2279 fn stress_recv_timeout_shared() {
2280 let (tx, rx) = channel();
2281 let stress = stress_factor() + 100;
2283 for i in 0..stress {
2284 let tx = tx.clone();
2285 thread::spawn(move || {
2286 thread::sleep(Duration::from_millis(i as u64 * 10));
2287 tx.send(1usize).unwrap();
2293 let mut recv_count = 0;
2295 match rx.recv_timeout(Duration::from_millis(10)) {
2297 assert_eq!(n, 1usize);
2300 Err(RecvTimeoutError::Timeout) => continue,
2301 Err(RecvTimeoutError::Disconnected) => break,
2305 assert_eq!(recv_count, stress);
2309 fn very_long_recv_timeout_wont_panic() {
2310 let (tx, rx) = channel::<()>();
2311 let join_handle = thread::spawn(move || {
2312 rx.recv_timeout(Duration::from_secs(u64::max_value()))
2314 thread::sleep(Duration::from_secs(1));
2315 assert!(tx.send(()).is_ok());
2316 assert_eq!(join_handle.join().unwrap(), Ok(()));
2321 // Regression test that we don't run out of stack in scheduler context
2322 let (tx, rx) = channel();
2323 for _ in 0..10000 { tx.send(()).unwrap(); }
2324 for _ in 0..10000 { rx.recv().unwrap(); }
2328 fn shared_recv_timeout() {
2329 let (tx, rx) = channel();
2332 let tx = tx.clone();
2333 thread::spawn(move|| {
2334 tx.send(()).unwrap();
2338 for _ in 0..total { rx.recv().unwrap(); }
2340 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2341 tx.send(()).unwrap();
2342 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2346 fn shared_chan_stress() {
2347 let (tx, rx) = channel();
2348 let total = stress_factor() + 100;
2350 let tx = tx.clone();
2351 thread::spawn(move|| {
2352 tx.send(()).unwrap();
2362 fn test_nested_recv_iter() {
2363 let (tx, rx) = channel::<i32>();
2364 let (total_tx, total_rx) = channel::<i32>();
2366 let _t = thread::spawn(move|| {
2368 for x in rx.iter() {
2371 total_tx.send(acc).unwrap();
2374 tx.send(3).unwrap();
2375 tx.send(1).unwrap();
2376 tx.send(2).unwrap();
2378 assert_eq!(total_rx.recv().unwrap(), 6);
2382 fn test_recv_iter_break() {
2383 let (tx, rx) = channel::<i32>();
2384 let (count_tx, count_rx) = channel();
2386 let _t = thread::spawn(move|| {
2388 for x in rx.iter() {
2395 count_tx.send(count).unwrap();
2398 tx.send(2).unwrap();
2399 tx.send(2).unwrap();
2400 tx.send(2).unwrap();
2403 assert_eq!(count_rx.recv().unwrap(), 4);
2407 fn test_recv_try_iter() {
2408 let (request_tx, request_rx) = channel();
2409 let (response_tx, response_rx) = channel();
2411 // Request `x`s until we have `6`.
2412 let t = thread::spawn(move|| {
2415 for x in response_rx.try_iter() {
2421 request_tx.send(()).unwrap();
2425 for _ in request_rx.iter() {
2426 if response_tx.send(2).is_err() {
2431 assert_eq!(t.join().unwrap(), 6);
2435 fn test_recv_into_iter_owned() {
2437 let (tx, rx) = channel::<i32>();
2438 tx.send(1).unwrap();
2439 tx.send(2).unwrap();
2443 assert_eq!(iter.next().unwrap(), 1);
2444 assert_eq!(iter.next().unwrap(), 2);
2445 assert_eq!(iter.next().is_none(), true);
2449 fn test_recv_into_iter_borrowed() {
2450 let (tx, rx) = channel::<i32>();
2451 tx.send(1).unwrap();
2452 tx.send(2).unwrap();
2454 let mut iter = (&rx).into_iter();
2455 assert_eq!(iter.next().unwrap(), 1);
2456 assert_eq!(iter.next().unwrap(), 2);
2457 assert_eq!(iter.next().is_none(), true);
2461 fn try_recv_states() {
2462 let (tx1, rx1) = channel::<i32>();
2463 let (tx2, rx2) = channel::<()>();
2464 let (tx3, rx3) = channel::<()>();
2465 let _t = thread::spawn(move|| {
2466 rx2.recv().unwrap();
2467 tx1.send(1).unwrap();
2468 tx3.send(()).unwrap();
2469 rx2.recv().unwrap();
2471 tx3.send(()).unwrap();
2474 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2475 tx2.send(()).unwrap();
2476 rx3.recv().unwrap();
2477 assert_eq!(rx1.try_recv(), Ok(1));
2478 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2479 tx2.send(()).unwrap();
2480 rx3.recv().unwrap();
2481 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2484 // This bug used to end up in a livelock inside of the Receiver destructor
2485 // because the internal state of the Shared packet was corrupted
2487 fn destroy_upgraded_shared_port_when_sender_still_active() {
2488 let (tx, rx) = channel();
2489 let (tx2, rx2) = channel();
2490 let _t = thread::spawn(move|| {
2491 rx.recv().unwrap(); // wait on a oneshot
2492 drop(rx); // destroy a shared
2493 tx2.send(()).unwrap();
2495 // make sure the other thread has gone to sleep
2496 for _ in 0..5000 { thread::yield_now(); }
2498 // upgrade to a shared chan and send a message
2501 t.send(()).unwrap();
2503 // wait for the child thread to exit before we exit
2504 rx2.recv().unwrap();
2509 let (tx, _) = channel();
2510 let _ = tx.send(123);
2511 assert_eq!(tx.send(123), Err(SendError(123)));
2515 #[cfg(all(test, not(target_os = "emscripten")))]
2520 use crate::time::Duration;
2522 pub fn stress_factor() -> usize {
2523 match env::var("RUST_TEST_STRESS") {
2524 Ok(val) => val.parse().unwrap(),
2531 let (tx, rx) = sync_channel::<i32>(1);
2532 tx.send(1).unwrap();
2533 assert_eq!(rx.recv().unwrap(), 1);
2538 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2539 tx.send(box 1).unwrap();
2544 let (tx, rx) = sync_channel::<i32>(1);
2545 tx.send(1).unwrap();
2546 assert_eq!(rx.recv().unwrap(), 1);
2547 let tx = tx.clone();
2548 tx.send(1).unwrap();
2549 assert_eq!(rx.recv().unwrap(), 1);
2554 let (tx, rx) = sync_channel::<i32>(1);
2555 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2556 tx.send(1).unwrap();
2557 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2561 fn smoke_threads() {
2562 let (tx, rx) = sync_channel::<i32>(0);
2563 let _t = thread::spawn(move|| {
2564 tx.send(1).unwrap();
2566 assert_eq!(rx.recv().unwrap(), 1);
2570 fn smoke_port_gone() {
2571 let (tx, rx) = sync_channel::<i32>(0);
2573 assert!(tx.send(1).is_err());
2577 fn smoke_shared_port_gone2() {
2578 let (tx, rx) = sync_channel::<i32>(0);
2580 let tx2 = tx.clone();
2582 assert!(tx2.send(1).is_err());
2586 fn port_gone_concurrent() {
2587 let (tx, rx) = sync_channel::<i32>(0);
2588 let _t = thread::spawn(move|| {
2591 while tx.send(1).is_ok() {}
2595 fn port_gone_concurrent_shared() {
2596 let (tx, rx) = sync_channel::<i32>(0);
2597 let tx2 = tx.clone();
2598 let _t = thread::spawn(move|| {
2601 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2605 fn smoke_chan_gone() {
2606 let (tx, rx) = sync_channel::<i32>(0);
2608 assert!(rx.recv().is_err());
2612 fn smoke_chan_gone_shared() {
2613 let (tx, rx) = sync_channel::<()>(0);
2614 let tx2 = tx.clone();
2617 assert!(rx.recv().is_err());
2621 fn chan_gone_concurrent() {
2622 let (tx, rx) = sync_channel::<i32>(0);
2623 thread::spawn(move|| {
2624 tx.send(1).unwrap();
2625 tx.send(1).unwrap();
2627 while rx.recv().is_ok() {}
2632 let (tx, rx) = sync_channel::<i32>(0);
2633 thread::spawn(move|| {
2634 for _ in 0..10000 { tx.send(1).unwrap(); }
2637 assert_eq!(rx.recv().unwrap(), 1);
2642 fn stress_recv_timeout_two_threads() {
2643 let (tx, rx) = sync_channel::<i32>(0);
2645 thread::spawn(move|| {
2646 for _ in 0..10000 { tx.send(1).unwrap(); }
2649 let mut recv_count = 0;
2651 match rx.recv_timeout(Duration::from_millis(1)) {
2656 Err(RecvTimeoutError::Timeout) => continue,
2657 Err(RecvTimeoutError::Disconnected) => break,
2661 assert_eq!(recv_count, 10000);
2665 fn stress_recv_timeout_shared() {
2666 const AMT: u32 = 1000;
2667 const NTHREADS: u32 = 8;
2668 let (tx, rx) = sync_channel::<i32>(0);
2669 let (dtx, drx) = sync_channel::<()>(0);
2671 thread::spawn(move|| {
2672 let mut recv_count = 0;
2674 match rx.recv_timeout(Duration::from_millis(10)) {
2679 Err(RecvTimeoutError::Timeout) => continue,
2680 Err(RecvTimeoutError::Disconnected) => break,
2684 assert_eq!(recv_count, AMT * NTHREADS);
2685 assert!(rx.try_recv().is_err());
2687 dtx.send(()).unwrap();
2690 for _ in 0..NTHREADS {
2691 let tx = tx.clone();
2692 thread::spawn(move|| {
2693 for _ in 0..AMT { tx.send(1).unwrap(); }
2699 drx.recv().unwrap();
2703 fn stress_shared() {
2704 const AMT: u32 = 1000;
2705 const NTHREADS: u32 = 8;
2706 let (tx, rx) = sync_channel::<i32>(0);
2707 let (dtx, drx) = sync_channel::<()>(0);
2709 thread::spawn(move|| {
2710 for _ in 0..AMT * NTHREADS {
2711 assert_eq!(rx.recv().unwrap(), 1);
2713 match rx.try_recv() {
2717 dtx.send(()).unwrap();
2720 for _ in 0..NTHREADS {
2721 let tx = tx.clone();
2722 thread::spawn(move|| {
2723 for _ in 0..AMT { tx.send(1).unwrap(); }
2727 drx.recv().unwrap();
2731 fn oneshot_single_thread_close_port_first() {
2732 // Simple test of closing without sending
2733 let (_tx, rx) = sync_channel::<i32>(0);
2738 fn oneshot_single_thread_close_chan_first() {
2739 // Simple test of closing without sending
2740 let (tx, _rx) = sync_channel::<i32>(0);
2745 fn oneshot_single_thread_send_port_close() {
2746 // Testing that the sender cleans up the payload if receiver is closed
2747 let (tx, rx) = sync_channel::<Box<i32>>(0);
2749 assert!(tx.send(box 0).is_err());
2753 fn oneshot_single_thread_recv_chan_close() {
2754 // Receiving on a closed chan will panic
2755 let res = thread::spawn(move|| {
2756 let (tx, rx) = sync_channel::<i32>(0);
2761 assert!(res.is_err());
2765 fn oneshot_single_thread_send_then_recv() {
2766 let (tx, rx) = sync_channel::<Box<i32>>(1);
2767 tx.send(box 10).unwrap();
2768 assert!(*rx.recv().unwrap() == 10);
2772 fn oneshot_single_thread_try_send_open() {
2773 let (tx, rx) = sync_channel::<i32>(1);
2774 assert_eq!(tx.try_send(10), Ok(()));
2775 assert!(rx.recv().unwrap() == 10);
2779 fn oneshot_single_thread_try_send_closed() {
2780 let (tx, rx) = sync_channel::<i32>(0);
2782 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2786 fn oneshot_single_thread_try_send_closed2() {
2787 let (tx, _rx) = sync_channel::<i32>(0);
2788 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2792 fn oneshot_single_thread_try_recv_open() {
2793 let (tx, rx) = sync_channel::<i32>(1);
2794 tx.send(10).unwrap();
2795 assert!(rx.recv() == Ok(10));
2799 fn oneshot_single_thread_try_recv_closed() {
2800 let (tx, rx) = sync_channel::<i32>(0);
2802 assert!(rx.recv().is_err());
2806 fn oneshot_single_thread_try_recv_closed_with_data() {
2807 let (tx, rx) = sync_channel::<i32>(1);
2808 tx.send(10).unwrap();
2810 assert_eq!(rx.try_recv(), Ok(10));
2811 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2815 fn oneshot_single_thread_peek_data() {
2816 let (tx, rx) = sync_channel::<i32>(1);
2817 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2818 tx.send(10).unwrap();
2819 assert_eq!(rx.try_recv(), Ok(10));
2823 fn oneshot_single_thread_peek_close() {
2824 let (tx, rx) = sync_channel::<i32>(0);
2826 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2827 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2831 fn oneshot_single_thread_peek_open() {
2832 let (_tx, rx) = sync_channel::<i32>(0);
2833 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2837 fn oneshot_multi_task_recv_then_send() {
2838 let (tx, rx) = sync_channel::<Box<i32>>(0);
2839 let _t = thread::spawn(move|| {
2840 assert!(*rx.recv().unwrap() == 10);
2843 tx.send(box 10).unwrap();
2847 fn oneshot_multi_task_recv_then_close() {
2848 let (tx, rx) = sync_channel::<Box<i32>>(0);
2849 let _t = thread::spawn(move|| {
2852 let res = thread::spawn(move|| {
2853 assert!(*rx.recv().unwrap() == 10);
2855 assert!(res.is_err());
2859 fn oneshot_multi_thread_close_stress() {
2860 for _ in 0..stress_factor() {
2861 let (tx, rx) = sync_channel::<i32>(0);
2862 let _t = thread::spawn(move|| {
2870 fn oneshot_multi_thread_send_close_stress() {
2871 for _ in 0..stress_factor() {
2872 let (tx, rx) = sync_channel::<i32>(0);
2873 let _t = thread::spawn(move|| {
2876 let _ = thread::spawn(move || {
2877 tx.send(1).unwrap();
2883 fn oneshot_multi_thread_recv_close_stress() {
2884 for _ in 0..stress_factor() {
2885 let (tx, rx) = sync_channel::<i32>(0);
2886 let _t = thread::spawn(move|| {
2887 let res = thread::spawn(move|| {
2890 assert!(res.is_err());
2892 let _t = thread::spawn(move|| {
2893 thread::spawn(move|| {
2901 fn oneshot_multi_thread_send_recv_stress() {
2902 for _ in 0..stress_factor() {
2903 let (tx, rx) = sync_channel::<Box<i32>>(0);
2904 let _t = thread::spawn(move|| {
2905 tx.send(box 10).unwrap();
2907 assert!(*rx.recv().unwrap() == 10);
2912 fn stream_send_recv_stress() {
2913 for _ in 0..stress_factor() {
2914 let (tx, rx) = sync_channel::<Box<i32>>(0);
2919 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2920 if i == 10 { return }
2922 thread::spawn(move|| {
2923 tx.send(box i).unwrap();
2928 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2929 if i == 10 { return }
2931 thread::spawn(move|| {
2932 assert!(*rx.recv().unwrap() == i);
2941 // Regression test that we don't run out of stack in scheduler context
2942 let (tx, rx) = sync_channel(10000);
2943 for _ in 0..10000 { tx.send(()).unwrap(); }
2944 for _ in 0..10000 { rx.recv().unwrap(); }
2948 fn shared_chan_stress() {
2949 let (tx, rx) = sync_channel(0);
2950 let total = stress_factor() + 100;
2952 let tx = tx.clone();
2953 thread::spawn(move|| {
2954 tx.send(()).unwrap();
2964 fn test_nested_recv_iter() {
2965 let (tx, rx) = sync_channel::<i32>(0);
2966 let (total_tx, total_rx) = sync_channel::<i32>(0);
2968 let _t = thread::spawn(move|| {
2970 for x in rx.iter() {
2973 total_tx.send(acc).unwrap();
2976 tx.send(3).unwrap();
2977 tx.send(1).unwrap();
2978 tx.send(2).unwrap();
2980 assert_eq!(total_rx.recv().unwrap(), 6);
2984 fn test_recv_iter_break() {
2985 let (tx, rx) = sync_channel::<i32>(0);
2986 let (count_tx, count_rx) = sync_channel(0);
2988 let _t = thread::spawn(move|| {
2990 for x in rx.iter() {
2997 count_tx.send(count).unwrap();
3000 tx.send(2).unwrap();
3001 tx.send(2).unwrap();
3002 tx.send(2).unwrap();
3003 let _ = tx.try_send(2);
3005 assert_eq!(count_rx.recv().unwrap(), 4);
3009 fn try_recv_states() {
3010 let (tx1, rx1) = sync_channel::<i32>(1);
3011 let (tx2, rx2) = sync_channel::<()>(1);
3012 let (tx3, rx3) = sync_channel::<()>(1);
3013 let _t = thread::spawn(move|| {
3014 rx2.recv().unwrap();
3015 tx1.send(1).unwrap();
3016 tx3.send(()).unwrap();
3017 rx2.recv().unwrap();
3019 tx3.send(()).unwrap();
3022 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3023 tx2.send(()).unwrap();
3024 rx3.recv().unwrap();
3025 assert_eq!(rx1.try_recv(), Ok(1));
3026 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
3027 tx2.send(()).unwrap();
3028 rx3.recv().unwrap();
3029 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
3032 // This bug used to end up in a livelock inside of the Receiver destructor
3033 // because the internal state of the Shared packet was corrupted
3035 fn destroy_upgraded_shared_port_when_sender_still_active() {
3036 let (tx, rx) = sync_channel::<()>(0);
3037 let (tx2, rx2) = sync_channel::<()>(0);
3038 let _t = thread::spawn(move|| {
3039 rx.recv().unwrap(); // wait on a oneshot
3040 drop(rx); // destroy a shared
3041 tx2.send(()).unwrap();
3043 // make sure the other thread has gone to sleep
3044 for _ in 0..5000 { thread::yield_now(); }
3046 // upgrade to a shared chan and send a message
3049 t.send(()).unwrap();
3051 // wait for the child thread to exit before we exit
3052 rx2.recv().unwrap();
3057 let (tx, rx) = sync_channel::<i32>(0);
3058 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3059 assert_eq!(tx.send(1), Ok(()));
3064 let (tx, rx) = sync_channel::<i32>(0);
3065 let _t = thread::spawn(move|| { drop(rx); });
3066 assert!(tx.send(1).is_err());
3071 let (tx, rx) = sync_channel::<i32>(1);
3072 assert_eq!(tx.send(1), Ok(()));
3073 let _t =thread::spawn(move|| { drop(rx); });
3074 assert!(tx.send(1).is_err());
3079 let (tx, rx) = sync_channel::<i32>(0);
3080 let tx2 = tx.clone();
3081 let (done, donerx) = channel();
3082 let done2 = done.clone();
3083 let _t = thread::spawn(move|| {
3084 assert!(tx.send(1).is_err());
3085 done.send(()).unwrap();
3087 let _t = thread::spawn(move|| {
3088 assert!(tx2.send(2).is_err());
3089 done2.send(()).unwrap();
3092 donerx.recv().unwrap();
3093 donerx.recv().unwrap();
3098 let (tx, _rx) = sync_channel::<i32>(0);
3099 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3104 let (tx, _rx) = sync_channel::<i32>(1);
3105 assert_eq!(tx.try_send(1), Ok(()));
3106 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3111 let (tx, rx) = sync_channel::<i32>(1);
3112 assert_eq!(tx.try_send(1), Ok(()));
3114 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3120 let (tx1, rx1) = sync_channel::<()>(3);
3121 let (tx2, rx2) = sync_channel::<()>(3);
3123 let _t = thread::spawn(move|| {
3124 rx1.recv().unwrap();
3125 tx2.try_send(()).unwrap();
3128 tx1.try_send(()).unwrap();
3129 rx2.recv().unwrap();