1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10);
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
116 #![stable(feature = "rust1", since = "1.0.0")]
118 // A description of how Rust's channel implementation works
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
130 // ## Flavors of channels
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
149 // ## Concurrent queues
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // ### SPSC optimizations
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
180 // ### MPSC optimizations
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
190 // ## Overview of the Implementation
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
198 // queue.push(t) return if queue.pop()
199 // if increment() == -1 deschedule {
200 // wakeup() if decrement() > 0
201 // cancel_deschedule()
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
208 // ### The internal atomic counter
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
233 // ## Native Implementation
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
272 use cell::UnsafeCell;
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
290 /// The receiving-half of Rust's channel type. This half can only be owned by
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
302 /// An iterator over messages on a receiver, this iterator will block
303 /// whenever `next` is called, waiting for a new message, and `None` will be
304 /// returned when the corresponding channel has hung up.
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub struct Iter<'a, T: 'a> {
310 /// An owning iterator over messages on a receiver, this iterator will block
311 /// whenever `next` is called, waiting for a new message, and `None` will be
312 /// returned when the corresponding channel has hung up.
313 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
314 pub struct IntoIter<T> {
318 /// The sending-half of Rust's asynchronous channel type. This half can only be
319 /// owned by one thread, but it can be cloned to send to other threads.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 pub struct Sender<T> {
322 inner: UnsafeCell<Flavor<T>>,
325 // The send port can be sent from place to place, so long as it
326 // is not used to send non-sendable things.
327 #[stable(feature = "rust1", since = "1.0.0")]
328 unsafe impl<T: Send> Send for Sender<T> { }
330 /// The sending-half of Rust's synchronous channel type. This half can only be
331 /// owned by one thread, but it can be cloned to send to other threads.
332 #[stable(feature = "rust1", since = "1.0.0")]
333 pub struct SyncSender<T> {
334 inner: Arc<UnsafeCell<sync::Packet<T>>>,
337 #[stable(feature = "rust1", since = "1.0.0")]
338 unsafe impl<T: Send> Send for SyncSender<T> {}
340 #[stable(feature = "rust1", since = "1.0.0")]
341 impl<T> !Sync for SyncSender<T> {}
343 /// An error returned from the `send` function on channels.
345 /// A `send` operation can only fail if the receiving end of a channel is
346 /// disconnected, implying that the data could never be received. The error
347 /// contains the data being sent as a payload so it can be recovered.
348 #[stable(feature = "rust1", since = "1.0.0")]
349 #[derive(PartialEq, Eq, Clone, Copy)]
350 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
352 /// An error returned from the `recv` function on a `Receiver`.
354 /// The `recv` operation can only fail if the sending half of a channel is
355 /// disconnected, implying that no further messages will ever be received.
356 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
357 #[stable(feature = "rust1", since = "1.0.0")]
358 pub struct RecvError;
360 /// This enumeration is the list of the possible reasons that `try_recv` could
361 /// not return data when called.
362 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
363 #[stable(feature = "rust1", since = "1.0.0")]
364 pub enum TryRecvError {
365 /// This channel is currently empty, but the sender(s) have not yet
366 /// disconnected, so data may yet become available.
367 #[stable(feature = "rust1", since = "1.0.0")]
370 /// This channel's sending half has become disconnected, and there will
371 /// never be any more data received on this channel
372 #[stable(feature = "rust1", since = "1.0.0")]
376 /// This enumeration is the list of the possible error outcomes for the
377 /// `SyncSender::try_send` method.
378 #[stable(feature = "rust1", since = "1.0.0")]
379 #[derive(PartialEq, Eq, Clone, Copy)]
380 pub enum TrySendError<T> {
381 /// The data could not be sent on the channel because it would require that
382 /// the callee block to send the data.
384 /// If this is a buffered channel, then the buffer is full at this time. If
385 /// this is not a buffered channel, then there is no receiver available to
386 /// acquire the data.
387 #[stable(feature = "rust1", since = "1.0.0")]
388 Full(#[cfg_attr(not(stage0), stable(feature = "rust1", since = "1.0.0"))] T),
390 /// This channel's receiving half has disconnected, so the data could not be
391 /// sent. The data is returned back to the callee in this case.
392 #[stable(feature = "rust1", since = "1.0.0")]
393 Disconnected(#[cfg_attr(not(stage0), stable(feature = "rust1", since = "1.0.0"))] T),
397 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
398 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
399 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
400 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
404 trait UnsafeFlavor<T> {
405 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
406 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
407 &mut *self.inner_unsafe().get()
409 unsafe fn inner(&self) -> &Flavor<T> {
410 &*self.inner_unsafe().get()
413 impl<T> UnsafeFlavor<T> for Sender<T> {
414 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
418 impl<T> UnsafeFlavor<T> for Receiver<T> {
419 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
424 /// Creates a new asynchronous channel, returning the sender/receiver halves.
426 /// All data sent on the sender will become available on the receiver, and no
427 /// send will block the calling thread (this channel has an "infinite buffer").
432 /// use std::sync::mpsc::channel;
435 /// // tx is the sending half (tx for transmission), and rx is the receiving
436 /// // half (rx for receiving).
437 /// let (tx, rx) = channel();
439 /// // Spawn off an expensive computation
440 /// thread::spawn(move|| {
441 /// # fn expensive_computation() {}
442 /// tx.send(expensive_computation()).unwrap();
445 /// // Do some useful work for awhile
447 /// // Let's see what that answer was
448 /// println!("{:?}", rx.recv().unwrap());
450 #[stable(feature = "rust1", since = "1.0.0")]
451 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
452 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
453 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
456 /// Creates a new synchronous, bounded channel.
458 /// Like asynchronous channels, the `Receiver` will block until a message
459 /// becomes available. These channels differ greatly in the semantics of the
460 /// sender from asynchronous channels, however.
462 /// This channel has an internal buffer on which messages will be queued. When
463 /// the internal buffer becomes full, future sends will *block* waiting for the
464 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
465 /// becomes "rendezvous channel" where each send will not return until a recv
466 /// is paired with it.
468 /// As with asynchronous channels, all senders will panic in `send` if the
469 /// `Receiver` has been destroyed.
474 /// use std::sync::mpsc::sync_channel;
477 /// let (tx, rx) = sync_channel(1);
479 /// // this returns immediately
480 /// tx.send(1).unwrap();
482 /// thread::spawn(move|| {
483 /// // this will block until the previous message has been received
484 /// tx.send(2).unwrap();
487 /// assert_eq!(rx.recv().unwrap(), 1);
488 /// assert_eq!(rx.recv().unwrap(), 2);
490 #[stable(feature = "rust1", since = "1.0.0")]
491 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
492 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
493 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
496 ////////////////////////////////////////////////////////////////////////////////
498 ////////////////////////////////////////////////////////////////////////////////
501 fn new(inner: Flavor<T>) -> Sender<T> {
503 inner: UnsafeCell::new(inner),
507 /// Attempts to send a value on this channel, returning it back if it could
510 /// A successful send occurs when it is determined that the other end of
511 /// the channel has not hung up already. An unsuccessful send would be one
512 /// where the corresponding receiver has already been deallocated. Note
513 /// that a return value of `Err` means that the data will never be
514 /// received, but a return value of `Ok` does *not* mean that the data
515 /// will be received. It is possible for the corresponding receiver to
516 /// hang up immediately after this function returns `Ok`.
518 /// This method will never block the current thread.
523 /// use std::sync::mpsc::channel;
525 /// let (tx, rx) = channel();
527 /// // This send is always successful
528 /// tx.send(1).unwrap();
530 /// // This send will fail because the receiver is gone
532 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
534 #[stable(feature = "rust1", since = "1.0.0")]
535 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
536 let (new_inner, ret) = match *unsafe { self.inner() } {
537 Flavor::Oneshot(ref p) => {
541 return (*p).send(t).map_err(SendError);
544 Arc::new(UnsafeCell::new(stream::Packet::new()));
545 let rx = Receiver::new(Flavor::Stream(a.clone()));
546 match (*p).upgrade(rx) {
547 oneshot::UpSuccess => {
548 let ret = (*a.get()).send(t);
551 oneshot::UpDisconnected => (a, Err(t)),
552 oneshot::UpWoke(token) => {
553 // This send cannot panic because the thread is
554 // asleep (we're looking at it), so the receiver
556 (*a.get()).send(t).ok().unwrap();
564 Flavor::Stream(ref p) => return unsafe {
565 (*p.get()).send(t).map_err(SendError)
567 Flavor::Shared(ref p) => return unsafe {
568 (*p.get()).send(t).map_err(SendError)
570 Flavor::Sync(..) => unreachable!(),
574 let tmp = Sender::new(Flavor::Stream(new_inner));
575 mem::swap(self.inner_mut(), tmp.inner_mut());
577 ret.map_err(SendError)
581 #[stable(feature = "rust1", since = "1.0.0")]
582 impl<T> Clone for Sender<T> {
583 fn clone(&self) -> Sender<T> {
584 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
585 Flavor::Oneshot(ref p) => {
586 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
588 let guard = (*a.get()).postinit_lock();
589 let rx = Receiver::new(Flavor::Shared(a.clone()));
590 match (*p.get()).upgrade(rx) {
592 oneshot::UpDisconnected => (a, None, guard),
593 oneshot::UpWoke(task) => (a, Some(task), guard)
597 Flavor::Stream(ref p) => {
598 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
600 let guard = (*a.get()).postinit_lock();
601 let rx = Receiver::new(Flavor::Shared(a.clone()));
602 match (*p.get()).upgrade(rx) {
604 stream::UpDisconnected => (a, None, guard),
605 stream::UpWoke(task) => (a, Some(task), guard),
609 Flavor::Shared(ref p) => {
610 unsafe { (*p.get()).clone_chan(); }
611 return Sender::new(Flavor::Shared(p.clone()));
613 Flavor::Sync(..) => unreachable!(),
617 (*packet.get()).inherit_blocker(sleeper, guard);
619 let tmp = Sender::new(Flavor::Shared(packet.clone()));
620 mem::swap(self.inner_mut(), tmp.inner_mut());
622 Sender::new(Flavor::Shared(packet))
626 #[stable(feature = "rust1", since = "1.0.0")]
627 impl<T> Drop for Sender<T> {
629 match *unsafe { self.inner_mut() } {
630 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
631 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
632 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
633 Flavor::Sync(..) => unreachable!(),
638 #[stable(feature = "mpsc_debug", since = "1.7.0")]
639 impl<T> fmt::Debug for Sender<T> {
640 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
641 write!(f, "Sender {{ .. }}")
645 ////////////////////////////////////////////////////////////////////////////////
647 ////////////////////////////////////////////////////////////////////////////////
649 impl<T> SyncSender<T> {
650 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
651 SyncSender { inner: inner }
654 /// Sends a value on this synchronous channel.
656 /// This function will *block* until space in the internal buffer becomes
657 /// available or a receiver is available to hand off the message to.
659 /// Note that a successful send does *not* guarantee that the receiver will
660 /// ever see the data if there is a buffer on this channel. Items may be
661 /// enqueued in the internal buffer for the receiver to receive at a later
662 /// time. If the buffer size is 0, however, it can be guaranteed that the
663 /// receiver has indeed received the data if this function returns success.
665 /// This function will never panic, but it may return `Err` if the
666 /// `Receiver` has disconnected and is no longer able to receive
668 #[stable(feature = "rust1", since = "1.0.0")]
669 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
670 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
673 /// Attempts to send a value on this channel without blocking.
675 /// This method differs from `send` by returning immediately if the
676 /// channel's buffer is full or no receiver is waiting to acquire some
677 /// data. Compared with `send`, this function has two failure cases
678 /// instead of one (one for disconnection, one for a full buffer).
680 /// See `SyncSender::send` for notes about guarantees of whether the
681 /// receiver has received the data or not if this function is successful.
682 #[stable(feature = "rust1", since = "1.0.0")]
683 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
684 unsafe { (*self.inner.get()).try_send(t) }
688 #[stable(feature = "rust1", since = "1.0.0")]
689 impl<T> Clone for SyncSender<T> {
690 fn clone(&self) -> SyncSender<T> {
691 unsafe { (*self.inner.get()).clone_chan(); }
692 SyncSender::new(self.inner.clone())
696 #[stable(feature = "rust1", since = "1.0.0")]
697 impl<T> Drop for SyncSender<T> {
699 unsafe { (*self.inner.get()).drop_chan(); }
703 #[stable(feature = "mpsc_debug", since = "1.7.0")]
704 impl<T> fmt::Debug for SyncSender<T> {
705 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
706 write!(f, "SyncSender {{ .. }}")
710 ////////////////////////////////////////////////////////////////////////////////
712 ////////////////////////////////////////////////////////////////////////////////
714 impl<T> Receiver<T> {
715 fn new(inner: Flavor<T>) -> Receiver<T> {
716 Receiver { inner: UnsafeCell::new(inner) }
719 /// Attempts to return a pending value on this receiver without blocking
721 /// This method will never block the caller in order to wait for data to
722 /// become available. Instead, this will always return immediately with a
723 /// possible option of pending data on the channel.
725 /// This is useful for a flavor of "optimistic check" before deciding to
726 /// block on a receiver.
727 #[stable(feature = "rust1", since = "1.0.0")]
728 pub fn try_recv(&self) -> Result<T, TryRecvError> {
730 let new_port = match *unsafe { self.inner() } {
731 Flavor::Oneshot(ref p) => {
732 match unsafe { (*p.get()).try_recv() } {
733 Ok(t) => return Ok(t),
734 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
735 Err(oneshot::Disconnected) => {
736 return Err(TryRecvError::Disconnected)
738 Err(oneshot::Upgraded(rx)) => rx,
741 Flavor::Stream(ref p) => {
742 match unsafe { (*p.get()).try_recv() } {
743 Ok(t) => return Ok(t),
744 Err(stream::Empty) => return Err(TryRecvError::Empty),
745 Err(stream::Disconnected) => {
746 return Err(TryRecvError::Disconnected)
748 Err(stream::Upgraded(rx)) => rx,
751 Flavor::Shared(ref p) => {
752 match unsafe { (*p.get()).try_recv() } {
753 Ok(t) => return Ok(t),
754 Err(shared::Empty) => return Err(TryRecvError::Empty),
755 Err(shared::Disconnected) => {
756 return Err(TryRecvError::Disconnected)
760 Flavor::Sync(ref p) => {
761 match unsafe { (*p.get()).try_recv() } {
762 Ok(t) => return Ok(t),
763 Err(sync::Empty) => return Err(TryRecvError::Empty),
764 Err(sync::Disconnected) => {
765 return Err(TryRecvError::Disconnected)
771 mem::swap(self.inner_mut(),
772 new_port.inner_mut());
777 /// Attempts to wait for a value on this receiver, returning an error if the
778 /// corresponding channel has hung up.
780 /// This function will always block the current thread if there is no data
781 /// available and it's possible for more data to be sent. Once a message is
782 /// sent to the corresponding `Sender`, then this receiver will wake up and
783 /// return that message.
785 /// If the corresponding `Sender` has disconnected, or it disconnects while
786 /// this call is blocking, this call will wake up and return `Err` to
787 /// indicate that no more messages can ever be received on this channel.
788 /// However, since channels are buffered, messages sent before the disconnect
789 /// will still be properly received.
794 /// use std::sync::mpsc;
797 /// let (send, recv) = mpsc::channel();
798 /// let handle = thread::spawn(move || {
799 /// send.send(1u8).unwrap();
802 /// handle.join().unwrap();
804 /// assert_eq!(Ok(1), recv.recv());
807 /// Buffering behavior:
810 /// use std::sync::mpsc;
812 /// use std::sync::mpsc::RecvError;
814 /// let (send, recv) = mpsc::channel();
815 /// let handle = thread::spawn(move || {
816 /// send.send(1u8).unwrap();
817 /// send.send(2).unwrap();
818 /// send.send(3).unwrap();
822 /// // wait for the thread to join so we ensure the sender is dropped
823 /// handle.join().unwrap();
825 /// assert_eq!(Ok(1), recv.recv());
826 /// assert_eq!(Ok(2), recv.recv());
827 /// assert_eq!(Ok(3), recv.recv());
828 /// assert_eq!(Err(RecvError), recv.recv());
830 #[stable(feature = "rust1", since = "1.0.0")]
831 pub fn recv(&self) -> Result<T, RecvError> {
833 let new_port = match *unsafe { self.inner() } {
834 Flavor::Oneshot(ref p) => {
835 match unsafe { (*p.get()).recv() } {
836 Ok(t) => return Ok(t),
837 Err(oneshot::Empty) => return unreachable!(),
838 Err(oneshot::Disconnected) => return Err(RecvError),
839 Err(oneshot::Upgraded(rx)) => rx,
842 Flavor::Stream(ref p) => {
843 match unsafe { (*p.get()).recv() } {
844 Ok(t) => return Ok(t),
845 Err(stream::Empty) => return unreachable!(),
846 Err(stream::Disconnected) => return Err(RecvError),
847 Err(stream::Upgraded(rx)) => rx,
850 Flavor::Shared(ref p) => {
851 match unsafe { (*p.get()).recv() } {
852 Ok(t) => return Ok(t),
853 Err(shared::Empty) => return unreachable!(),
854 Err(shared::Disconnected) => return Err(RecvError),
857 Flavor::Sync(ref p) => return unsafe {
858 (*p.get()).recv().map_err(|()| RecvError)
862 mem::swap(self.inner_mut(), new_port.inner_mut());
867 /// Returns an iterator that will block waiting for messages, but never
868 /// `panic!`. It will return `None` when the channel has hung up.
869 #[stable(feature = "rust1", since = "1.0.0")]
870 pub fn iter(&self) -> Iter<T> {
875 impl<T> select::Packet for Receiver<T> {
876 fn can_recv(&self) -> bool {
878 let new_port = match *unsafe { self.inner() } {
879 Flavor::Oneshot(ref p) => {
880 match unsafe { (*p.get()).can_recv() } {
881 Ok(ret) => return ret,
882 Err(upgrade) => upgrade,
885 Flavor::Stream(ref p) => {
886 match unsafe { (*p.get()).can_recv() } {
887 Ok(ret) => return ret,
888 Err(upgrade) => upgrade,
891 Flavor::Shared(ref p) => {
892 return unsafe { (*p.get()).can_recv() };
894 Flavor::Sync(ref p) => {
895 return unsafe { (*p.get()).can_recv() };
899 mem::swap(self.inner_mut(),
900 new_port.inner_mut());
905 fn start_selection(&self, mut token: SignalToken) -> StartResult {
907 let (t, new_port) = match *unsafe { self.inner() } {
908 Flavor::Oneshot(ref p) => {
909 match unsafe { (*p.get()).start_selection(token) } {
910 oneshot::SelSuccess => return Installed,
911 oneshot::SelCanceled => return Abort,
912 oneshot::SelUpgraded(t, rx) => (t, rx),
915 Flavor::Stream(ref p) => {
916 match unsafe { (*p.get()).start_selection(token) } {
917 stream::SelSuccess => return Installed,
918 stream::SelCanceled => return Abort,
919 stream::SelUpgraded(t, rx) => (t, rx),
922 Flavor::Shared(ref p) => {
923 return unsafe { (*p.get()).start_selection(token) };
925 Flavor::Sync(ref p) => {
926 return unsafe { (*p.get()).start_selection(token) };
931 mem::swap(self.inner_mut(), new_port.inner_mut());
936 fn abort_selection(&self) -> bool {
937 let mut was_upgrade = false;
939 let result = match *unsafe { self.inner() } {
940 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
941 Flavor::Stream(ref p) => unsafe {
942 (*p.get()).abort_selection(was_upgrade)
944 Flavor::Shared(ref p) => return unsafe {
945 (*p.get()).abort_selection(was_upgrade)
947 Flavor::Sync(ref p) => return unsafe {
948 (*p.get()).abort_selection()
951 let new_port = match result { Ok(b) => return b, Err(p) => p };
954 mem::swap(self.inner_mut(),
955 new_port.inner_mut());
961 #[stable(feature = "rust1", since = "1.0.0")]
962 impl<'a, T> Iterator for Iter<'a, T> {
965 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
968 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
969 impl<'a, T> IntoIterator for &'a Receiver<T> {
971 type IntoIter = Iter<'a, T>;
973 fn into_iter(self) -> Iter<'a, T> { self.iter() }
976 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
977 impl<T> Iterator for IntoIter<T> {
979 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
982 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
983 impl <T> IntoIterator for Receiver<T> {
985 type IntoIter = IntoIter<T>;
987 fn into_iter(self) -> IntoIter<T> {
988 IntoIter { rx: self }
992 #[stable(feature = "rust1", since = "1.0.0")]
993 impl<T> Drop for Receiver<T> {
995 match *unsafe { self.inner_mut() } {
996 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
997 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
998 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
999 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1004 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1005 impl<T> fmt::Debug for Receiver<T> {
1006 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1007 write!(f, "Receiver {{ .. }}")
1011 #[stable(feature = "rust1", since = "1.0.0")]
1012 impl<T> fmt::Debug for SendError<T> {
1013 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1014 "SendError(..)".fmt(f)
1018 #[stable(feature = "rust1", since = "1.0.0")]
1019 impl<T> fmt::Display for SendError<T> {
1020 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1021 "sending on a closed channel".fmt(f)
1025 #[stable(feature = "rust1", since = "1.0.0")]
1026 impl<T: Send + Reflect> error::Error for SendError<T> {
1027 fn description(&self) -> &str {
1028 "sending on a closed channel"
1031 fn cause(&self) -> Option<&error::Error> {
1036 #[stable(feature = "rust1", since = "1.0.0")]
1037 impl<T> fmt::Debug for TrySendError<T> {
1038 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1040 TrySendError::Full(..) => "Full(..)".fmt(f),
1041 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1046 #[stable(feature = "rust1", since = "1.0.0")]
1047 impl<T> fmt::Display for TrySendError<T> {
1048 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1050 TrySendError::Full(..) => {
1051 "sending on a full channel".fmt(f)
1053 TrySendError::Disconnected(..) => {
1054 "sending on a closed channel".fmt(f)
1060 #[stable(feature = "rust1", since = "1.0.0")]
1061 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1063 fn description(&self) -> &str {
1065 TrySendError::Full(..) => {
1066 "sending on a full channel"
1068 TrySendError::Disconnected(..) => {
1069 "sending on a closed channel"
1074 fn cause(&self) -> Option<&error::Error> {
1079 #[stable(feature = "rust1", since = "1.0.0")]
1080 impl fmt::Display for RecvError {
1081 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1082 "receiving on a closed channel".fmt(f)
1086 #[stable(feature = "rust1", since = "1.0.0")]
1087 impl error::Error for RecvError {
1089 fn description(&self) -> &str {
1090 "receiving on a closed channel"
1093 fn cause(&self) -> Option<&error::Error> {
1098 #[stable(feature = "rust1", since = "1.0.0")]
1099 impl fmt::Display for TryRecvError {
1100 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1102 TryRecvError::Empty => {
1103 "receiving on an empty channel".fmt(f)
1105 TryRecvError::Disconnected => {
1106 "receiving on a closed channel".fmt(f)
1112 #[stable(feature = "rust1", since = "1.0.0")]
1113 impl error::Error for TryRecvError {
1115 fn description(&self) -> &str {
1117 TryRecvError::Empty => {
1118 "receiving on an empty channel"
1120 TryRecvError::Disconnected => {
1121 "receiving on a closed channel"
1126 fn cause(&self) -> Option<&error::Error> {
1139 pub fn stress_factor() -> usize {
1140 match env::var("RUST_TEST_STRESS") {
1141 Ok(val) => val.parse().unwrap(),
1148 let (tx, rx) = channel::<i32>();
1149 tx.send(1).unwrap();
1150 assert_eq!(rx.recv().unwrap(), 1);
1155 let (tx, _rx) = channel::<Box<isize>>();
1156 tx.send(box 1).unwrap();
1160 fn drop_full_shared() {
1161 let (tx, _rx) = channel::<Box<isize>>();
1164 tx.send(box 1).unwrap();
1169 let (tx, rx) = channel::<i32>();
1170 tx.send(1).unwrap();
1171 assert_eq!(rx.recv().unwrap(), 1);
1172 let tx = tx.clone();
1173 tx.send(1).unwrap();
1174 assert_eq!(rx.recv().unwrap(), 1);
1178 fn smoke_threads() {
1179 let (tx, rx) = channel::<i32>();
1180 let _t = thread::spawn(move|| {
1181 tx.send(1).unwrap();
1183 assert_eq!(rx.recv().unwrap(), 1);
1187 fn smoke_port_gone() {
1188 let (tx, rx) = channel::<i32>();
1190 assert!(tx.send(1).is_err());
1194 fn smoke_shared_port_gone() {
1195 let (tx, rx) = channel::<i32>();
1197 assert!(tx.send(1).is_err())
1201 fn smoke_shared_port_gone2() {
1202 let (tx, rx) = channel::<i32>();
1204 let tx2 = tx.clone();
1206 assert!(tx2.send(1).is_err());
1210 fn port_gone_concurrent() {
1211 let (tx, rx) = channel::<i32>();
1212 let _t = thread::spawn(move|| {
1215 while tx.send(1).is_ok() {}
1219 fn port_gone_concurrent_shared() {
1220 let (tx, rx) = channel::<i32>();
1221 let tx2 = tx.clone();
1222 let _t = thread::spawn(move|| {
1225 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1229 fn smoke_chan_gone() {
1230 let (tx, rx) = channel::<i32>();
1232 assert!(rx.recv().is_err());
1236 fn smoke_chan_gone_shared() {
1237 let (tx, rx) = channel::<()>();
1238 let tx2 = tx.clone();
1241 assert!(rx.recv().is_err());
1245 fn chan_gone_concurrent() {
1246 let (tx, rx) = channel::<i32>();
1247 let _t = thread::spawn(move|| {
1248 tx.send(1).unwrap();
1249 tx.send(1).unwrap();
1251 while rx.recv().is_ok() {}
1256 let (tx, rx) = channel::<i32>();
1257 let t = thread::spawn(move|| {
1258 for _ in 0..10000 { tx.send(1).unwrap(); }
1261 assert_eq!(rx.recv().unwrap(), 1);
1263 t.join().ok().unwrap();
1267 fn stress_shared() {
1268 const AMT: u32 = 10000;
1269 const NTHREADS: u32 = 8;
1270 let (tx, rx) = channel::<i32>();
1272 let t = thread::spawn(move|| {
1273 for _ in 0..AMT * NTHREADS {
1274 assert_eq!(rx.recv().unwrap(), 1);
1276 match rx.try_recv() {
1282 for _ in 0..NTHREADS {
1283 let tx = tx.clone();
1284 thread::spawn(move|| {
1285 for _ in 0..AMT { tx.send(1).unwrap(); }
1289 t.join().ok().unwrap();
1293 fn send_from_outside_runtime() {
1294 let (tx1, rx1) = channel::<()>();
1295 let (tx2, rx2) = channel::<i32>();
1296 let t1 = thread::spawn(move|| {
1297 tx1.send(()).unwrap();
1299 assert_eq!(rx2.recv().unwrap(), 1);
1302 rx1.recv().unwrap();
1303 let t2 = thread::spawn(move|| {
1305 tx2.send(1).unwrap();
1308 t1.join().ok().unwrap();
1309 t2.join().ok().unwrap();
1313 fn recv_from_outside_runtime() {
1314 let (tx, rx) = channel::<i32>();
1315 let t = thread::spawn(move|| {
1317 assert_eq!(rx.recv().unwrap(), 1);
1321 tx.send(1).unwrap();
1323 t.join().ok().unwrap();
1328 let (tx1, rx1) = channel::<i32>();
1329 let (tx2, rx2) = channel::<i32>();
1330 let t1 = thread::spawn(move|| {
1331 assert_eq!(rx1.recv().unwrap(), 1);
1332 tx2.send(2).unwrap();
1334 let t2 = thread::spawn(move|| {
1335 tx1.send(1).unwrap();
1336 assert_eq!(rx2.recv().unwrap(), 2);
1338 t1.join().ok().unwrap();
1339 t2.join().ok().unwrap();
1343 fn oneshot_single_thread_close_port_first() {
1344 // Simple test of closing without sending
1345 let (_tx, rx) = channel::<i32>();
1350 fn oneshot_single_thread_close_chan_first() {
1351 // Simple test of closing without sending
1352 let (tx, _rx) = channel::<i32>();
1357 fn oneshot_single_thread_send_port_close() {
1358 // Testing that the sender cleans up the payload if receiver is closed
1359 let (tx, rx) = channel::<Box<i32>>();
1361 assert!(tx.send(box 0).is_err());
1365 fn oneshot_single_thread_recv_chan_close() {
1366 // Receiving on a closed chan will panic
1367 let res = thread::spawn(move|| {
1368 let (tx, rx) = channel::<i32>();
1373 assert!(res.is_err());
1377 fn oneshot_single_thread_send_then_recv() {
1378 let (tx, rx) = channel::<Box<i32>>();
1379 tx.send(box 10).unwrap();
1380 assert!(rx.recv().unwrap() == box 10);
1384 fn oneshot_single_thread_try_send_open() {
1385 let (tx, rx) = channel::<i32>();
1386 assert!(tx.send(10).is_ok());
1387 assert!(rx.recv().unwrap() == 10);
1391 fn oneshot_single_thread_try_send_closed() {
1392 let (tx, rx) = channel::<i32>();
1394 assert!(tx.send(10).is_err());
1398 fn oneshot_single_thread_try_recv_open() {
1399 let (tx, rx) = channel::<i32>();
1400 tx.send(10).unwrap();
1401 assert!(rx.recv() == Ok(10));
1405 fn oneshot_single_thread_try_recv_closed() {
1406 let (tx, rx) = channel::<i32>();
1408 assert!(rx.recv().is_err());
1412 fn oneshot_single_thread_peek_data() {
1413 let (tx, rx) = channel::<i32>();
1414 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1415 tx.send(10).unwrap();
1416 assert_eq!(rx.try_recv(), Ok(10));
1420 fn oneshot_single_thread_peek_close() {
1421 let (tx, rx) = channel::<i32>();
1423 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1424 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1428 fn oneshot_single_thread_peek_open() {
1429 let (_tx, rx) = channel::<i32>();
1430 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1434 fn oneshot_multi_task_recv_then_send() {
1435 let (tx, rx) = channel::<Box<i32>>();
1436 let _t = thread::spawn(move|| {
1437 assert!(rx.recv().unwrap() == box 10);
1440 tx.send(box 10).unwrap();
1444 fn oneshot_multi_task_recv_then_close() {
1445 let (tx, rx) = channel::<Box<i32>>();
1446 let _t = thread::spawn(move|| {
1449 let res = thread::spawn(move|| {
1450 assert!(rx.recv().unwrap() == box 10);
1452 assert!(res.is_err());
1456 fn oneshot_multi_thread_close_stress() {
1457 for _ in 0..stress_factor() {
1458 let (tx, rx) = channel::<i32>();
1459 let _t = thread::spawn(move|| {
1467 fn oneshot_multi_thread_send_close_stress() {
1468 for _ in 0..stress_factor() {
1469 let (tx, rx) = channel::<i32>();
1470 let _t = thread::spawn(move|| {
1473 let _ = thread::spawn(move|| {
1474 tx.send(1).unwrap();
1480 fn oneshot_multi_thread_recv_close_stress() {
1481 for _ in 0..stress_factor() {
1482 let (tx, rx) = channel::<i32>();
1483 thread::spawn(move|| {
1484 let res = thread::spawn(move|| {
1487 assert!(res.is_err());
1489 let _t = thread::spawn(move|| {
1490 thread::spawn(move|| {
1498 fn oneshot_multi_thread_send_recv_stress() {
1499 for _ in 0..stress_factor() {
1500 let (tx, rx) = channel::<Box<isize>>();
1501 let _t = thread::spawn(move|| {
1502 tx.send(box 10).unwrap();
1504 assert!(rx.recv().unwrap() == box 10);
1509 fn stream_send_recv_stress() {
1510 for _ in 0..stress_factor() {
1511 let (tx, rx) = channel();
1516 fn send(tx: Sender<Box<i32>>, i: i32) {
1517 if i == 10 { return }
1519 thread::spawn(move|| {
1520 tx.send(box i).unwrap();
1525 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1526 if i == 10 { return }
1528 thread::spawn(move|| {
1529 assert!(rx.recv().unwrap() == box i);
1538 // Regression test that we don't run out of stack in scheduler context
1539 let (tx, rx) = channel();
1540 for _ in 0..10000 { tx.send(()).unwrap(); }
1541 for _ in 0..10000 { rx.recv().unwrap(); }
1545 fn shared_chan_stress() {
1546 let (tx, rx) = channel();
1547 let total = stress_factor() + 100;
1549 let tx = tx.clone();
1550 thread::spawn(move|| {
1551 tx.send(()).unwrap();
1561 fn test_nested_recv_iter() {
1562 let (tx, rx) = channel::<i32>();
1563 let (total_tx, total_rx) = channel::<i32>();
1565 let _t = thread::spawn(move|| {
1567 for x in rx.iter() {
1570 total_tx.send(acc).unwrap();
1573 tx.send(3).unwrap();
1574 tx.send(1).unwrap();
1575 tx.send(2).unwrap();
1577 assert_eq!(total_rx.recv().unwrap(), 6);
1581 fn test_recv_iter_break() {
1582 let (tx, rx) = channel::<i32>();
1583 let (count_tx, count_rx) = channel();
1585 let _t = thread::spawn(move|| {
1587 for x in rx.iter() {
1594 count_tx.send(count).unwrap();
1597 tx.send(2).unwrap();
1598 tx.send(2).unwrap();
1599 tx.send(2).unwrap();
1602 assert_eq!(count_rx.recv().unwrap(), 4);
1606 fn test_recv_into_iter_owned() {
1608 let (tx, rx) = channel::<i32>();
1609 tx.send(1).unwrap();
1610 tx.send(2).unwrap();
1614 assert_eq!(iter.next().unwrap(), 1);
1615 assert_eq!(iter.next().unwrap(), 2);
1616 assert_eq!(iter.next().is_none(), true);
1620 fn test_recv_into_iter_borrowed() {
1621 let (tx, rx) = channel::<i32>();
1622 tx.send(1).unwrap();
1623 tx.send(2).unwrap();
1625 let mut iter = (&rx).into_iter();
1626 assert_eq!(iter.next().unwrap(), 1);
1627 assert_eq!(iter.next().unwrap(), 2);
1628 assert_eq!(iter.next().is_none(), true);
1632 fn try_recv_states() {
1633 let (tx1, rx1) = channel::<i32>();
1634 let (tx2, rx2) = channel::<()>();
1635 let (tx3, rx3) = channel::<()>();
1636 let _t = thread::spawn(move|| {
1637 rx2.recv().unwrap();
1638 tx1.send(1).unwrap();
1639 tx3.send(()).unwrap();
1640 rx2.recv().unwrap();
1642 tx3.send(()).unwrap();
1645 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1646 tx2.send(()).unwrap();
1647 rx3.recv().unwrap();
1648 assert_eq!(rx1.try_recv(), Ok(1));
1649 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1650 tx2.send(()).unwrap();
1651 rx3.recv().unwrap();
1652 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1655 // This bug used to end up in a livelock inside of the Receiver destructor
1656 // because the internal state of the Shared packet was corrupted
1658 fn destroy_upgraded_shared_port_when_sender_still_active() {
1659 let (tx, rx) = channel();
1660 let (tx2, rx2) = channel();
1661 let _t = thread::spawn(move|| {
1662 rx.recv().unwrap(); // wait on a oneshot
1663 drop(rx); // destroy a shared
1664 tx2.send(()).unwrap();
1666 // make sure the other thread has gone to sleep
1667 for _ in 0..5000 { thread::yield_now(); }
1669 // upgrade to a shared chan and send a message
1672 t.send(()).unwrap();
1674 // wait for the child thread to exit before we exit
1675 rx2.recv().unwrap();
1687 pub fn stress_factor() -> usize {
1688 match env::var("RUST_TEST_STRESS") {
1689 Ok(val) => val.parse().unwrap(),
1696 let (tx, rx) = sync_channel::<i32>(1);
1697 tx.send(1).unwrap();
1698 assert_eq!(rx.recv().unwrap(), 1);
1703 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1704 tx.send(box 1).unwrap();
1709 let (tx, rx) = sync_channel::<i32>(1);
1710 tx.send(1).unwrap();
1711 assert_eq!(rx.recv().unwrap(), 1);
1712 let tx = tx.clone();
1713 tx.send(1).unwrap();
1714 assert_eq!(rx.recv().unwrap(), 1);
1718 fn smoke_threads() {
1719 let (tx, rx) = sync_channel::<i32>(0);
1720 let _t = thread::spawn(move|| {
1721 tx.send(1).unwrap();
1723 assert_eq!(rx.recv().unwrap(), 1);
1727 fn smoke_port_gone() {
1728 let (tx, rx) = sync_channel::<i32>(0);
1730 assert!(tx.send(1).is_err());
1734 fn smoke_shared_port_gone2() {
1735 let (tx, rx) = sync_channel::<i32>(0);
1737 let tx2 = tx.clone();
1739 assert!(tx2.send(1).is_err());
1743 fn port_gone_concurrent() {
1744 let (tx, rx) = sync_channel::<i32>(0);
1745 let _t = thread::spawn(move|| {
1748 while tx.send(1).is_ok() {}
1752 fn port_gone_concurrent_shared() {
1753 let (tx, rx) = sync_channel::<i32>(0);
1754 let tx2 = tx.clone();
1755 let _t = thread::spawn(move|| {
1758 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1762 fn smoke_chan_gone() {
1763 let (tx, rx) = sync_channel::<i32>(0);
1765 assert!(rx.recv().is_err());
1769 fn smoke_chan_gone_shared() {
1770 let (tx, rx) = sync_channel::<()>(0);
1771 let tx2 = tx.clone();
1774 assert!(rx.recv().is_err());
1778 fn chan_gone_concurrent() {
1779 let (tx, rx) = sync_channel::<i32>(0);
1780 thread::spawn(move|| {
1781 tx.send(1).unwrap();
1782 tx.send(1).unwrap();
1784 while rx.recv().is_ok() {}
1789 let (tx, rx) = sync_channel::<i32>(0);
1790 thread::spawn(move|| {
1791 for _ in 0..10000 { tx.send(1).unwrap(); }
1794 assert_eq!(rx.recv().unwrap(), 1);
1799 fn stress_shared() {
1800 const AMT: u32 = 1000;
1801 const NTHREADS: u32 = 8;
1802 let (tx, rx) = sync_channel::<i32>(0);
1803 let (dtx, drx) = sync_channel::<()>(0);
1805 thread::spawn(move|| {
1806 for _ in 0..AMT * NTHREADS {
1807 assert_eq!(rx.recv().unwrap(), 1);
1809 match rx.try_recv() {
1813 dtx.send(()).unwrap();
1816 for _ in 0..NTHREADS {
1817 let tx = tx.clone();
1818 thread::spawn(move|| {
1819 for _ in 0..AMT { tx.send(1).unwrap(); }
1823 drx.recv().unwrap();
1827 fn oneshot_single_thread_close_port_first() {
1828 // Simple test of closing without sending
1829 let (_tx, rx) = sync_channel::<i32>(0);
1834 fn oneshot_single_thread_close_chan_first() {
1835 // Simple test of closing without sending
1836 let (tx, _rx) = sync_channel::<i32>(0);
1841 fn oneshot_single_thread_send_port_close() {
1842 // Testing that the sender cleans up the payload if receiver is closed
1843 let (tx, rx) = sync_channel::<Box<i32>>(0);
1845 assert!(tx.send(box 0).is_err());
1849 fn oneshot_single_thread_recv_chan_close() {
1850 // Receiving on a closed chan will panic
1851 let res = thread::spawn(move|| {
1852 let (tx, rx) = sync_channel::<i32>(0);
1857 assert!(res.is_err());
1861 fn oneshot_single_thread_send_then_recv() {
1862 let (tx, rx) = sync_channel::<Box<i32>>(1);
1863 tx.send(box 10).unwrap();
1864 assert!(rx.recv().unwrap() == box 10);
1868 fn oneshot_single_thread_try_send_open() {
1869 let (tx, rx) = sync_channel::<i32>(1);
1870 assert_eq!(tx.try_send(10), Ok(()));
1871 assert!(rx.recv().unwrap() == 10);
1875 fn oneshot_single_thread_try_send_closed() {
1876 let (tx, rx) = sync_channel::<i32>(0);
1878 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1882 fn oneshot_single_thread_try_send_closed2() {
1883 let (tx, _rx) = sync_channel::<i32>(0);
1884 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1888 fn oneshot_single_thread_try_recv_open() {
1889 let (tx, rx) = sync_channel::<i32>(1);
1890 tx.send(10).unwrap();
1891 assert!(rx.recv() == Ok(10));
1895 fn oneshot_single_thread_try_recv_closed() {
1896 let (tx, rx) = sync_channel::<i32>(0);
1898 assert!(rx.recv().is_err());
1902 fn oneshot_single_thread_peek_data() {
1903 let (tx, rx) = sync_channel::<i32>(1);
1904 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1905 tx.send(10).unwrap();
1906 assert_eq!(rx.try_recv(), Ok(10));
1910 fn oneshot_single_thread_peek_close() {
1911 let (tx, rx) = sync_channel::<i32>(0);
1913 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1914 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1918 fn oneshot_single_thread_peek_open() {
1919 let (_tx, rx) = sync_channel::<i32>(0);
1920 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1924 fn oneshot_multi_task_recv_then_send() {
1925 let (tx, rx) = sync_channel::<Box<i32>>(0);
1926 let _t = thread::spawn(move|| {
1927 assert!(rx.recv().unwrap() == box 10);
1930 tx.send(box 10).unwrap();
1934 fn oneshot_multi_task_recv_then_close() {
1935 let (tx, rx) = sync_channel::<Box<i32>>(0);
1936 let _t = thread::spawn(move|| {
1939 let res = thread::spawn(move|| {
1940 assert!(rx.recv().unwrap() == box 10);
1942 assert!(res.is_err());
1946 fn oneshot_multi_thread_close_stress() {
1947 for _ in 0..stress_factor() {
1948 let (tx, rx) = sync_channel::<i32>(0);
1949 let _t = thread::spawn(move|| {
1957 fn oneshot_multi_thread_send_close_stress() {
1958 for _ in 0..stress_factor() {
1959 let (tx, rx) = sync_channel::<i32>(0);
1960 let _t = thread::spawn(move|| {
1963 let _ = thread::spawn(move || {
1964 tx.send(1).unwrap();
1970 fn oneshot_multi_thread_recv_close_stress() {
1971 for _ in 0..stress_factor() {
1972 let (tx, rx) = sync_channel::<i32>(0);
1973 let _t = thread::spawn(move|| {
1974 let res = thread::spawn(move|| {
1977 assert!(res.is_err());
1979 let _t = thread::spawn(move|| {
1980 thread::spawn(move|| {
1988 fn oneshot_multi_thread_send_recv_stress() {
1989 for _ in 0..stress_factor() {
1990 let (tx, rx) = sync_channel::<Box<i32>>(0);
1991 let _t = thread::spawn(move|| {
1992 tx.send(box 10).unwrap();
1994 assert!(rx.recv().unwrap() == box 10);
1999 fn stream_send_recv_stress() {
2000 for _ in 0..stress_factor() {
2001 let (tx, rx) = sync_channel::<Box<i32>>(0);
2006 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2007 if i == 10 { return }
2009 thread::spawn(move|| {
2010 tx.send(box i).unwrap();
2015 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2016 if i == 10 { return }
2018 thread::spawn(move|| {
2019 assert!(rx.recv().unwrap() == box i);
2028 // Regression test that we don't run out of stack in scheduler context
2029 let (tx, rx) = sync_channel(10000);
2030 for _ in 0..10000 { tx.send(()).unwrap(); }
2031 for _ in 0..10000 { rx.recv().unwrap(); }
2035 fn shared_chan_stress() {
2036 let (tx, rx) = sync_channel(0);
2037 let total = stress_factor() + 100;
2039 let tx = tx.clone();
2040 thread::spawn(move|| {
2041 tx.send(()).unwrap();
2051 fn test_nested_recv_iter() {
2052 let (tx, rx) = sync_channel::<i32>(0);
2053 let (total_tx, total_rx) = sync_channel::<i32>(0);
2055 let _t = thread::spawn(move|| {
2057 for x in rx.iter() {
2060 total_tx.send(acc).unwrap();
2063 tx.send(3).unwrap();
2064 tx.send(1).unwrap();
2065 tx.send(2).unwrap();
2067 assert_eq!(total_rx.recv().unwrap(), 6);
2071 fn test_recv_iter_break() {
2072 let (tx, rx) = sync_channel::<i32>(0);
2073 let (count_tx, count_rx) = sync_channel(0);
2075 let _t = thread::spawn(move|| {
2077 for x in rx.iter() {
2084 count_tx.send(count).unwrap();
2087 tx.send(2).unwrap();
2088 tx.send(2).unwrap();
2089 tx.send(2).unwrap();
2090 let _ = tx.try_send(2);
2092 assert_eq!(count_rx.recv().unwrap(), 4);
2096 fn try_recv_states() {
2097 let (tx1, rx1) = sync_channel::<i32>(1);
2098 let (tx2, rx2) = sync_channel::<()>(1);
2099 let (tx3, rx3) = sync_channel::<()>(1);
2100 let _t = thread::spawn(move|| {
2101 rx2.recv().unwrap();
2102 tx1.send(1).unwrap();
2103 tx3.send(()).unwrap();
2104 rx2.recv().unwrap();
2106 tx3.send(()).unwrap();
2109 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2110 tx2.send(()).unwrap();
2111 rx3.recv().unwrap();
2112 assert_eq!(rx1.try_recv(), Ok(1));
2113 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2114 tx2.send(()).unwrap();
2115 rx3.recv().unwrap();
2116 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2119 // This bug used to end up in a livelock inside of the Receiver destructor
2120 // because the internal state of the Shared packet was corrupted
2122 fn destroy_upgraded_shared_port_when_sender_still_active() {
2123 let (tx, rx) = sync_channel::<()>(0);
2124 let (tx2, rx2) = sync_channel::<()>(0);
2125 let _t = thread::spawn(move|| {
2126 rx.recv().unwrap(); // wait on a oneshot
2127 drop(rx); // destroy a shared
2128 tx2.send(()).unwrap();
2130 // make sure the other thread has gone to sleep
2131 for _ in 0..5000 { thread::yield_now(); }
2133 // upgrade to a shared chan and send a message
2136 t.send(()).unwrap();
2138 // wait for the child thread to exit before we exit
2139 rx2.recv().unwrap();
2144 let (tx, rx) = sync_channel::<i32>(0);
2145 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2146 assert_eq!(tx.send(1), Ok(()));
2151 let (tx, rx) = sync_channel::<i32>(0);
2152 let _t = thread::spawn(move|| { drop(rx); });
2153 assert!(tx.send(1).is_err());
2158 let (tx, rx) = sync_channel::<i32>(1);
2159 assert_eq!(tx.send(1), Ok(()));
2160 let _t =thread::spawn(move|| { drop(rx); });
2161 assert!(tx.send(1).is_err());
2166 let (tx, rx) = sync_channel::<i32>(0);
2167 let tx2 = tx.clone();
2168 let (done, donerx) = channel();
2169 let done2 = done.clone();
2170 let _t = thread::spawn(move|| {
2171 assert!(tx.send(1).is_err());
2172 done.send(()).unwrap();
2174 let _t = thread::spawn(move|| {
2175 assert!(tx2.send(2).is_err());
2176 done2.send(()).unwrap();
2179 donerx.recv().unwrap();
2180 donerx.recv().unwrap();
2185 let (tx, _rx) = sync_channel::<i32>(0);
2186 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2191 let (tx, _rx) = sync_channel::<i32>(1);
2192 assert_eq!(tx.try_send(1), Ok(()));
2193 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2198 let (tx, rx) = sync_channel::<i32>(1);
2199 assert_eq!(tx.try_send(1), Ok(()));
2201 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2207 let (tx1, rx1) = sync_channel::<()>(3);
2208 let (tx2, rx2) = sync_channel::<()>(3);
2210 let _t = thread::spawn(move|| {
2211 rx1.recv().unwrap();
2212 tx2.try_send(()).unwrap();
2215 tx1.try_send(()).unwrap();
2216 rx2.recv().unwrap();
2225 fn fmt_debug_sender() {
2226 let (tx, _) = channel::<i32>();
2227 assert_eq!(format!("{:?}", tx), "Sender { .. }");
2231 fn fmt_debug_recv() {
2232 let (_, rx) = channel::<i32>();
2233 assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2237 fn fmt_debug_sync_sender() {
2238 let (tx, _) = sync_channel::<i32>(1);
2239 assert_eq!(format!("{:?}", tx), "SyncSender { .. }");