1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //! messages is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //! channel where each sender atomically hands off a message to a receiver.
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
67 //! use std::sync::mpsc::channel;
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //! tx.send(10).unwrap();
74 //! assert_eq!(rx.recv().unwrap(), 10);
81 //! use std::sync::mpsc::channel;
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
88 //! let tx = tx.clone();
89 //! thread::spawn(move|| {
90 //! tx.send(i).unwrap();
95 //! let j = rx.recv().unwrap();
96 //! assert!(0 <= j && j < 10);
100 //! Propagating panics:
103 //! use std::sync::mpsc::channel;
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
109 //! assert!(rx.recv().is_err());
112 //! Synchronous channels:
116 //! use std::sync::mpsc::sync_channel;
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //! // This will wait for the parent thread to start receiving
121 //! tx.send(53).unwrap();
123 //! rx.recv().unwrap();
126 #![stable(feature = "rust1", since = "1.0.0")]
128 // A description of how Rust's channel implementation works
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
140 // ## Flavors of channels
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
147 // * Oneshots - these channels are highly optimized for the one-send use
148 // case. They contain as few atomics as possible and
149 // involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 // use a different concurrent queue that is more tailored for this
152 // use case. The initial allocation of this flavor of channel is not
154 // * Shared - this is the most general form of channel that this module offers,
155 // a channel with multiple senders. This type is as optimized as it
156 // can be, but the previous two types mentioned are much faster for
159 // ## Concurrent queues
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
171 // ### SPSC optimizations
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
190 // ### MPSC optimizations
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
200 // ## Overview of the Implementation
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
208 // queue.push(t) return if queue.pop()
209 // if increment() == -1 deschedule {
210 // wakeup() if decrement() > 0
211 // cancel_deschedule()
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
218 // ### The internal atomic counter
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
243 // ## Native Implementation
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as it's woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
302 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
303 /// This half can only be owned by one thread.
305 /// Messages sent to the channel can be retrieved using [`recv`].
307 /// [`channel`]: fn.channel.html
308 /// [`sync_channel`]: fn.sync_channel.html
309 /// [`recv`]: struct.Receiver.html#method.recv
314 /// use std::sync::mpsc::channel;
316 /// use std::time::Duration;
318 /// let (send, recv) = channel();
320 /// thread::spawn(move || {
321 /// send.send("Hello world!").unwrap();
322 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
323 /// send.send("Delayed for 2 seconds").unwrap();
326 /// println!("{}", recv.recv().unwrap()); // Received immediately
327 /// println!("Waiting...");
328 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
330 #[stable(feature = "rust1", since = "1.0.0")]
331 pub struct Receiver<T> {
// The channel flavor backing this receiver. It is kept in an `UnsafeCell` so
// that it can be reached through `&self` via the `UnsafeFlavor` accessors.
332 inner: UnsafeCell<Flavor<T>>,
335 // The receiver port can be sent from place to place, so long as it
336 // is not used to receive non-sendable things.
337 #[stable(feature = "rust1", since = "1.0.0")]
338 unsafe impl<T: Send> Send for Receiver<T> { }
// `Sync` is explicitly opted out of: the flavor sits in an `UnsafeCell` that is
// accessed through `&self`, so a `Receiver` must not be shared by reference
// between threads.
340 #[stable(feature = "rust1", since = "1.0.0")]
341 impl<T> !Sync for Receiver<T> { }
343 /// An iterator over messages on a [`Receiver`], created by [`iter`].
345 /// This iterator will block whenever [`next`] is called,
346 /// waiting for a new message, and [`None`] will be returned
347 /// when the corresponding channel has hung up.
349 /// [`iter`]: struct.Receiver.html#method.iter
350 /// [`Receiver`]: struct.Receiver.html
351 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
352 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
357 /// use std::sync::mpsc::channel;
360 /// let (send, recv) = channel();
362 /// thread::spawn(move || {
363 /// send.send(1u8).unwrap();
364 /// send.send(2u8).unwrap();
365 /// send.send(3u8).unwrap();
368 /// for x in recv.iter() {
369 /// println!("Got: {}", x);
372 #[stable(feature = "rust1", since = "1.0.0")]
374 pub struct Iter<'a, T: 'a> {
378 /// An iterator that attempts to yield all pending values for a [`Receiver`],
379 /// created by [`try_iter`].
381 /// [`None`] will be returned when there are no pending values remaining or
382 /// if the corresponding channel has hung up.
384 /// This iterator will never block the caller in order to wait for data to
385 /// become available. Instead, it will return [`None`].
387 /// [`Receiver`]: struct.Receiver.html
388 /// [`try_iter`]: struct.Receiver.html#method.try_iter
389 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
394 /// use std::sync::mpsc::channel;
396 /// use std::time::Duration;
398 /// let (sender, receiver) = channel();
400 /// // Nothing is in the buffer yet
401 /// assert!(receiver.try_iter().next().is_none());
402 /// println!("Nothing in the buffer...");
404 /// thread::spawn(move || {
405 /// sender.send(1).unwrap();
406 /// sender.send(2).unwrap();
407 /// sender.send(3).unwrap();
410 /// println!("Going to sleep...");
411 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
413 /// for x in receiver.try_iter() {
414 /// println!("Got: {}", x);
417 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
419 pub struct TryIter<'a, T: 'a> {
423 /// An owning iterator over messages on a [`Receiver`],
424 /// created by **Receiver::into_iter**.
426 /// This iterator will block whenever [`next`]
427 /// is called, waiting for a new message, and [`None`] will be
428 /// returned if the corresponding channel has hung up.
430 /// [`Receiver`]: struct.Receiver.html
431 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
432 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
437 /// use std::sync::mpsc::channel;
440 /// let (send, recv) = channel();
442 /// thread::spawn(move || {
443 /// send.send(1u8).unwrap();
444 /// send.send(2u8).unwrap();
445 /// send.send(3u8).unwrap();
448 /// for x in recv.into_iter() {
449 /// println!("Got: {}", x);
452 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
454 pub struct IntoIter<T> {
458 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
459 /// owned by one thread, but it can be cloned to send to other threads.
461 /// Messages can be sent through this channel with [`send`].
463 /// [`channel`]: fn.channel.html
464 /// [`send`]: struct.Sender.html#method.send
469 /// use std::sync::mpsc::channel;
472 /// let (sender, receiver) = channel();
473 /// let sender2 = sender.clone();
475 /// // First thread owns sender
476 /// thread::spawn(move || {
477 /// sender.send(1).unwrap();
480 /// // Second thread owns sender2
481 /// thread::spawn(move || {
482 /// sender2.send(2).unwrap();
485 /// let msg = receiver.recv().unwrap();
486 /// let msg2 = receiver.recv().unwrap();
488 /// assert_eq!(3, msg + msg2);
490 #[stable(feature = "rust1", since = "1.0.0")]
491 pub struct Sender<T> {
// The channel flavor this sender currently targets. It is kept in an
// `UnsafeCell` because `send` and `clone` swap in a different flavor while
// only holding `&self`.
492 inner: UnsafeCell<Flavor<T>>,
495 // The send port can be sent from place to place, so long as it
496 // is not used to send non-sendable things.
497 #[stable(feature = "rust1", since = "1.0.0")]
498 unsafe impl<T: Send> Send for Sender<T> { }
// `Sync` is explicitly opted out of: `send` and `clone` mutate the flavor
// through `&self` without synchronization, so a `Sender` must not be shared
// by reference between threads (clone it instead).
500 #[stable(feature = "rust1", since = "1.0.0")]
501 impl<T> !Sync for Sender<T> { }
503 /// The sending-half of Rust's synchronous [`sync_channel`] type.
505 /// Messages can be sent through this channel with [`send`] or [`try_send`].
507 /// [`send`] will block if there is no space in the internal buffer.
509 /// [`sync_channel`]: fn.sync_channel.html
510 /// [`send`]: struct.SyncSender.html#method.send
511 /// [`try_send`]: struct.SyncSender.html#method.try_send
516 /// use std::sync::mpsc::sync_channel;
519 /// // Create a sync_channel with buffer size 2
520 /// let (sync_sender, receiver) = sync_channel(2);
521 /// let sync_sender2 = sync_sender.clone();
523 /// // First thread owns sync_sender
524 /// thread::spawn(move || {
525 /// sync_sender.send(1).unwrap();
526 /// sync_sender.send(2).unwrap();
529 /// // Second thread owns sync_sender2
530 /// thread::spawn(move || {
531 /// sync_sender2.send(3).unwrap();
532 /// // thread will now block since the buffer is full
533 /// println!("Thread unblocked!");
538 /// msg = receiver.recv().unwrap();
539 /// println!("message {} received", msg);
541 /// // "Thread unblocked!" will be printed now
543 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
546 /// msg = receiver.recv().unwrap();
548 /// println!("message {} received", msg);
550 #[stable(feature = "rust1", since = "1.0.0")]
551 pub struct SyncSender<T> {
// Shared handle to the synchronous packet. Unlike `Sender`, this holds the
// `sync` packet directly rather than a `Flavor` inside an `UnsafeCell`.
552 inner: Arc<sync::Packet<T>>,
// A `SyncSender` may be moved to another thread as long as the message type
// itself is `Send`.
555 #[stable(feature = "rust1", since = "1.0.0")]
556 unsafe impl<T: Send> Send for SyncSender<T> {}
558 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
559 /// function on **channel**s.
561 /// A **send** operation can only fail if the receiving end of a channel is
562 /// disconnected, implying that the data could never be received. The error
563 /// contains the data being sent as a payload so it can be recovered.
565 /// [`Sender::send`]: struct.Sender.html#method.send
566 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
567 #[stable(feature = "rust1", since = "1.0.0")]
// NOTE(review): `Debug` is not derived here although `RecvError` derives it;
// presumably a hand-written impl exists elsewhere in the file — confirm.
568 #[derive(PartialEq, Eq, Clone, Copy)]
// Tuple struct whose single public field carries the unsent value.
569 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
571 /// An error returned from the [`recv`] function on a [`Receiver`].
573 /// The [`recv`] operation can only fail if the sending half of a
574 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
575 /// messages will ever be received.
577 /// [`recv`]: struct.Receiver.html#method.recv
578 /// [`Receiver`]: struct.Receiver.html
579 /// [`channel`]: fn.channel.html
580 /// [`sync_channel`]: fn.sync_channel.html
581 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
582 #[stable(feature = "rust1", since = "1.0.0")]
583 pub struct RecvError;
585 /// This enumeration is the list of the possible reasons that [`try_recv`] could
586 /// not return data when called. This can occur with both a [`channel`] and
587 /// a [`sync_channel`].
589 /// [`try_recv`]: struct.Receiver.html#method.try_recv
590 /// [`channel`]: fn.channel.html
591 /// [`sync_channel`]: fn.sync_channel.html
592 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
593 #[stable(feature = "rust1", since = "1.0.0")]
594 pub enum TryRecvError {
595 /// This **channel** is currently empty, but the **Sender**(s) have not yet
596 /// disconnected, so data may yet become available.
597 #[stable(feature = "rust1", since = "1.0.0")]
600 /// The **channel**'s sending half has become disconnected, and there will
601 /// never be any more data received on it.
602 #[stable(feature = "rust1", since = "1.0.0")]
606 /// This enumeration is the list of possible errors that made [`recv_timeout`]
607 /// unable to return data when called. This can occur with both a [`channel`] and
608 /// a [`sync_channel`].
610 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
611 /// [`channel`]: fn.channel.html
612 /// [`sync_channel`]: fn.sync_channel.html
613 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
614 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
615 pub enum RecvTimeoutError {
616 /// This **channel** is currently empty, but the **Sender**(s) have not yet
617 /// disconnected, so data may yet become available.
618 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
620 /// The **channel**'s sending half has become disconnected, and there will
621 /// never be any more data received on it.
622 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
626 /// This enumeration is the list of the possible error outcomes for the
627 /// [`try_send`] method.
629 /// [`try_send`]: struct.SyncSender.html#method.try_send
630 #[stable(feature = "rust1", since = "1.0.0")]
631 #[derive(PartialEq, Eq, Clone, Copy)]
632 pub enum TrySendError<T> {
633 /// The data could not be sent on the [`sync_channel`] because it would require that
634 /// the callee block to send the data.
636 /// If this is a buffered channel, then the buffer is full at this time. If
637 /// this is not a buffered channel, then there is no [`Receiver`] available to
638 /// acquire the data.
640 /// [`sync_channel`]: fn.sync_channel.html
641 /// [`Receiver`]: struct.Receiver.html
642 #[stable(feature = "rust1", since = "1.0.0")]
643 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
645 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
646 /// sent. The data is returned back to the caller in this case.
648 /// [`sync_channel`]: fn.sync_channel.html
649 #[stable(feature = "rust1", since = "1.0.0")]
650 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
// Flavor every asynchronous channel is created with (see `channel`).
654 Oneshot(Arc<oneshot::Packet<T>>),
// Flavor that `Sender::send` can swap in to replace a oneshot.
655 Stream(Arc<stream::Packet<T>>),
// Flavor that `Sender::clone` switches to so several senders can coexist.
656 Shared(Arc<shared::Packet<T>>),
// Flavor used by `sync_channel` receivers; `Sender` treats it as unreachable.
657 Sync(Arc<sync::Packet<T>>),
// Internal accessor trait giving `Sender` and `Receiver` uniform access to the
// `Flavor` stored in their `UnsafeCell`.
661 trait UnsafeFlavor<T> {
// Returns the cell holding the flavor; the only method an implementor supplies.
662 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
// Produces a mutable reference from `&self` by dereferencing the cell's raw
// pointer. Unsafe: nothing here prevents aliasing, so the caller must ensure
// no other reference to the flavor is live.
663 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
664 &mut *self.inner_unsafe().get()
// Shared-reference counterpart of `inner_mut`; unsafe for the same reason,
// since it must not overlap with a live `inner_mut` borrow.
666 unsafe fn inner(&self) -> &Flavor<T> {
667 &*self.inner_unsafe().get()
670 impl<T> UnsafeFlavor<T> for Sender<T> {
671 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
675 impl<T> UnsafeFlavor<T> for Receiver<T> {
676 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
681 /// Creates a new asynchronous channel, returning the sender/receiver halves.
682 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
683 /// the same order as it was sent, and no [`send`] will block the calling thread
684 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
685 /// block after its buffer limit is reached). [`recv`] will block until a message
688 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
689 /// only one [`Receiver`] is supported.
691 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
692 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
693 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
694 /// return a [`RecvError`].
696 /// [`send`]: struct.Sender.html#method.send
697 /// [`recv`]: struct.Receiver.html#method.recv
698 /// [`Sender`]: struct.Sender.html
699 /// [`Receiver`]: struct.Receiver.html
700 /// [`sync_channel`]: fn.sync_channel.html
701 /// [`SendError`]: struct.SendError.html
702 /// [`RecvError`]: struct.RecvError.html
707 /// use std::sync::mpsc::channel;
710 /// let (sender, receiver) = channel();
712 /// // Spawn off an expensive computation
713 /// thread::spawn(move|| {
714 /// # fn expensive_computation() {}
715 /// sender.send(expensive_computation()).unwrap();
718 /// // Do some useful work for a while
720 /// // Let's see what that answer was
721 /// println!("{:?}", receiver.recv().unwrap());
723 #[stable(feature = "rust1", since = "1.0.0")]
724 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
// Every asynchronous channel starts out as the oneshot flavor; `Sender::send`
// and `Sender::clone` may later swap in a stream or shared packet.
725 let a = Arc::new(oneshot::Packet::new());
// Both halves hold a reference to the same packet.
726 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
729 /// Creates a new synchronous, bounded channel.
730 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
731 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
732 /// [`Receiver`] will block until a message becomes available. `sync_channel`
733 /// differs greatly in the semantics of the sender, however.
735 /// This channel has an internal buffer on which messages will be queued.
736 /// `bound` specifies the buffer size. When the internal buffer becomes full,
737 /// future sends will *block* waiting for the buffer to open up. Note that a
738 /// buffer size of 0 is valid, in which case this becomes a "rendezvous channel"
739 /// where each [`send`] will not return until a [`recv`] is paired with it.
741 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
742 /// times, but only one [`Receiver`] is supported.
744 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
745 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
746 /// [`SendError`]. Similarly, if the [`SyncSender`] is disconnected while trying
747 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
749 /// [`channel`]: fn.channel.html
750 /// [`send`]: struct.SyncSender.html#method.send
751 /// [`recv`]: struct.Receiver.html#method.recv
752 /// [`SyncSender`]: struct.SyncSender.html
753 /// [`Receiver`]: struct.Receiver.html
754 /// [`SendError`]: struct.SendError.html
755 /// [`RecvError`]: struct.RecvError.html
760 /// use std::sync::mpsc::sync_channel;
763 /// let (sender, receiver) = sync_channel(1);
765 /// // this returns immediately
766 /// sender.send(1).unwrap();
768 /// thread::spawn(move|| {
769 /// // this will block until the previous message has been received
770 /// sender.send(2).unwrap();
773 /// assert_eq!(receiver.recv().unwrap(), 1);
774 /// assert_eq!(receiver.recv().unwrap(), 2);
776 #[stable(feature = "rust1", since = "1.0.0")]
777 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
// One sync packet, created with the requested bound, is shared by both halves.
778 let a = Arc::new(sync::Packet::new(bound));
// The sender holds the packet directly; the receiver wraps it in `Flavor::Sync`.
779 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
782 ////////////////////////////////////////////////////////////////////////////////
784 ////////////////////////////////////////////////////////////////////////////////
787 fn new(inner: Flavor<T>) -> Sender<T> {
789 inner: UnsafeCell::new(inner),
793 /// Attempts to send a value on this channel, returning it back if it could
796 /// A successful send occurs when it is determined that the other end of
797 /// the channel has not hung up already. An unsuccessful send would be one
798 /// where the corresponding receiver has already been deallocated. Note
799 /// that a return value of [`Err`] means that the data will never be
800 /// received, but a return value of [`Ok`] does *not* mean that the data
801 /// will be received. It is possible for the corresponding receiver to
802 /// hang up immediately after this function returns [`Ok`].
804 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
805 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
807 /// This method will never block the current thread.
812 /// use std::sync::mpsc::channel;
814 /// let (tx, rx) = channel();
816 /// // This send is always successful
817 /// tx.send(1).unwrap();
819 /// // This send will fail because the receiver is gone
821 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
823 #[stable(feature = "rust1", since = "1.0.0")]
824 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
// Dispatch on the current flavor. Only the oneshot arm can fall through with
// a `(new_inner, ret)` pair; the stream and shared arms return directly.
825 let (new_inner, ret) = match *unsafe { self.inner() } {
826 Flavor::Oneshot(ref p) => {
// Direct send on the oneshot packet; on failure the value is wrapped in
// `SendError`.
828 return p.send(t).map_err(SendError);
// Build the stream packet that will replace the oneshot, and hand a receiver
// for it to the oneshot packet via `upgrade`.
830 let a = Arc::new(stream::Packet::new());
831 let rx = Receiver::new(Flavor::Stream(a.clone()));
832 match p.upgrade(rx) {
833 oneshot::UpSuccess => {
// Upgrade reports disconnection: give the value back as the error payload.
837 oneshot::UpDisconnected => (a, Err(t)),
838 oneshot::UpWoke(token) => {
839 // This send cannot panic because the thread is
840 // asleep (we're looking at it), so the receiver
842 a.send(t).ok().unwrap();
849 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
850 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
// A `Sender` is not expected to hold the sync flavor; `sync_channel` hands
// out a `SyncSender` instead.
851 Flavor::Sync(..) => unreachable!(),
// Install the stream flavor in this sender by swapping with a temporary
// `Sender`; the temporary ends up holding the previous (oneshot) flavor.
855 let tmp = Sender::new(Flavor::Stream(new_inner));
856 mem::swap(self.inner_mut(), tmp.inner_mut());
// Wrap a returned value, if any, in `SendError` for the caller.
858 ret.map_err(SendError)
// Cloning a sender moves the channel to the shared flavor unless it already
// is shared.
862 #[stable(feature = "rust1", since = "1.0.0")]
863 impl<T> Clone for Sender<T> {
864 fn clone(&self) -> Sender<T> {
865 let packet = match *unsafe { self.inner() } {
866 Flavor::Oneshot(ref p) => {
867 let a = Arc::new(shared::Packet::new());
// The post-init lock is taken before the upgrade; its guard is handed to
// `inherit_blocker` together with any token the upgrade returned.
869 let guard = a.postinit_lock();
// Give the old packet a receiver for the new shared packet.
870 let rx = Receiver::new(Flavor::Shared(a.clone()));
871 let sleeper = match p.upgrade(rx) {
873 oneshot::UpDisconnected => None,
874 oneshot::UpWoke(task) => Some(task),
876 a.inherit_blocker(sleeper, guard);
// Same upgrade sequence as the oneshot arm, starting from a stream packet.
880 Flavor::Stream(ref p) => {
881 let a = Arc::new(shared::Packet::new());
883 let guard = a.postinit_lock();
884 let rx = Receiver::new(Flavor::Shared(a.clone()));
885 let sleeper = match p.upgrade(rx) {
887 stream::UpDisconnected => None,
888 stream::UpWoke(task) => Some(task),
890 a.inherit_blocker(sleeper, guard);
894 Flavor::Shared(ref p) => {
// Already shared: hand out another sender on the same packet; no swap needed.
896 return Sender::new(Flavor::Shared(p.clone()));
898 Flavor::Sync(..) => unreachable!(),
// Replace this sender's own flavor with the shared packet by swapping with a
// temporary `Sender`, which then holds the previous flavor.
902 let tmp = Sender::new(Flavor::Shared(packet.clone()));
903 mem::swap(self.inner_mut(), tmp.inner_mut());
// The returned clone targets the same shared packet.
905 Sender::new(Flavor::Shared(packet))
// Dropping a sender forwards to `drop_chan` on whichever packet it currently
// holds.
// NOTE(review): presumably `drop_chan` is what lets the receiver observe the
// hang-up — confirm in the packet modules.
909 #[stable(feature = "rust1", since = "1.0.0")]
910 impl<T> Drop for Sender<T> {
912 match *unsafe { self.inner() } {
913 Flavor::Oneshot(ref p) => p.drop_chan(),
914 Flavor::Stream(ref p) => p.drop_chan(),
915 Flavor::Shared(ref p) => p.drop_chan(),
916 Flavor::Sync(..) => unreachable!(),
// Debug output is just the type name; no channel state or payload is printed,
// so no `T: Debug` bound is needed.
921 #[stable(feature = "mpsc_debug", since = "1.8.0")]
922 impl<T> fmt::Debug for Sender<T> {
923 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
924 f.debug_struct("Sender").finish()
928 ////////////////////////////////////////////////////////////////////////////////
930 ////////////////////////////////////////////////////////////////////////////////
932 impl<T> SyncSender<T> {
// Private constructor: wraps an existing shared sync packet without touching it.
933 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
934 SyncSender { inner: inner }
937 /// Sends a value on this synchronous channel.
939 /// This function will *block* until space in the internal buffer becomes
940 /// available or a receiver is available to hand off the message to.
942 /// Note that a successful send does *not* guarantee that the receiver will
943 /// ever see the data if there is a buffer on this channel. Items may be
944 /// enqueued in the internal buffer for the receiver to receive at a later
945 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
946 /// channel and it guarantees that the receiver has indeed received
947 /// the data if this function returns success.
949 /// This function will never panic, but it may return [`Err`] if the
950 /// [`Receiver`] has disconnected and is no longer able to receive
953 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
954 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
959 /// use std::sync::mpsc::sync_channel;
962 /// // Create a rendezvous sync_channel with buffer size 0
963 /// let (sync_sender, receiver) = sync_channel(0);
965 /// thread::spawn(move || {
966 /// println!("sending message...");
967 /// sync_sender.send(1).unwrap();
968 /// // Thread is now blocked until the message is received
970 /// println!("...message received!");
973 /// let msg = receiver.recv().unwrap();
974 /// assert_eq!(1, msg);
976 #[stable(feature = "rust1", since = "1.0.0")]
977 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
978 self.inner.send(t).map_err(SendError)
981 /// Attempts to send a value on this channel without blocking.
983 /// This method differs from [`send`] by returning immediately if the
984 /// channel's buffer is full or no receiver is waiting to acquire some
985 /// data. Compared with [`send`], this function has two failure cases
986 /// instead of one (one for disconnection, one for a full buffer).
988 /// See [`send`] for notes about guarantees of whether the
989 /// receiver has received the data or not if this function is successful.
991 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
996 /// use std::sync::mpsc::sync_channel;
999 /// // Create a sync_channel with buffer size 1
1000 /// let (sync_sender, receiver) = sync_channel(1);
1001 /// let sync_sender2 = sync_sender.clone();
1003 /// // First thread owns sync_sender
1004 /// thread::spawn(move || {
1005 /// sync_sender.send(1).unwrap();
1006 /// sync_sender.send(2).unwrap();
1007 /// // Thread blocked
1010 /// // Second thread owns sync_sender2
1011 /// thread::spawn(move || {
1012 /// // This will return an error and send
1013 /// // no message if the buffer is full
1014 /// sync_sender2.try_send(3).is_err();
1018 /// msg = receiver.recv().unwrap();
1019 /// println!("message {} received", msg);
1021 /// msg = receiver.recv().unwrap();
1022 /// println!("message {} received", msg);
1024 /// // Third message may have never been sent
1025 /// match receiver.try_recv() {
1026 /// Ok(msg) => println!("message {} received", msg),
1027 /// Err(_) => println!("the third message was never sent"),
1030 #[stable(feature = "rust1", since = "1.0.0")]
1031 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1032 self.inner.try_send(t)
1036 #[stable(feature = "rust1", since = "1.0.0")]
1037 impl<T> Clone for SyncSender<T> {
1038 fn clone(&self) -> SyncSender<T> {
1039 self.inner.clone_chan();
1040 SyncSender::new(self.inner.clone())
1044 #[stable(feature = "rust1", since = "1.0.0")]
1045 impl<T> Drop for SyncSender<T> {
1046 fn drop(&mut self) {
1047 self.inner.drop_chan();
1051 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1052 impl<T> fmt::Debug for SyncSender<T> {
1053 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1054 f.debug_struct("SyncSender").finish()
1058 ////////////////////////////////////////////////////////////////////////////////
1060 ////////////////////////////////////////////////////////////////////////////////
1062 impl<T> Receiver<T> {
1063 fn new(inner: Flavor<T>) -> Receiver<T> {
1064 Receiver { inner: UnsafeCell::new(inner) }
1067 /// Attempts to return a pending value on this receiver without blocking.
1069 /// This method will never block the caller in order to wait for data to
1070 /// become available. Instead, this will always return immediately with a
1071 /// possible option of pending data on the channel.
1073 /// This is useful for a flavor of "optimistic check" before deciding to
1074 /// block on a receiver.
1076 /// Compared with [`recv`], this function has two failure cases instead of one
1077 /// (one for disconnection, one for an empty buffer).
1079 /// [`recv`]: struct.Receiver.html#method.recv
1084 /// use std::sync::mpsc::{Receiver, channel};
1086 /// let (_, receiver): (_, Receiver<i32>) = channel();
1088 /// assert!(receiver.try_recv().is_err());
1090 #[stable(feature = "rust1", since = "1.0.0")]
1091 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1093 let new_port = match *unsafe { self.inner() } {
1094 Flavor::Oneshot(ref p) => {
1095 match p.try_recv() {
1096 Ok(t) => return Ok(t),
1097 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1098 Err(oneshot::Disconnected) => {
1099 return Err(TryRecvError::Disconnected)
1101 Err(oneshot::Upgraded(rx)) => rx,
1104 Flavor::Stream(ref p) => {
1105 match p.try_recv() {
1106 Ok(t) => return Ok(t),
1107 Err(stream::Empty) => return Err(TryRecvError::Empty),
1108 Err(stream::Disconnected) => {
1109 return Err(TryRecvError::Disconnected)
1111 Err(stream::Upgraded(rx)) => rx,
1114 Flavor::Shared(ref p) => {
1115 match p.try_recv() {
1116 Ok(t) => return Ok(t),
1117 Err(shared::Empty) => return Err(TryRecvError::Empty),
1118 Err(shared::Disconnected) => {
1119 return Err(TryRecvError::Disconnected)
1123 Flavor::Sync(ref p) => {
1124 match p.try_recv() {
1125 Ok(t) => return Ok(t),
1126 Err(sync::Empty) => return Err(TryRecvError::Empty),
1127 Err(sync::Disconnected) => {
1128 return Err(TryRecvError::Disconnected)
1134 mem::swap(self.inner_mut(),
1135 new_port.inner_mut());
1140 /// Attempts to wait for a value on this receiver, returning an error if the
1141 /// corresponding channel has hung up.
1143 /// This function will always block the current thread if there is no data
1144 /// available and it's possible for more data to be sent. Once a message is
1145 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1146 /// receiver will wake up and return that message.
1148 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1149 /// this call is blocking, this call will wake up and return [`Err`] to
1150 /// indicate that no more messages can ever be received on this channel.
1151 /// However, since channels are buffered, messages sent before the disconnect
1152 /// will still be properly received.
1154 /// [`Sender`]: struct.Sender.html
1155 /// [`SyncSender`]: struct.SyncSender.html
1156 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1161 /// use std::sync::mpsc;
1162 /// use std::thread;
1164 /// let (send, recv) = mpsc::channel();
1165 /// let handle = thread::spawn(move || {
1166 /// send.send(1u8).unwrap();
1169 /// handle.join().unwrap();
1171 /// assert_eq!(Ok(1), recv.recv());
1174 /// Buffering behavior:
1177 /// use std::sync::mpsc;
1178 /// use std::thread;
1179 /// use std::sync::mpsc::RecvError;
1181 /// let (send, recv) = mpsc::channel();
1182 /// let handle = thread::spawn(move || {
1183 /// send.send(1u8).unwrap();
1184 /// send.send(2).unwrap();
1185 /// send.send(3).unwrap();
1189 /// // wait for the thread to join so we ensure the sender is dropped
1190 /// handle.join().unwrap();
1192 /// assert_eq!(Ok(1), recv.recv());
1193 /// assert_eq!(Ok(2), recv.recv());
1194 /// assert_eq!(Ok(3), recv.recv());
1195 /// assert_eq!(Err(RecvError), recv.recv());
1197 #[stable(feature = "rust1", since = "1.0.0")]
1198 pub fn recv(&self) -> Result<T, RecvError> {
1200 let new_port = match *unsafe { self.inner() } {
1201 Flavor::Oneshot(ref p) => {
1202 match p.recv(None) {
1203 Ok(t) => return Ok(t),
1204 Err(oneshot::Disconnected) => return Err(RecvError),
1205 Err(oneshot::Upgraded(rx)) => rx,
1206 Err(oneshot::Empty) => unreachable!(),
1209 Flavor::Stream(ref p) => {
1210 match p.recv(None) {
1211 Ok(t) => return Ok(t),
1212 Err(stream::Disconnected) => return Err(RecvError),
1213 Err(stream::Upgraded(rx)) => rx,
1214 Err(stream::Empty) => unreachable!(),
1217 Flavor::Shared(ref p) => {
1218 match p.recv(None) {
1219 Ok(t) => return Ok(t),
1220 Err(shared::Disconnected) => return Err(RecvError),
1221 Err(shared::Empty) => unreachable!(),
1224 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1227 mem::swap(self.inner_mut(), new_port.inner_mut());
1232 /// Attempts to wait for a value on this receiver, returning an error if the
1233 /// corresponding channel has hung up, or if it waits more than `timeout`.
1235 /// This function will always block the current thread if there is no data
1236 /// available and it's possible for more data to be sent. Once a message is
1237 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1238 /// receiver will wake up and return that message.
1240 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1241 /// this call is blocking, this call will wake up and return [`Err`] to
1242 /// indicate that no more messages can ever be received on this channel.
1243 /// However, since channels are buffered, messages sent before the disconnect
1244 /// will still be properly received.
1246 /// [`Sender`]: struct.Sender.html
1247 /// [`SyncSender`]: struct.SyncSender.html
1248 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1252 /// Successfully receiving value before encountering timeout:
1255 /// use std::thread;
1256 /// use std::time::Duration;
1257 /// use std::sync::mpsc;
1259 /// let (send, recv) = mpsc::channel();
1261 /// thread::spawn(move || {
1262 /// send.send('a').unwrap();
1266 /// recv.recv_timeout(Duration::from_millis(400)),
1271 /// Receiving an error upon reaching timeout:
1274 /// use std::thread;
1275 /// use std::time::Duration;
1276 /// use std::sync::mpsc;
1278 /// let (send, recv) = mpsc::channel();
1280 /// thread::spawn(move || {
1281 /// thread::sleep(Duration::from_millis(800));
1282 /// send.send('a').unwrap();
1286 /// recv.recv_timeout(Duration::from_millis(400)),
1287 /// Err(mpsc::RecvTimeoutError::Timeout)
1290 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1291 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1292 // Do an optimistic try_recv to avoid the performance impact of
1293 // Instant::now() in the full-channel case.
1294 match self.try_recv() {
1297 Err(TryRecvError::Disconnected)
1298 => Err(RecvTimeoutError::Disconnected),
1299 Err(TryRecvError::Empty)
1300 => self.recv_deadline(Instant::now() + timeout)
1304 /// Attempts to wait for a value on this receiver, returning an error if the
1305 /// corresponding channel has hung up, or if `deadline` is reached.
1307 /// This function will always block the current thread if there is no data
1308 /// available and it's possible for more data to be sent. Once a message is
1309 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1310 /// receiver will wake up and return that message.
1312 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1313 /// this call is blocking, this call will wake up and return [`Err`] to
1314 /// indicate that no more messages can ever be received on this channel.
1315 /// However, since channels are buffered, messages sent before the disconnect
1316 /// will still be properly received.
1318 /// [`Sender`]: struct.Sender.html
1319 /// [`SyncSender`]: struct.SyncSender.html
1320 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1324 /// Successfully receiving value before reaching deadline:
1327 /// #![feature(deadline_api)]
1328 /// use std::thread;
1329 /// use std::time::{Duration, Instant};
1330 /// use std::sync::mpsc;
1332 /// let (send, recv) = mpsc::channel();
1334 /// thread::spawn(move || {
1335 /// send.send('a').unwrap();
1339 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1344 /// Receiving an error upon reaching deadline:
1347 /// #![feature(deadline_api)]
1348 /// use std::thread;
1349 /// use std::time::{Duration, Instant};
1350 /// use std::sync::mpsc;
1352 /// let (send, recv) = mpsc::channel();
1354 /// thread::spawn(move || {
1355 /// thread::sleep(Duration::from_millis(800));
1356 /// send.send('a').unwrap();
1360 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1361 /// Err(mpsc::RecvTimeoutError::Timeout)
1364 #[unstable(feature = "deadline_api", issue = "46316")]
1365 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1366 use self::RecvTimeoutError::*;
1369 let port_or_empty = match *unsafe { self.inner() } {
1370 Flavor::Oneshot(ref p) => {
1371 match p.recv(Some(deadline)) {
1372 Ok(t) => return Ok(t),
1373 Err(oneshot::Disconnected) => return Err(Disconnected),
1374 Err(oneshot::Upgraded(rx)) => Some(rx),
1375 Err(oneshot::Empty) => None,
1378 Flavor::Stream(ref p) => {
1379 match p.recv(Some(deadline)) {
1380 Ok(t) => return Ok(t),
1381 Err(stream::Disconnected) => return Err(Disconnected),
1382 Err(stream::Upgraded(rx)) => Some(rx),
1383 Err(stream::Empty) => None,
1386 Flavor::Shared(ref p) => {
1387 match p.recv(Some(deadline)) {
1388 Ok(t) => return Ok(t),
1389 Err(shared::Disconnected) => return Err(Disconnected),
1390 Err(shared::Empty) => None,
1393 Flavor::Sync(ref p) => {
1394 match p.recv(Some(deadline)) {
1395 Ok(t) => return Ok(t),
1396 Err(sync::Disconnected) => return Err(Disconnected),
1397 Err(sync::Empty) => None,
1402 if let Some(new_port) = port_or_empty {
1404 mem::swap(self.inner_mut(), new_port.inner_mut());
1408 // If we're already passed the deadline, and we're here without
1409 // data, return a timeout, else try again.
1410 if Instant::now() >= deadline {
1411 return Err(Timeout);
1416 /// Returns an iterator that will block waiting for messages, but never
1417 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1419 /// [`panic!`]: ../../../std/macro.panic.html
1420 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1425 /// use std::sync::mpsc::channel;
1426 /// use std::thread;
1428 /// let (send, recv) = channel();
1430 /// thread::spawn(move || {
1431 /// send.send(1).unwrap();
1432 /// send.send(2).unwrap();
1433 /// send.send(3).unwrap();
1436 /// let mut iter = recv.iter();
1437 /// assert_eq!(iter.next(), Some(1));
1438 /// assert_eq!(iter.next(), Some(2));
1439 /// assert_eq!(iter.next(), Some(3));
1440 /// assert_eq!(iter.next(), None);
1442 #[stable(feature = "rust1", since = "1.0.0")]
1443 pub fn iter(&self) -> Iter<T> {
1447 /// Returns an iterator that will attempt to yield all pending values.
1448 /// It will return `None` if there are no more pending values or if the
1449 /// channel has hung up. The iterator will never [`panic!`] or block the
1450 /// user by waiting for values.
1452 /// [`panic!`]: ../../../std/macro.panic.html
1457 /// use std::sync::mpsc::channel;
1458 /// use std::thread;
1459 /// use std::time::Duration;
1461 /// let (sender, receiver) = channel();
1463 /// // nothing is in the buffer yet
1464 /// assert!(receiver.try_iter().next().is_none());
1466 /// thread::spawn(move || {
1467 /// thread::sleep(Duration::from_secs(1));
1468 /// sender.send(1).unwrap();
1469 /// sender.send(2).unwrap();
1470 /// sender.send(3).unwrap();
1473 /// // nothing is in the buffer yet
1474 /// assert!(receiver.try_iter().next().is_none());
1476 /// // block for two seconds
1477 /// thread::sleep(Duration::from_secs(2));
1479 /// let mut iter = receiver.try_iter();
1480 /// assert_eq!(iter.next(), Some(1));
1481 /// assert_eq!(iter.next(), Some(2));
1482 /// assert_eq!(iter.next(), Some(3));
1483 /// assert_eq!(iter.next(), None);
1485 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1486 pub fn try_iter(&self) -> TryIter<T> {
1487 TryIter { rx: self }
1492 impl<T> select::Packet for Receiver<T> {
1493 fn can_recv(&self) -> bool {
1495 let new_port = match *unsafe { self.inner() } {
1496 Flavor::Oneshot(ref p) => {
1497 match p.can_recv() {
1498 Ok(ret) => return ret,
1499 Err(upgrade) => upgrade,
1502 Flavor::Stream(ref p) => {
1503 match p.can_recv() {
1504 Ok(ret) => return ret,
1505 Err(upgrade) => upgrade,
1508 Flavor::Shared(ref p) => return p.can_recv(),
1509 Flavor::Sync(ref p) => return p.can_recv(),
1512 mem::swap(self.inner_mut(),
1513 new_port.inner_mut());
1518 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1520 let (t, new_port) = match *unsafe { self.inner() } {
1521 Flavor::Oneshot(ref p) => {
1522 match p.start_selection(token) {
1523 oneshot::SelSuccess => return Installed,
1524 oneshot::SelCanceled => return Abort,
1525 oneshot::SelUpgraded(t, rx) => (t, rx),
1528 Flavor::Stream(ref p) => {
1529 match p.start_selection(token) {
1530 stream::SelSuccess => return Installed,
1531 stream::SelCanceled => return Abort,
1532 stream::SelUpgraded(t, rx) => (t, rx),
1535 Flavor::Shared(ref p) => return p.start_selection(token),
1536 Flavor::Sync(ref p) => return p.start_selection(token),
1540 mem::swap(self.inner_mut(), new_port.inner_mut());
1545 fn abort_selection(&self) -> bool {
1546 let mut was_upgrade = false;
1548 let result = match *unsafe { self.inner() } {
1549 Flavor::Oneshot(ref p) => p.abort_selection(),
1550 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1551 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1552 Flavor::Sync(ref p) => return p.abort_selection(),
1554 let new_port = match result { Ok(b) => return b, Err(p) => p };
1557 mem::swap(self.inner_mut(),
1558 new_port.inner_mut());
1564 #[stable(feature = "rust1", since = "1.0.0")]
1565 impl<'a, T> Iterator for Iter<'a, T> {
1568 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1571 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1572 impl<'a, T> Iterator for TryIter<'a, T> {
1575 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1578 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1579 impl<'a, T> IntoIterator for &'a Receiver<T> {
1581 type IntoIter = Iter<'a, T>;
1583 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1586 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1587 impl<T> Iterator for IntoIter<T> {
1589 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1592 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1593 impl <T> IntoIterator for Receiver<T> {
1595 type IntoIter = IntoIter<T>;
1597 fn into_iter(self) -> IntoIter<T> {
1598 IntoIter { rx: self }
1602 #[stable(feature = "rust1", since = "1.0.0")]
1603 impl<T> Drop for Receiver<T> {
1604 fn drop(&mut self) {
1605 match *unsafe { self.inner() } {
1606 Flavor::Oneshot(ref p) => p.drop_port(),
1607 Flavor::Stream(ref p) => p.drop_port(),
1608 Flavor::Shared(ref p) => p.drop_port(),
1609 Flavor::Sync(ref p) => p.drop_port(),
1614 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1615 impl<T> fmt::Debug for Receiver<T> {
1616 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1617 f.debug_struct("Receiver").finish()
1621 #[stable(feature = "rust1", since = "1.0.0")]
1622 impl<T> fmt::Debug for SendError<T> {
1623 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1624 "SendError(..)".fmt(f)
1628 #[stable(feature = "rust1", since = "1.0.0")]
1629 impl<T> fmt::Display for SendError<T> {
1630 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1631 "sending on a closed channel".fmt(f)
1635 #[stable(feature = "rust1", since = "1.0.0")]
1636 impl<T: Send> error::Error for SendError<T> {
1637 fn description(&self) -> &str {
1638 "sending on a closed channel"
1641 fn cause(&self) -> Option<&error::Error> {
1646 #[stable(feature = "rust1", since = "1.0.0")]
1647 impl<T> fmt::Debug for TrySendError<T> {
1648 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1650 TrySendError::Full(..) => "Full(..)".fmt(f),
1651 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1656 #[stable(feature = "rust1", since = "1.0.0")]
1657 impl<T> fmt::Display for TrySendError<T> {
1658 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1660 TrySendError::Full(..) => {
1661 "sending on a full channel".fmt(f)
1663 TrySendError::Disconnected(..) => {
1664 "sending on a closed channel".fmt(f)
1670 #[stable(feature = "rust1", since = "1.0.0")]
1671 impl<T: Send> error::Error for TrySendError<T> {
1673 fn description(&self) -> &str {
1675 TrySendError::Full(..) => {
1676 "sending on a full channel"
1678 TrySendError::Disconnected(..) => {
1679 "sending on a closed channel"
1684 fn cause(&self) -> Option<&error::Error> {
1689 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1690 impl<T> From<SendError<T>> for TrySendError<T> {
1691 fn from(err: SendError<T>) -> TrySendError<T> {
1693 SendError(t) => TrySendError::Disconnected(t),
1698 #[stable(feature = "rust1", since = "1.0.0")]
1699 impl fmt::Display for RecvError {
1700 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1701 "receiving on a closed channel".fmt(f)
1705 #[stable(feature = "rust1", since = "1.0.0")]
1706 impl error::Error for RecvError {
1708 fn description(&self) -> &str {
1709 "receiving on a closed channel"
1712 fn cause(&self) -> Option<&error::Error> {
1717 #[stable(feature = "rust1", since = "1.0.0")]
1718 impl fmt::Display for TryRecvError {
1719 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1721 TryRecvError::Empty => {
1722 "receiving on an empty channel".fmt(f)
1724 TryRecvError::Disconnected => {
1725 "receiving on a closed channel".fmt(f)
1731 #[stable(feature = "rust1", since = "1.0.0")]
1732 impl error::Error for TryRecvError {
1734 fn description(&self) -> &str {
1736 TryRecvError::Empty => {
1737 "receiving on an empty channel"
1739 TryRecvError::Disconnected => {
1740 "receiving on a closed channel"
1745 fn cause(&self) -> Option<&error::Error> {
1750 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1751 impl From<RecvError> for TryRecvError {
1752 fn from(err: RecvError) -> TryRecvError {
1754 RecvError => TryRecvError::Disconnected,
1759 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1760 impl fmt::Display for RecvTimeoutError {
1761 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1763 RecvTimeoutError::Timeout => {
1764 "timed out waiting on channel".fmt(f)
1766 RecvTimeoutError::Disconnected => {
1767 "channel is empty and sending half is closed".fmt(f)
1773 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1774 impl error::Error for RecvTimeoutError {
1775 fn description(&self) -> &str {
1777 RecvTimeoutError::Timeout => {
1778 "timed out waiting on channel"
1780 RecvTimeoutError::Disconnected => {
1781 "channel is empty and sending half is closed"
1786 fn cause(&self) -> Option<&error::Error> {
1791 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1792 impl From<RecvError> for RecvTimeoutError {
1793 fn from(err: RecvError) -> RecvTimeoutError {
1795 RecvError => RecvTimeoutError::Disconnected,
1800 #[cfg(all(test, not(target_os = "emscripten")))]
1805 use time::{Duration, Instant};
/// Returns the multiplier applied to the iteration counts of stress tests.
///
/// The value is read from the `RUST_TEST_STRESS` environment variable and
/// parsed as a `usize`; when the variable cannot be read the factor is 1.
///
/// # Panics
///
/// Panics if the variable is set but does not parse as a `usize`.
pub fn stress_factor() -> usize {
    if let Ok(raw) = env::var("RUST_TEST_STRESS") {
        raw.parse().unwrap()
    } else {
        1
    }
}
1816 let (tx, rx) = channel::<i32>();
1817 tx.send(1).unwrap();
1818 assert_eq!(rx.recv().unwrap(), 1);
1823 let (tx, _rx) = channel::<Box<isize>>();
1824 tx.send(box 1).unwrap();
1828 fn drop_full_shared() {
1829 let (tx, _rx) = channel::<Box<isize>>();
1832 tx.send(box 1).unwrap();
1837 let (tx, rx) = channel::<i32>();
1838 tx.send(1).unwrap();
1839 assert_eq!(rx.recv().unwrap(), 1);
1840 let tx = tx.clone();
1841 tx.send(1).unwrap();
1842 assert_eq!(rx.recv().unwrap(), 1);
1846 fn smoke_threads() {
1847 let (tx, rx) = channel::<i32>();
1848 let _t = thread::spawn(move|| {
1849 tx.send(1).unwrap();
1851 assert_eq!(rx.recv().unwrap(), 1);
1855 fn smoke_port_gone() {
1856 let (tx, rx) = channel::<i32>();
1858 assert!(tx.send(1).is_err());
1862 fn smoke_shared_port_gone() {
1863 let (tx, rx) = channel::<i32>();
1865 assert!(tx.send(1).is_err())
1869 fn smoke_shared_port_gone2() {
1870 let (tx, rx) = channel::<i32>();
1872 let tx2 = tx.clone();
1874 assert!(tx2.send(1).is_err());
1878 fn port_gone_concurrent() {
1879 let (tx, rx) = channel::<i32>();
1880 let _t = thread::spawn(move|| {
1883 while tx.send(1).is_ok() {}
1887 fn port_gone_concurrent_shared() {
1888 let (tx, rx) = channel::<i32>();
1889 let tx2 = tx.clone();
1890 let _t = thread::spawn(move|| {
1893 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1897 fn smoke_chan_gone() {
1898 let (tx, rx) = channel::<i32>();
1900 assert!(rx.recv().is_err());
1904 fn smoke_chan_gone_shared() {
1905 let (tx, rx) = channel::<()>();
1906 let tx2 = tx.clone();
1909 assert!(rx.recv().is_err());
1913 fn chan_gone_concurrent() {
1914 let (tx, rx) = channel::<i32>();
1915 let _t = thread::spawn(move|| {
1916 tx.send(1).unwrap();
1917 tx.send(1).unwrap();
1919 while rx.recv().is_ok() {}
1924 let (tx, rx) = channel::<i32>();
1925 let t = thread::spawn(move|| {
1926 for _ in 0..10000 { tx.send(1).unwrap(); }
1929 assert_eq!(rx.recv().unwrap(), 1);
1931 t.join().ok().unwrap();
1935 fn stress_shared() {
1936 const AMT: u32 = 10000;
1937 const NTHREADS: u32 = 8;
1938 let (tx, rx) = channel::<i32>();
1940 let t = thread::spawn(move|| {
1941 for _ in 0..AMT * NTHREADS {
1942 assert_eq!(rx.recv().unwrap(), 1);
1944 match rx.try_recv() {
1950 for _ in 0..NTHREADS {
1951 let tx = tx.clone();
1952 thread::spawn(move|| {
1953 for _ in 0..AMT { tx.send(1).unwrap(); }
1957 t.join().ok().unwrap();
1961 fn send_from_outside_runtime() {
1962 let (tx1, rx1) = channel::<()>();
1963 let (tx2, rx2) = channel::<i32>();
1964 let t1 = thread::spawn(move|| {
1965 tx1.send(()).unwrap();
1967 assert_eq!(rx2.recv().unwrap(), 1);
1970 rx1.recv().unwrap();
1971 let t2 = thread::spawn(move|| {
1973 tx2.send(1).unwrap();
1976 t1.join().ok().unwrap();
1977 t2.join().ok().unwrap();
1981 fn recv_from_outside_runtime() {
1982 let (tx, rx) = channel::<i32>();
1983 let t = thread::spawn(move|| {
1985 assert_eq!(rx.recv().unwrap(), 1);
1989 tx.send(1).unwrap();
1991 t.join().ok().unwrap();
1996 let (tx1, rx1) = channel::<i32>();
1997 let (tx2, rx2) = channel::<i32>();
1998 let t1 = thread::spawn(move|| {
1999 assert_eq!(rx1.recv().unwrap(), 1);
2000 tx2.send(2).unwrap();
2002 let t2 = thread::spawn(move|| {
2003 tx1.send(1).unwrap();
2004 assert_eq!(rx2.recv().unwrap(), 2);
2006 t1.join().ok().unwrap();
2007 t2.join().ok().unwrap();
2011 fn oneshot_single_thread_close_port_first() {
2012 // Simple test of closing without sending
2013 let (_tx, rx) = channel::<i32>();
2018 fn oneshot_single_thread_close_chan_first() {
2019 // Simple test of closing without sending
2020 let (tx, _rx) = channel::<i32>();
2025 fn oneshot_single_thread_send_port_close() {
2026 // Testing that the sender cleans up the payload if receiver is closed
2027 let (tx, rx) = channel::<Box<i32>>();
2029 assert!(tx.send(box 0).is_err());
2033 fn oneshot_single_thread_recv_chan_close() {
2034 // Receiving on a closed chan will panic
2035 let res = thread::spawn(move|| {
2036 let (tx, rx) = channel::<i32>();
2041 assert!(res.is_err());
2045 fn oneshot_single_thread_send_then_recv() {
2046 let (tx, rx) = channel::<Box<i32>>();
2047 tx.send(box 10).unwrap();
2048 assert!(*rx.recv().unwrap() == 10);
2052 fn oneshot_single_thread_try_send_open() {
2053 let (tx, rx) = channel::<i32>();
2054 assert!(tx.send(10).is_ok());
2055 assert!(rx.recv().unwrap() == 10);
2059 fn oneshot_single_thread_try_send_closed() {
2060 let (tx, rx) = channel::<i32>();
2062 assert!(tx.send(10).is_err());
2066 fn oneshot_single_thread_try_recv_open() {
2067 let (tx, rx) = channel::<i32>();
2068 tx.send(10).unwrap();
2069 assert!(rx.recv() == Ok(10));
2073 fn oneshot_single_thread_try_recv_closed() {
2074 let (tx, rx) = channel::<i32>();
2076 assert!(rx.recv().is_err());
2080 fn oneshot_single_thread_peek_data() {
2081 let (tx, rx) = channel::<i32>();
2082 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2083 tx.send(10).unwrap();
2084 assert_eq!(rx.try_recv(), Ok(10));
2088 fn oneshot_single_thread_peek_close() {
2089 let (tx, rx) = channel::<i32>();
2091 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2092 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2096 fn oneshot_single_thread_peek_open() {
2097 let (_tx, rx) = channel::<i32>();
2098 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2102 fn oneshot_multi_task_recv_then_send() {
2103 let (tx, rx) = channel::<Box<i32>>();
2104 let _t = thread::spawn(move|| {
2105 assert!(*rx.recv().unwrap() == 10);
2108 tx.send(box 10).unwrap();
2112 fn oneshot_multi_task_recv_then_close() {
2113 let (tx, rx) = channel::<Box<i32>>();
2114 let _t = thread::spawn(move|| {
2117 let res = thread::spawn(move|| {
2118 assert!(*rx.recv().unwrap() == 10);
2120 assert!(res.is_err());
2124 fn oneshot_multi_thread_close_stress() {
2125 for _ in 0..stress_factor() {
2126 let (tx, rx) = channel::<i32>();
2127 let _t = thread::spawn(move|| {
2135 fn oneshot_multi_thread_send_close_stress() {
2136 for _ in 0..stress_factor() {
2137 let (tx, rx) = channel::<i32>();
2138 let _t = thread::spawn(move|| {
2141 let _ = thread::spawn(move|| {
2142 tx.send(1).unwrap();
2148 fn oneshot_multi_thread_recv_close_stress() {
2149 for _ in 0..stress_factor() {
2150 let (tx, rx) = channel::<i32>();
2151 thread::spawn(move|| {
2152 let res = thread::spawn(move|| {
2155 assert!(res.is_err());
2157 let _t = thread::spawn(move|| {
2158 thread::spawn(move|| {
2166 fn oneshot_multi_thread_send_recv_stress() {
2167 for _ in 0..stress_factor() {
2168 let (tx, rx) = channel::<Box<isize>>();
2169 let _t = thread::spawn(move|| {
2170 tx.send(box 10).unwrap();
2172 assert!(*rx.recv().unwrap() == 10);
2177 fn stream_send_recv_stress() {
2178 for _ in 0..stress_factor() {
2179 let (tx, rx) = channel();
2184 fn send(tx: Sender<Box<i32>>, i: i32) {
2185 if i == 10 { return }
2187 thread::spawn(move|| {
2188 tx.send(box i).unwrap();
2193 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2194 if i == 10 { return }
2196 thread::spawn(move|| {
2197 assert!(*rx.recv().unwrap() == i);
2205 fn oneshot_single_thread_recv_timeout() {
2206 let (tx, rx) = channel();
2207 tx.send(()).unwrap();
2208 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2209 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2210 tx.send(()).unwrap();
2211 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2215 fn stress_recv_timeout_two_threads() {
2216 let (tx, rx) = channel();
2217 let stress = stress_factor() + 100;
2218 let timeout = Duration::from_millis(100);
2220 thread::spawn(move || {
2221 for i in 0..stress {
2223 thread::sleep(timeout * 2);
2225 tx.send(1usize).unwrap();
2229 let mut recv_count = 0;
2231 match rx.recv_timeout(timeout) {
2233 assert_eq!(n, 1usize);
2236 Err(RecvTimeoutError::Timeout) => continue,
2237 Err(RecvTimeoutError::Disconnected) => break,
2241 assert_eq!(recv_count, stress);
2245 fn recv_timeout_upgrade() {
2246 let (tx, rx) = channel::<()>();
2247 let timeout = Duration::from_millis(1);
2248 let _tx_clone = tx.clone();
2250 let start = Instant::now();
2251 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2252 assert!(Instant::now() >= start + timeout);
2256 fn stress_recv_timeout_shared() {
2257 let (tx, rx) = channel();
2258 let stress = stress_factor() + 100;
2260 for i in 0..stress {
2261 let tx = tx.clone();
2262 thread::spawn(move || {
2263 thread::sleep(Duration::from_millis(i as u64 * 10));
2264 tx.send(1usize).unwrap();
2270 let mut recv_count = 0;
2272 match rx.recv_timeout(Duration::from_millis(10)) {
2274 assert_eq!(n, 1usize);
2277 Err(RecvTimeoutError::Timeout) => continue,
2278 Err(RecvTimeoutError::Disconnected) => break,
2282 assert_eq!(recv_count, stress);
2287 // Regression test that we don't run out of stack in scheduler context
2288 let (tx, rx) = channel();
2289 for _ in 0..10000 { tx.send(()).unwrap(); }
2290 for _ in 0..10000 { rx.recv().unwrap(); }
2294 fn shared_recv_timeout() {
2295 let (tx, rx) = channel();
2298 let tx = tx.clone();
2299 thread::spawn(move|| {
2300 tx.send(()).unwrap();
2304 for _ in 0..total { rx.recv().unwrap(); }
2306 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2307 tx.send(()).unwrap();
2308 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2312 fn shared_chan_stress() {
2313 let (tx, rx) = channel();
2314 let total = stress_factor() + 100;
2316 let tx = tx.clone();
2317 thread::spawn(move|| {
2318 tx.send(()).unwrap();
// A worker thread consumes `rx` through `iter()` and reports a single
// result (`acc`) on a second channel; the main thread sends 3, 1 and 2 and
// expects 6 back.
// NOTE(review): the initialisation/update of `acc` and the drop of `tx`
// that would end the iteration are not visible in this extract --
// presumably `acc` is the running sum; confirm.
2328 fn test_nested_recv_iter() {
2329 let (tx, rx) = channel::<i32>();
2330 let (total_tx, total_rx) = channel::<i32>();
2332 let _t = thread::spawn(move|| {
2334 for x in rx.iter() {
2337 total_tx.send(acc).unwrap();
2340 tx.send(3).unwrap();
2341 tx.send(1).unwrap();
2342 tx.send(2).unwrap();
2344 assert_eq!(total_rx.recv().unwrap(), 6);
// A worker thread iterates over `rx.iter()` and reports `count` on a second
// channel; the main thread sends the value 2 three times and expects the
// reported count to be 4.
// NOTE(review): the body of the `for` loop (how `count` is updated and when
// the loop breaks) is not visible in this extract -- confirm.
2348 fn test_recv_iter_break() {
2349 let (tx, rx) = channel::<i32>();
2350 let (count_tx, count_rx) = channel();
2352 let _t = thread::spawn(move|| {
2354 for x in rx.iter() {
2361 count_tx.send(count).unwrap();
2364 tx.send(2).unwrap();
2365 tx.send(2).unwrap();
2366 tx.send(2).unwrap();
2369 assert_eq!(count_rx.recv().unwrap(), 4);
// Exercises `Receiver::try_iter` with a request/response pair of channels:
// the spawned thread reads responses via `try_iter()` and sends requests,
// while the main thread answers each request with the value 2. The spawned
// thread's return value must be 6.
// NOTE(review): the accumulation logic and loop exits are not visible in
// this extract -- confirm.
2373 fn test_recv_try_iter() {
2374 let (request_tx, request_rx) = channel();
2375 let (response_tx, response_rx) = channel();
2377 // Request `x`s until we have `6`.
2378 let t = thread::spawn(move|| {
2381 for x in response_rx.try_iter() {
2387 request_tx.send(()).unwrap();
// Main thread: answer every request with a 2.
2391 for _ in request_rx.iter() {
// A failed send means the response receiver is gone.
2392 if response_tx.send(2).is_err() {
2397 assert_eq!(t.join().unwrap(), 6);
// Sends 1 and 2, then checks that `iter` yields them in order and ends.
// NOTE(review): the construction of `iter` is not visible in this extract;
// the test name suggests it is built from the receiver by value -- confirm.
2401 fn test_recv_into_iter_owned() {
2403 let (tx, rx) = channel::<i32>();
2404 tx.send(1).unwrap();
2405 tx.send(2).unwrap();
2409 assert_eq!(iter.next().unwrap(), 1);
2410 assert_eq!(iter.next().unwrap(), 2);
2411 assert_eq!(iter.next().is_none(), true);
// Sends 1 and 2, then iterates through a borrowed receiver
// (`(&rx).into_iter()`), expecting both values in order followed by the
// end of iteration.
// NOTE(review): the final `None` requires every sender to be gone; no drop
// of `tx` is visible in this extract -- confirm.
2415 fn test_recv_into_iter_borrowed() {
2416 let (tx, rx) = channel::<i32>();
2417 tx.send(1).unwrap();
2418 tx.send(2).unwrap();
2420 let mut iter = (&rx).into_iter();
2421 assert_eq!(iter.next().unwrap(), 1);
2422 assert_eq!(iter.next().unwrap(), 2);
2423 assert_eq!(iter.next().is_none(), true);
// Walks `try_recv` through its three outcomes -- `Empty`, `Ok(value)` and
// `Disconnected` -- using two extra channels (`tx2`/`rx2` and `tx3`/`rx3`)
// to step the helper thread and the main thread in lockstep.
2427 fn try_recv_states() {
2428 let (tx1, rx1) = channel::<i32>();
2429 let (tx2, rx2) = channel::<()>();
2430 let (tx3, rx3) = channel::<()>();
2431 let _t = thread::spawn(move|| {
// Step 1: wait for the go-ahead, send one value, acknowledge.
2432 rx2.recv().unwrap();
2433 tx1.send(1).unwrap();
2434 tx3.send(()).unwrap();
// Step 2: wait for the second go-ahead, then acknowledge again.
// NOTE(review): the final `Disconnected` assertion implies `tx1` is dropped
// between these two lines; that line is not visible in this extract.
2435 rx2.recv().unwrap();
2437 tx3.send(()).unwrap();
// Nothing has been sent yet.
2440 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2441 tx2.send(()).unwrap();
2442 rx3.recv().unwrap();
// Exactly one value is available after step 1.
2443 assert_eq!(rx1.try_recv(), Ok(1));
2444 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2445 tx2.send(()).unwrap();
2446 rx3.recv().unwrap();
2447 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2450 // This bug used to end up in a livelock inside of the Receiver destructor
2451 // because the internal state of the Shared packet was corrupted
// The child thread blocks in `recv`, then drops the receiver and signals
// completion over a second channel (`tx2`/`rx2`).
2453 fn destroy_upgraded_shared_port_when_sender_still_active() {
2454 let (tx, rx) = channel();
2455 let (tx2, rx2) = channel();
2456 let _t = thread::spawn(move|| {
2457 rx.recv().unwrap(); // wait on a oneshot
2458 drop(rx); // destroy a shared
2459 tx2.send(()).unwrap();
2461 // make sure the other thread has gone to sleep
2462 for _ in 0..5000 { thread::yield_now(); }
2464 // upgrade to a shared chan and send a message
// NOTE(review): `t` is not defined in the visible lines -- presumably a
// clone of `tx` created just above; confirm.
2467 t.send(()).unwrap();
2469 // wait for the child thread to exit before we exit
2470 rx2.recv().unwrap();
// NOTE(review): the `fn` header for this body is not visible in this extract.
// The receiver half is matched with `_`, so it is never bound and is dropped
// at the end of this statement.
2475 let (tx, _) = channel();
// The result of the first send is deliberately ignored.
2476 let _ = tx.send(123);
// A later send must fail and hand the value back inside `SendError`.
2477 assert_eq!(tx.send(123), Err(SendError(123)));
2481 #[cfg(all(test, not(target_os = "emscripten")))]
// Returns the iteration multiplier used by the stress tests, read from the
// `RUST_TEST_STRESS` environment variable.
// When the variable is set, its value is parsed as `usize`; a value that
// does not parse panics via `unwrap`.
// NOTE(review): the fallback arm for an unset variable is not visible in
// this extract -- confirm the default.
2488 pub fn stress_factor() -> usize {
2489 match env::var("RUST_TEST_STRESS") {
2490 Ok(val) => val.parse().unwrap(),
// NOTE(review): the `fn` headers for the four test bodies below are not
// visible in this extract; each `sync_channel` call starts a new body.

// Basic round trip on a bounded channel of capacity 1.
2497 let (tx, rx) = sync_channel::<i32>(1);
2498 tx.send(1).unwrap();
2499 assert_eq!(rx.recv().unwrap(), 1);
// A boxed value is sent but never received; the receiver is only kept alive
// as `_rx`.
2504 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2505 tx.send(box 1).unwrap();
// Round trip before and after cloning the sender.
2510 let (tx, rx) = sync_channel::<i32>(1);
2511 tx.send(1).unwrap();
2512 assert_eq!(rx.recv().unwrap(), 1);
2513 let tx = tx.clone();
2514 tx.send(1).unwrap();
2515 assert_eq!(rx.recv().unwrap(), 1);
// `recv_timeout` times out on an empty channel and returns the value once
// one has been buffered.
2520 let (tx, rx) = sync_channel::<i32>(1);
2521 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2522 tx.send(1).unwrap();
2523 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
// Hands one value from a spawned thread to the main thread over a
// zero-capacity (rendezvous) channel.
2527 fn smoke_threads() {
2528 let (tx, rx) = sync_channel::<i32>(0);
2529 let _t = thread::spawn(move|| {
2530 tx.send(1).unwrap();
2532 assert_eq!(rx.recv().unwrap(), 1);
// Expects `send` on a rendezvous channel to return an error.
// NOTE(review): the line that drops `rx` before the send is not visible in
// this extract -- confirm.
2536 fn smoke_port_gone() {
2537 let (tx, rx) = sync_channel::<i32>(0);
2539 assert!(tx.send(1).is_err());
// Expects `send` through a cloned sender (`tx2`) to return an error.
// NOTE(review): the lines that drop `rx` (and possibly `tx`) are not visible
// in this extract -- confirm.
2543 fn smoke_shared_port_gone2() {
2544 let (tx, rx) = sync_channel::<i32>(0);
2546 let tx2 = tx.clone();
2548 assert!(tx2.send(1).is_err());
// Keeps sending on a rendezvous channel until a send fails.
// NOTE(review): the body of the spawned thread (which presumably owns `rx`)
// is not visible in this extract -- confirm.
2552 fn port_gone_concurrent() {
2553 let (tx, rx) = sync_channel::<i32>(0);
2554 let _t = thread::spawn(move|| {
// Spin until `send` reports an error.
2557 while tx.send(1).is_ok() {}
// Same idea as a single-sender "port gone" test, but alternating between
// the original sender and a clone until either send fails.
// NOTE(review): the body of the spawned thread is not visible in this
// extract -- confirm.
2561 fn port_gone_concurrent_shared() {
2562 let (tx, rx) = sync_channel::<i32>(0);
2563 let tx2 = tx.clone();
2564 let _t = thread::spawn(move|| {
// `&&` short-circuits: the loop ends on the first failing send.
2567 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
// Expects `recv` to return an error.
// NOTE(review): the line that drops `tx` before the receive is not visible
// in this extract -- confirm.
2571 fn smoke_chan_gone() {
2572 let (tx, rx) = sync_channel::<i32>(0);
2574 assert!(rx.recv().is_err());
// Expects `recv` to return an error on a channel that had two sender
// handles (`tx` and its clone `tx2`).
// NOTE(review): the lines that drop both senders are not visible in this
// extract -- confirm.
2578 fn smoke_chan_gone_shared() {
2579 let (tx, rx) = sync_channel::<()>(0);
2580 let tx2 = tx.clone();
2583 assert!(rx.recv().is_err());
// A spawned thread sends two values and then finishes; `tx` was moved into
// its closure. The main thread keeps receiving until `recv` reports an error.
2587 fn chan_gone_concurrent() {
2588 let (tx, rx) = sync_channel::<i32>(0);
2589 thread::spawn(move|| {
2590 tx.send(1).unwrap();
2591 tx.send(1).unwrap();
2593 while rx.recv().is_ok() {}
// NOTE(review): the `fn` header and the receive loop header for this body
// are not visible in this extract.
// One thread pushes 10000 values through a rendezvous channel while the
// current thread checks each received value.
2598 let (tx, rx) = sync_channel::<i32>(0);
2599 thread::spawn(move|| {
2600 for _ in 0..10000 { tx.send(1).unwrap(); }
2603 assert_eq!(rx.recv().unwrap(), 1);
// One producer thread sends 10000 values over a rendezvous channel while
// the receiver polls with a 1 ms timeout; all 10000 must be counted.
// NOTE(review): the polling loop header, the `Ok` arm and the `recv_count`
// increment are not visible in this extract -- confirm.
2608 fn stress_recv_timeout_two_threads() {
2609 let (tx, rx) = sync_channel::<i32>(0);
2611 thread::spawn(move|| {
2612 for _ in 0..10000 { tx.send(1).unwrap(); }
2615 let mut recv_count = 0;
2617 match rx.recv_timeout(Duration::from_millis(1)) {
// A timeout is not a failure here: just poll again.
2622 Err(RecvTimeoutError::Timeout) => continue,
// Disconnection ends the polling.
2623 Err(RecvTimeoutError::Disconnected) => break,
2627 assert_eq!(recv_count, 10000);
// `NTHREADS` producer threads each send `AMT` values over a shared
// rendezvous channel; a consumer thread polls with a 10 ms timeout, checks
// the total, and signals completion on `dtx`.
// NOTE(review): the polling loop header, the `Ok` arm, the `recv_count`
// increment and the drop of the original `tx` are not visible in this
// extract -- confirm.
2631 fn stress_recv_timeout_shared() {
2632 const AMT: u32 = 1000;
2633 const NTHREADS: u32 = 8;
2634 let (tx, rx) = sync_channel::<i32>(0);
2635 let (dtx, drx) = sync_channel::<()>(0);
2637 thread::spawn(move|| {
2638 let mut recv_count = 0;
2640 match rx.recv_timeout(Duration::from_millis(10)) {
2645 Err(RecvTimeoutError::Timeout) => continue,
2646 Err(RecvTimeoutError::Disconnected) => break,
// Every value from every producer must have been counted.
2650 assert_eq!(recv_count, AMT * NTHREADS);
// After polling has ended, `try_recv` must report an error.
2651 assert!(rx.try_recv().is_err());
// Tell the main thread the consumer has finished its checks.
2653 dtx.send(()).unwrap();
2656 for _ in 0..NTHREADS {
2657 let tx = tx.clone();
2658 thread::spawn(move|| {
2659 for _ in 0..AMT { tx.send(1).unwrap(); }
// Block until the consumer signals completion.
2665 drx.recv().unwrap();
// `NTHREADS` producer threads each send `AMT` ones over a shared rendezvous
// channel; a consumer thread receives exactly `AMT * NTHREADS` values with
// blocking `recv`, then probes `try_recv` and signals completion on `dtx`.
// NOTE(review): the arms of the `try_recv` match are not visible in this
// extract -- confirm what outcome is expected there.
2669 fn stress_shared() {
2670 const AMT: u32 = 1000;
2671 const NTHREADS: u32 = 8;
2672 let (tx, rx) = sync_channel::<i32>(0);
2673 let (dtx, drx) = sync_channel::<()>(0);
2675 thread::spawn(move|| {
2676 for _ in 0..AMT * NTHREADS {
2677 assert_eq!(rx.recv().unwrap(), 1);
2679 match rx.try_recv() {
2683 dtx.send(()).unwrap();
2686 for _ in 0..NTHREADS {
2687 let tx = tx.clone();
2688 thread::spawn(move|| {
2689 for _ in 0..AMT { tx.send(1).unwrap(); }
// Block until the consumer signals completion.
2693 drx.recv().unwrap();
// Creates a rendezvous channel and sends nothing.
// NOTE(review): the name implies the receiver is dropped first; that line
// is not visible in this extract -- confirm.
2697 fn oneshot_single_thread_close_port_first() {
2698 // Simple test of closing without sending
2699 let (_tx, rx) = sync_channel::<i32>(0);
// Creates a rendezvous channel and sends nothing.
// NOTE(review): the name implies the sender is dropped first; that line is
// not visible in this extract -- confirm.
2704 fn oneshot_single_thread_close_chan_first() {
2705 // Simple test of closing without sending
2706 let (tx, _rx) = sync_channel::<i32>(0);
// Sends a boxed payload and expects the send to fail.
// NOTE(review): the line that drops `rx` before the send is not visible in
// this extract -- confirm.
2711 fn oneshot_single_thread_send_port_close() {
2712 // Testing that the sender cleans up the payload if receiver is closed
2713 let (tx, rx) = sync_channel::<Box<i32>>(0);
2715 assert!(tx.send(box 0).is_err());
// Runs the channel operations on a separate thread and expects that thread
// to end in an error (`res.is_err()`).
// NOTE(review): the remainder of the closure and the `join` producing `res`
// are not visible in this extract -- confirm.
2719 fn oneshot_single_thread_recv_chan_close() {
2720 // Receiving on a closed chan will panic
2721 let res = thread::spawn(move|| {
2722 let (tx, rx) = sync_channel::<i32>(0);
2727 assert!(res.is_err());
// Buffers one boxed value in a capacity-1 channel and reads it back on the
// same thread.
2731 fn oneshot_single_thread_send_then_recv() {
2732 let (tx, rx) = sync_channel::<Box<i32>>(1);
2733 tx.send(box 10).unwrap();
2734 assert!(*rx.recv().unwrap() == 10);
// `try_send` succeeds while the capacity-1 buffer has room, and the value
// can then be received.
2738 fn oneshot_single_thread_try_send_open() {
2739 let (tx, rx) = sync_channel::<i32>(1);
2740 assert_eq!(tx.try_send(10), Ok(()));
2741 assert!(rx.recv().unwrap() == 10);
// Expects `try_send` to fail with `Disconnected`, handing the value back.
// NOTE(review): the line that drops `rx` before the send is not visible in
// this extract -- confirm.
2745 fn oneshot_single_thread_try_send_closed() {
2746 let (tx, rx) = sync_channel::<i32>(0);
2748 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
// With the receiver still alive (`_rx`) but nobody receiving, `try_send` on
// a zero-capacity channel must fail with `Full` and return the value.
2752 fn oneshot_single_thread_try_send_closed2() {
2753 let (tx, _rx) = sync_channel::<i32>(0);
2754 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
// Buffers one value in a capacity-1 channel and expects `recv` to return it.
2758 fn oneshot_single_thread_try_recv_open() {
2759 let (tx, rx) = sync_channel::<i32>(1);
2760 tx.send(10).unwrap();
2761 assert!(rx.recv() == Ok(10));
// Expects `recv` to return an error.
// NOTE(review): the line that drops `tx` before the receive is not visible
// in this extract -- confirm.
2765 fn oneshot_single_thread_try_recv_closed() {
2766 let (tx, rx) = sync_channel::<i32>(0);
2768 assert!(rx.recv().is_err());
// A buffered value must still be delivered by `try_recv` before the
// channel reports `Disconnected`.
// NOTE(review): the line that drops `tx` after the send is not visible in
// this extract -- confirm.
2772 fn oneshot_single_thread_try_recv_closed_with_data() {
2773 let (tx, rx) = sync_channel::<i32>(1);
2774 tx.send(10).unwrap();
2776 assert_eq!(rx.try_recv(), Ok(10));
2777 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
// `try_recv` reports `Empty` before anything is sent and returns the value
// once one has been buffered.
2781 fn oneshot_single_thread_peek_data() {
2782 let (tx, rx) = sync_channel::<i32>(1);
2783 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2784 tx.send(10).unwrap();
2785 assert_eq!(rx.try_recv(), Ok(10));
// `try_recv` must report `Disconnected`, and keep reporting it on a second
// call.
// NOTE(review): the line that drops `tx` is not visible in this extract --
// confirm.
2789 fn oneshot_single_thread_peek_close() {
2790 let (tx, rx) = sync_channel::<i32>(0);
2792 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2793 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
// With the sender still alive (`_tx`) and nothing sent, `try_recv` must
// report `Empty`.
2797 fn oneshot_single_thread_peek_open() {
2798 let (_tx, rx) = sync_channel::<i32>(0);
2799 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
// A spawned thread receives and checks a boxed value that the main thread
// sends over a rendezvous channel.
2803 fn oneshot_multi_task_recv_then_send() {
2804 let (tx, rx) = sync_channel::<Box<i32>>(0);
2805 let _t = thread::spawn(move|| {
2806 assert!(*rx.recv().unwrap() == 10);
2809 tx.send(box 10).unwrap();
// The thread that unwraps `rx.recv()` is expected to fail
// (`res.is_err()`).
// NOTE(review): the body of the first spawned thread (presumably dropping
// `tx`) and the `join` producing `res` are not visible in this extract --
// confirm.
2813 fn oneshot_multi_task_recv_then_close() {
2814 let (tx, rx) = sync_channel::<Box<i32>>(0);
2815 let _t = thread::spawn(move|| {
2818 let res = thread::spawn(move|| {
2819 assert!(*rx.recv().unwrap() == 10);
2821 assert!(res.is_err());
// Repeats `stress_factor()` times: create a rendezvous channel and spawn a
// thread.
// NOTE(review): the spawned thread's body and whatever happens to `tx` and
// `rx` are not visible in this extract -- confirm.
2825 fn oneshot_multi_thread_close_stress() {
2826 for _ in 0..stress_factor() {
2827 let (tx, rx) = sync_channel::<i32>(0);
2828 let _t = thread::spawn(move|| {
// Repeats `stress_factor()` times: one thread is spawned first and a second
// thread performs an unwrapped `send`; the second spawn's result is
// discarded with `let _`.
// NOTE(review): the first thread's body and any `join` call are not visible
// in this extract -- confirm.
2836 fn oneshot_multi_thread_send_close_stress() {
2837 for _ in 0..stress_factor() {
2838 let (tx, rx) = sync_channel::<i32>(0);
2839 let _t = thread::spawn(move|| {
2842 let _ = thread::spawn(move || {
2843 tx.send(1).unwrap();
// Repeats `stress_factor()` times, using nested spawned threads; one inner
// thread is expected to end in an error (`res.is_err()`).
// NOTE(review): the bodies of the inner threads and the `join` producing
// `res` are not visible in this extract -- confirm.
2849 fn oneshot_multi_thread_recv_close_stress() {
2850 for _ in 0..stress_factor() {
2851 let (tx, rx) = sync_channel::<i32>(0);
2852 let _t = thread::spawn(move|| {
2853 let res = thread::spawn(move|| {
2856 assert!(res.is_err());
2858 let _t = thread::spawn(move|| {
2859 thread::spawn(move|| {
// Repeats `stress_factor()` times: a spawned thread sends one boxed value
// over a fresh rendezvous channel and the main thread receives and checks it.
2867 fn oneshot_multi_thread_send_recv_stress() {
2868 for _ in 0..stress_factor() {
2869 let (tx, rx) = sync_channel::<Box<i32>>(0);
2870 let _t = thread::spawn(move|| {
2871 tx.send(box 10).unwrap();
2873 assert!(*rx.recv().unwrap() == 10);
// Repeats `stress_factor()` times on a fresh rendezvous channel, using two
// nested helpers that each take ownership of one channel half and an index.
// NOTE(review): the initial calls to `send`/`recv` and any recursive calls
// inside the spawned closures are not visible in this extract -- confirm.
2878 fn stream_send_recv_stress() {
2879 for _ in 0..stress_factor() {
2880 let (tx, rx) = sync_channel::<Box<i32>>(0);
// Helper: sends the boxed index `i` from a new thread; does nothing once
// `i` reaches 10.
2885 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2886 if i == 10 { return }
2888 thread::spawn(move|| {
2889 tx.send(box i).unwrap();
// Helper: receives on a new thread and checks the value equals `i`; does
// nothing once `i` reaches 10.
2894 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2895 if i == 10 { return }
2897 thread::spawn(move|| {
2898 assert!(*rx.recv().unwrap() == i);
// NOTE(review): the `fn` header for this body is not visible in this extract.
2907 // Regression test that we don't run out of stack in scheduler context
// The bound (10000) matches the number of sends, so every send can be
// buffered before anything is received.
2908 let (tx, rx) = sync_channel(10000);
2909 for _ in 0..10000 { tx.send(()).unwrap(); }
2910 for _ in 0..10000 { rx.recv().unwrap(); }
// Stress test for a rendezvous channel shared by cloned senders, each
// sending one unit message from its own thread.
// NOTE(review): the spawning loop and the receiving side are not visible in
// this extract -- presumably `total` threads are spawned and `total`
// messages received; confirm.
2914 fn shared_chan_stress() {
2915 let (tx, rx) = sync_channel(0);
2916 let total = stress_factor() + 100;
2918 let tx = tx.clone();
2919 thread::spawn(move|| {
2920 tx.send(()).unwrap();
// A worker thread consumes `rx` through `iter()` and reports a single
// result (`acc`) on a second rendezvous channel; the main thread sends 3, 1
// and 2 and expects 6 back.
// NOTE(review): the initialisation/update of `acc` and the drop of `tx`
// that would end the iteration are not visible in this extract --
// presumably `acc` is the running sum; confirm.
2930 fn test_nested_recv_iter() {
2931 let (tx, rx) = sync_channel::<i32>(0);
2932 let (total_tx, total_rx) = sync_channel::<i32>(0);
2934 let _t = thread::spawn(move|| {
2936 for x in rx.iter() {
2939 total_tx.send(acc).unwrap();
2942 tx.send(3).unwrap();
2943 tx.send(1).unwrap();
2944 tx.send(2).unwrap();
2946 assert_eq!(total_rx.recv().unwrap(), 6);
// A worker thread iterates over `rx.iter()` and reports `count` on a second
// rendezvous channel; the main thread sends the value 2 three times and
// expects the reported count to be 4.
// NOTE(review): the body of the `for` loop (how `count` is updated and when
// the loop breaks) is not visible in this extract -- confirm.
2950 fn test_recv_iter_break() {
2951 let (tx, rx) = sync_channel::<i32>(0);
2952 let (count_tx, count_rx) = sync_channel(0);
2954 let _t = thread::spawn(move|| {
2956 for x in rx.iter() {
2963 count_tx.send(count).unwrap();
2966 tx.send(2).unwrap();
2967 tx.send(2).unwrap();
2968 tx.send(2).unwrap();
// A fourth, non-blocking send whose outcome is deliberately ignored.
2969 let _ = tx.try_send(2);
2971 assert_eq!(count_rx.recv().unwrap(), 4);
// Walks `try_recv` through its three outcomes -- `Empty`, `Ok(value)` and
// `Disconnected` -- on capacity-1 channels, using `tx2`/`rx2` and
// `tx3`/`rx3` to step the helper thread and the main thread in lockstep.
2975 fn try_recv_states() {
2976 let (tx1, rx1) = sync_channel::<i32>(1);
2977 let (tx2, rx2) = sync_channel::<()>(1);
2978 let (tx3, rx3) = sync_channel::<()>(1);
2979 let _t = thread::spawn(move|| {
// Step 1: wait for the go-ahead, send one value, acknowledge.
2980 rx2.recv().unwrap();
2981 tx1.send(1).unwrap();
2982 tx3.send(()).unwrap();
// Step 2: wait for the second go-ahead, then acknowledge again.
// NOTE(review): the final `Disconnected` assertion implies `tx1` is dropped
// between these two lines; that line is not visible in this extract.
2983 rx2.recv().unwrap();
2985 tx3.send(()).unwrap();
// Nothing has been sent yet.
2988 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2989 tx2.send(()).unwrap();
2990 rx3.recv().unwrap();
// Exactly one value is available after step 1.
2991 assert_eq!(rx1.try_recv(), Ok(1));
2992 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2993 tx2.send(()).unwrap();
2994 rx3.recv().unwrap();
2995 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2998 // This bug used to end up in a livelock inside of the Receiver destructor
2999 // because the internal state of the Shared packet was corrupted
// Same scenario on rendezvous channels: the child thread blocks in `recv`,
// then drops the receiver and signals completion over `tx2`/`rx2`.
3001 fn destroy_upgraded_shared_port_when_sender_still_active() {
3002 let (tx, rx) = sync_channel::<()>(0);
3003 let (tx2, rx2) = sync_channel::<()>(0);
3004 let _t = thread::spawn(move|| {
3005 rx.recv().unwrap(); // wait on a oneshot
3006 drop(rx); // destroy a shared
3007 tx2.send(()).unwrap();
3009 // make sure the other thread has gone to sleep
3010 for _ in 0..5000 { thread::yield_now(); }
3012 // upgrade to a shared chan and send a message
// NOTE(review): `t` is not defined in the visible lines -- presumably a
// clone of `tx` created just above; confirm.
3015 t.send(()).unwrap();
3017 // wait for the child thread to exit before we exit
3018 rx2.recv().unwrap();
// NOTE(review): the `fn` headers for the seven test bodies below are not
// visible in this extract; each `sync_channel` call starts a new body.

// Rendezvous send succeeds when another thread receives.
3023 let (tx, rx) = sync_channel::<i32>(0);
3024 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3025 assert_eq!(tx.send(1), Ok(()));
// Rendezvous send fails when the other thread only drops the receiver.
3030 let (tx, rx) = sync_channel::<i32>(0);
3031 let _t = thread::spawn(move|| { drop(rx); });
3032 assert!(tx.send(1).is_err());
// Capacity 1: the first send is buffered; the second must fail once the
// receiver has been dropped by the other thread.
3037 let (tx, rx) = sync_channel::<i32>(1);
3038 assert_eq!(tx.send(1), Ok(()));
3039 let _t =thread::spawn(move|| { drop(rx); });
3040 assert!(tx.send(1).is_err());
// Two senders on separate threads must both see their sends fail; each
// reports completion on the `done` channel, and the main thread waits for
// both reports.
// NOTE(review): the line that drops `rx` is not visible in this extract --
// confirm.
3045 let (tx, rx) = sync_channel::<i32>(0);
3046 let tx2 = tx.clone();
3047 let (done, donerx) = channel();
3048 let done2 = done.clone();
3049 let _t = thread::spawn(move|| {
3050 assert!(tx.send(1).is_err());
3051 done.send(()).unwrap();
3053 let _t = thread::spawn(move|| {
3054 assert!(tx2.send(2).is_err());
3055 done2.send(()).unwrap();
3058 donerx.recv().unwrap();
3059 donerx.recv().unwrap();
// Zero capacity and nobody receiving: `try_send` reports `Full`.
3064 let (tx, _rx) = sync_channel::<i32>(0);
3065 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
// Capacity 1: the first `try_send` fits, the second reports `Full`.
3070 let (tx, _rx) = sync_channel::<i32>(1);
3071 assert_eq!(tx.try_send(1), Ok(()));
3072 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
// Capacity 1: after one buffered value, `try_send` must report
// `Disconnected`.
// NOTE(review): the line that drops `rx` between the two sends is not
// visible in this extract -- confirm.
3077 let (tx, rx) = sync_channel::<i32>(1);
3078 assert_eq!(tx.try_send(1), Ok(()));
3080 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3086 let (tx1, rx1) = sync_channel::<()>(3);
3087 let (tx2, rx2) = sync_channel::<()>(3);
3089 let _t = thread::spawn(move|| {
3090 rx1.recv().unwrap();
3091 tx2.try_send(()).unwrap();
3094 tx1.try_send(()).unwrap();
3095 rx2.recv().unwrap();