1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10);
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
116 #![stable(feature = "rust1", since = "1.0.0")]
118 // A description of how Rust's channel implementation works
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
130 // ## Flavors of channels
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
138 // case. They contain as few atomics as possible and
139 // involve one and exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
149 // ## Concurrent queues
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
152 // but recv() obviously blocks. This means that under the hood there must be
153 // some shared and concurrent queue holding all of the actual data.
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // ### SPSC optimizations
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
180 // ### MPSC optimizations
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
190 // ## Overview of the Implementation
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
198 // queue.push(t) return if queue.pop()
199 // if increment() == -1 deschedule {
200 // wakeup() if decrement() > 0
201 // cancel_deschedule()
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
208 // ### The internal atomic counter
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
233 // ## Native Implementation
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
272 use cell::UnsafeCell;
273 use time::{Duration, Instant};
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
290 /// The receiving-half of Rust's channel type. This half can only be owned by
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
302 #[stable(feature = "rust1", since = "1.0.0")]
303 impl<T> !Sync for Receiver<T> { }
305 /// An iterator over messages on a receiver, this iterator will block
306 /// whenever `next` is called, waiting for a new message, and `None` will be
307 /// returned when the corresponding channel has hung up.
308 #[stable(feature = "rust1", since = "1.0.0")]
310 pub struct Iter<'a, T: 'a> {
314 /// An iterator that attempts to yield all pending values for a receiver.
315 /// `None` will be returned when there are no pending values remaining or
316 /// if the corresponding channel has hung up.
318 /// This Iterator will never block the caller in order to wait for data to
319 /// become available. Instead, it will return `None`.
320 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
322 pub struct TryIter<'a, T: 'a> {
326 /// An owning iterator over messages on a receiver, this iterator will block
327 /// whenever `next` is called, waiting for a new message, and `None` will be
328 /// returned when the corresponding channel has hung up.
329 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
331 pub struct IntoIter<T> {
335 /// The sending-half of Rust's asynchronous channel type. This half can only be
336 /// owned by one thread, but it can be cloned to send to other threads.
337 #[stable(feature = "rust1", since = "1.0.0")]
338 pub struct Sender<T> {
339 inner: UnsafeCell<Flavor<T>>,
342 // The send port can be sent from place to place, so long as it
343 // is not used to send non-sendable things.
344 #[stable(feature = "rust1", since = "1.0.0")]
345 unsafe impl<T: Send> Send for Sender<T> { }
347 #[stable(feature = "rust1", since = "1.0.0")]
348 impl<T> !Sync for Sender<T> { }
350 /// The sending-half of Rust's synchronous channel type. This half can only be
351 /// owned by one thread, but it can be cloned to send to other threads.
352 #[stable(feature = "rust1", since = "1.0.0")]
353 pub struct SyncSender<T> {
354 inner: Arc<sync::Packet<T>>,
357 #[stable(feature = "rust1", since = "1.0.0")]
358 unsafe impl<T: Send> Send for SyncSender<T> {}
360 #[stable(feature = "rust1", since = "1.0.0")]
361 impl<T> !Sync for SyncSender<T> {}
363 /// An error returned from the `send` function on channels.
365 /// A `send` operation can only fail if the receiving end of a channel is
366 /// disconnected, implying that the data could never be received. The error
367 /// contains the data being sent as a payload so it can be recovered.
368 #[stable(feature = "rust1", since = "1.0.0")]
369 #[derive(PartialEq, Eq, Clone, Copy)]
370 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
372 /// An error returned from the `recv` function on a `Receiver`.
374 /// The `recv` operation can only fail if the sending half of a channel is
375 /// disconnected, implying that no further messages will ever be received.
376 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
377 #[stable(feature = "rust1", since = "1.0.0")]
378 pub struct RecvError;
380 /// This enumeration is the list of the possible reasons that `try_recv` could
381 /// not return data when called.
382 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
383 #[stable(feature = "rust1", since = "1.0.0")]
384 pub enum TryRecvError {
385 /// This channel is currently empty, but the sender(s) have not yet
386 /// disconnected, so data may yet become available.
387 #[stable(feature = "rust1", since = "1.0.0")]
390 /// This channel's sending half has become disconnected, and there will
391 /// never be any more data received on this channel
392 #[stable(feature = "rust1", since = "1.0.0")]
396 /// This enumeration is the list of possible errors that `recv_timeout` could
397 /// not return data when called.
398 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
399 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
400 pub enum RecvTimeoutError {
401 /// This channel is currently empty, but the sender(s) have not yet
402 /// disconnected, so data may yet become available.
403 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
405 /// This channel's sending half has become disconnected, and there will
406 /// never be any more data received on this channel
407 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
411 /// This enumeration is the list of the possible error outcomes for the
412 /// `SyncSender::try_send` method.
413 #[stable(feature = "rust1", since = "1.0.0")]
414 #[derive(PartialEq, Eq, Clone, Copy)]
415 pub enum TrySendError<T> {
416 /// The data could not be sent on the channel because it would require that
417 /// the callee block to send the data.
419 /// If this is a buffered channel, then the buffer is full at this time. If
420 /// this is not a buffered channel, then there is no receiver available to
421 /// acquire the data.
422 #[stable(feature = "rust1", since = "1.0.0")]
423 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
425 /// This channel's receiving half has disconnected, so the data could not be
426 /// sent. The data is returned back to the callee in this case.
427 #[stable(feature = "rust1", since = "1.0.0")]
428 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
432 Oneshot(Arc<oneshot::Packet<T>>),
433 Stream(Arc<stream::Packet<T>>),
434 Shared(Arc<shared::Packet<T>>),
435 Sync(Arc<sync::Packet<T>>),
439 trait UnsafeFlavor<T> {
440 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
441 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
442 &mut *self.inner_unsafe().get()
444 unsafe fn inner(&self) -> &Flavor<T> {
445 &*self.inner_unsafe().get()
448 impl<T> UnsafeFlavor<T> for Sender<T> {
449 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
453 impl<T> UnsafeFlavor<T> for Receiver<T> {
454 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
459 /// Creates a new asynchronous channel, returning the sender/receiver halves.
460 /// All data sent on the sender will become available on the receiver, and no
461 /// send will block the calling thread (this channel has an "infinite buffer").
463 /// If the [`Receiver`] is disconnected while trying to [`send()`] with the
464 /// [`Sender`], the [`send()`] method will return an error.
466 /// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
467 /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
468 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
473 /// use std::sync::mpsc::channel;
476 /// // tx is the sending half (tx for transmission), and rx is the receiving
477 /// // half (rx for receiving).
478 /// let (tx, rx) = channel();
480 /// // Spawn off an expensive computation
481 /// thread::spawn(move|| {
482 /// # fn expensive_computation() {}
483 /// tx.send(expensive_computation()).unwrap();
486 /// // Do some useful work for awhile
488 /// // Let's see what that answer was
489 /// println!("{:?}", rx.recv().unwrap());
491 #[stable(feature = "rust1", since = "1.0.0")]
492 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
493 let a = Arc::new(oneshot::Packet::new());
494 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
497 /// Creates a new synchronous, bounded channel.
499 /// Like asynchronous channels, the [`Receiver`] will block until a message
500 /// becomes available. These channels differ greatly in the semantics of the
501 /// sender from asynchronous channels, however.
503 /// This channel has an internal buffer on which messages will be queued.
504 /// `bound` specifies the buffer size. When the internal buffer becomes full,
505 /// future sends will *block* waiting for the buffer to open up. Note that a
506 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
507 /// where each [`send()`] will not return until a recv is paired with it.
509 /// Like asynchronous channels, if the [`Receiver`] is disconnected while
510 /// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will
513 /// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
514 /// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
515 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
520 /// use std::sync::mpsc::sync_channel;
523 /// let (tx, rx) = sync_channel(1);
525 /// // this returns immediately
526 /// tx.send(1).unwrap();
528 /// thread::spawn(move|| {
529 /// // this will block until the previous message has been received
530 /// tx.send(2).unwrap();
533 /// assert_eq!(rx.recv().unwrap(), 1);
534 /// assert_eq!(rx.recv().unwrap(), 2);
536 #[stable(feature = "rust1", since = "1.0.0")]
537 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
538 let a = Arc::new(sync::Packet::new(bound));
539 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
542 ////////////////////////////////////////////////////////////////////////////////
544 ////////////////////////////////////////////////////////////////////////////////
547 fn new(inner: Flavor<T>) -> Sender<T> {
549 inner: UnsafeCell::new(inner),
553 /// Attempts to send a value on this channel, returning it back if it could
556 /// A successful send occurs when it is determined that the other end of
557 /// the channel has not hung up already. An unsuccessful send would be one
558 /// where the corresponding receiver has already been deallocated. Note
559 /// that a return value of `Err` means that the data will never be
560 /// received, but a return value of `Ok` does *not* mean that the data
561 /// will be received. It is possible for the corresponding receiver to
562 /// hang up immediately after this function returns `Ok`.
564 /// This method will never block the current thread.
569 /// use std::sync::mpsc::channel;
571 /// let (tx, rx) = channel();
573 /// // This send is always successful
574 /// tx.send(1).unwrap();
576 /// // This send will fail because the receiver is gone
578 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
582 let (new_inner, ret) = match *unsafe { self.inner() } {
583 Flavor::Oneshot(ref p) => {
585 return p.send(t).map_err(SendError);
587 let a = Arc::new(stream::Packet::new());
588 let rx = Receiver::new(Flavor::Stream(a.clone()));
589 match p.upgrade(rx) {
590 oneshot::UpSuccess => {
594 oneshot::UpDisconnected => (a, Err(t)),
595 oneshot::UpWoke(token) => {
596 // This send cannot panic because the thread is
597 // asleep (we're looking at it), so the receiver
599 a.send(t).ok().unwrap();
606 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
607 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
608 Flavor::Sync(..) => unreachable!(),
612 let tmp = Sender::new(Flavor::Stream(new_inner));
613 mem::swap(self.inner_mut(), tmp.inner_mut());
615 ret.map_err(SendError)
619 #[stable(feature = "rust1", since = "1.0.0")]
620 impl<T> Clone for Sender<T> {
621 fn clone(&self) -> Sender<T> {
622 let packet = match *unsafe { self.inner() } {
623 Flavor::Oneshot(ref p) => {
624 let a = Arc::new(shared::Packet::new());
626 let guard = a.postinit_lock();
627 let rx = Receiver::new(Flavor::Shared(a.clone()));
628 let sleeper = match p.upgrade(rx) {
630 oneshot::UpDisconnected => None,
631 oneshot::UpWoke(task) => Some(task),
633 a.inherit_blocker(sleeper, guard);
637 Flavor::Stream(ref p) => {
638 let a = Arc::new(shared::Packet::new());
640 let guard = a.postinit_lock();
641 let rx = Receiver::new(Flavor::Shared(a.clone()));
642 let sleeper = match p.upgrade(rx) {
644 stream::UpDisconnected => None,
645 stream::UpWoke(task) => Some(task),
647 a.inherit_blocker(sleeper, guard);
651 Flavor::Shared(ref p) => {
653 return Sender::new(Flavor::Shared(p.clone()));
655 Flavor::Sync(..) => unreachable!(),
659 let tmp = Sender::new(Flavor::Shared(packet.clone()));
660 mem::swap(self.inner_mut(), tmp.inner_mut());
662 Sender::new(Flavor::Shared(packet))
666 #[stable(feature = "rust1", since = "1.0.0")]
667 impl<T> Drop for Sender<T> {
669 match *unsafe { self.inner() } {
670 Flavor::Oneshot(ref p) => p.drop_chan(),
671 Flavor::Stream(ref p) => p.drop_chan(),
672 Flavor::Shared(ref p) => p.drop_chan(),
673 Flavor::Sync(..) => unreachable!(),
678 #[stable(feature = "mpsc_debug", since = "1.7.0")]
679 impl<T> fmt::Debug for Sender<T> {
680 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
681 write!(f, "Sender {{ .. }}")
685 ////////////////////////////////////////////////////////////////////////////////
687 ////////////////////////////////////////////////////////////////////////////////
689 impl<T> SyncSender<T> {
690 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
691 SyncSender { inner: inner }
694 /// Sends a value on this synchronous channel.
696 /// This function will *block* until space in the internal buffer becomes
697 /// available or a receiver is available to hand off the message to.
699 /// Note that a successful send does *not* guarantee that the receiver will
700 /// ever see the data if there is a buffer on this channel. Items may be
701 /// enqueued in the internal buffer for the receiver to receive at a later
702 /// time. If the buffer size is 0, however, it can be guaranteed that the
703 /// receiver has indeed received the data if this function returns success.
705 /// This function will never panic, but it may return `Err` if the
706 /// `Receiver` has disconnected and is no longer able to receive
708 #[stable(feature = "rust1", since = "1.0.0")]
709 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
710 self.inner.send(t).map_err(SendError)
713 /// Attempts to send a value on this channel without blocking.
715 /// This method differs from `send` by returning immediately if the
716 /// channel's buffer is full or no receiver is waiting to acquire some
717 /// data. Compared with `send`, this function has two failure cases
718 /// instead of one (one for disconnection, one for a full buffer).
720 /// See `SyncSender::send` for notes about guarantees of whether the
721 /// receiver has received the data or not if this function is successful.
722 #[stable(feature = "rust1", since = "1.0.0")]
723 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
724 self.inner.try_send(t)
728 #[stable(feature = "rust1", since = "1.0.0")]
729 impl<T> Clone for SyncSender<T> {
730 fn clone(&self) -> SyncSender<T> {
731 self.inner.clone_chan();
732 SyncSender::new(self.inner.clone())
736 #[stable(feature = "rust1", since = "1.0.0")]
737 impl<T> Drop for SyncSender<T> {
739 self.inner.drop_chan();
743 #[stable(feature = "mpsc_debug", since = "1.7.0")]
744 impl<T> fmt::Debug for SyncSender<T> {
745 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
746 write!(f, "SyncSender {{ .. }}")
750 ////////////////////////////////////////////////////////////////////////////////
752 ////////////////////////////////////////////////////////////////////////////////
754 impl<T> Receiver<T> {
755 fn new(inner: Flavor<T>) -> Receiver<T> {
756 Receiver { inner: UnsafeCell::new(inner) }
759 /// Attempts to return a pending value on this receiver without blocking
761 /// This method will never block the caller in order to wait for data to
762 /// become available. Instead, this will always return immediately with a
763 /// possible option of pending data on the channel.
765 /// This is useful for a flavor of "optimistic check" before deciding to
766 /// block on a receiver.
767 #[stable(feature = "rust1", since = "1.0.0")]
768 pub fn try_recv(&self) -> Result<T, TryRecvError> {
770 let new_port = match *unsafe { self.inner() } {
771 Flavor::Oneshot(ref p) => {
773 Ok(t) => return Ok(t),
774 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
775 Err(oneshot::Disconnected) => {
776 return Err(TryRecvError::Disconnected)
778 Err(oneshot::Upgraded(rx)) => rx,
781 Flavor::Stream(ref p) => {
783 Ok(t) => return Ok(t),
784 Err(stream::Empty) => return Err(TryRecvError::Empty),
785 Err(stream::Disconnected) => {
786 return Err(TryRecvError::Disconnected)
788 Err(stream::Upgraded(rx)) => rx,
791 Flavor::Shared(ref p) => {
793 Ok(t) => return Ok(t),
794 Err(shared::Empty) => return Err(TryRecvError::Empty),
795 Err(shared::Disconnected) => {
796 return Err(TryRecvError::Disconnected)
800 Flavor::Sync(ref p) => {
802 Ok(t) => return Ok(t),
803 Err(sync::Empty) => return Err(TryRecvError::Empty),
804 Err(sync::Disconnected) => {
805 return Err(TryRecvError::Disconnected)
811 mem::swap(self.inner_mut(),
812 new_port.inner_mut());
817 /// Attempts to wait for a value on this receiver, returning an error if the
818 /// corresponding channel has hung up.
820 /// This function will always block the current thread if there is no data
821 /// available and it's possible for more data to be sent. Once a message is
822 /// sent to the corresponding `Sender`, then this receiver will wake up and
823 /// return that message.
825 /// If the corresponding `Sender` has disconnected, or it disconnects while
826 /// this call is blocking, this call will wake up and return `Err` to
827 /// indicate that no more messages can ever be received on this channel.
828 /// However, since channels are buffered, messages sent before the disconnect
829 /// will still be properly received.
834 /// use std::sync::mpsc;
837 /// let (send, recv) = mpsc::channel();
838 /// let handle = thread::spawn(move || {
839 /// send.send(1u8).unwrap();
842 /// handle.join().unwrap();
844 /// assert_eq!(Ok(1), recv.recv());
847 /// Buffering behavior:
850 /// use std::sync::mpsc;
852 /// use std::sync::mpsc::RecvError;
854 /// let (send, recv) = mpsc::channel();
855 /// let handle = thread::spawn(move || {
856 /// send.send(1u8).unwrap();
857 /// send.send(2).unwrap();
858 /// send.send(3).unwrap();
862 /// // wait for the thread to join so we ensure the sender is dropped
863 /// handle.join().unwrap();
865 /// assert_eq!(Ok(1), recv.recv());
866 /// assert_eq!(Ok(2), recv.recv());
867 /// assert_eq!(Ok(3), recv.recv());
868 /// assert_eq!(Err(RecvError), recv.recv());
870 #[stable(feature = "rust1", since = "1.0.0")]
871 pub fn recv(&self) -> Result<T, RecvError> {
873 let new_port = match *unsafe { self.inner() } {
874 Flavor::Oneshot(ref p) => {
876 Ok(t) => return Ok(t),
877 Err(oneshot::Disconnected) => return Err(RecvError),
878 Err(oneshot::Upgraded(rx)) => rx,
879 Err(oneshot::Empty) => unreachable!(),
882 Flavor::Stream(ref p) => {
884 Ok(t) => return Ok(t),
885 Err(stream::Disconnected) => return Err(RecvError),
886 Err(stream::Upgraded(rx)) => rx,
887 Err(stream::Empty) => unreachable!(),
890 Flavor::Shared(ref p) => {
892 Ok(t) => return Ok(t),
893 Err(shared::Disconnected) => return Err(RecvError),
894 Err(shared::Empty) => unreachable!(),
897 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
900 mem::swap(self.inner_mut(), new_port.inner_mut());
905 /// Attempts to wait for a value on this receiver, returning an error if the
906 /// corresponding channel has hung up, or if it waits more than `timeout`.
908 /// This function will always block the current thread if there is no data
909 /// available and it's possible for more data to be sent. Once a message is
910 /// sent to the corresponding `Sender`, then this receiver will wake up and
911 /// return that message.
913 /// If the corresponding `Sender` has disconnected, or it disconnects while
914 /// this call is blocking, this call will wake up and return `Err` to
915 /// indicate that no more messages can ever be received on this channel.
916 /// However, since channels are buffered, messages sent before the disconnect
917 /// will still be properly received.
922 /// use std::sync::mpsc::{self, RecvTimeoutError};
923 /// use std::time::Duration;
925 /// let (send, recv) = mpsc::channel::<()>();
927 /// let timeout = Duration::from_millis(100);
928 /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
930 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
931 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
932 // Do an optimistic try_recv to avoid the performance impact of
933 // Instant::now() in the full-channel case.
934 match self.try_recv() {
937 Err(TryRecvError::Disconnected)
938 => Err(RecvTimeoutError::Disconnected),
939 Err(TryRecvError::Empty)
940 => self.recv_max_until(Instant::now() + timeout)
944 fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
945 use self::RecvTimeoutError::*;
948 let port_or_empty = match *unsafe { self.inner() } {
949 Flavor::Oneshot(ref p) => {
950 match p.recv(Some(deadline)) {
951 Ok(t) => return Ok(t),
952 Err(oneshot::Disconnected) => return Err(Disconnected),
953 Err(oneshot::Upgraded(rx)) => Some(rx),
954 Err(oneshot::Empty) => None,
957 Flavor::Stream(ref p) => {
958 match p.recv(Some(deadline)) {
959 Ok(t) => return Ok(t),
960 Err(stream::Disconnected) => return Err(Disconnected),
961 Err(stream::Upgraded(rx)) => Some(rx),
962 Err(stream::Empty) => None,
965 Flavor::Shared(ref p) => {
966 match p.recv(Some(deadline)) {
967 Ok(t) => return Ok(t),
968 Err(shared::Disconnected) => return Err(Disconnected),
969 Err(shared::Empty) => None,
972 Flavor::Sync(ref p) => {
973 match p.recv(Some(deadline)) {
974 Ok(t) => return Ok(t),
975 Err(sync::Disconnected) => return Err(Disconnected),
976 Err(sync::Empty) => None,
981 if let Some(new_port) = port_or_empty {
983 mem::swap(self.inner_mut(), new_port.inner_mut());
987 // If we're already passed the deadline, and we're here without
988 // data, return a timeout, else try again.
989 if Instant::now() >= deadline {
995 /// Returns an iterator that will block waiting for messages, but never
996 /// `panic!`. It will return `None` when the channel has hung up.
997 #[stable(feature = "rust1", since = "1.0.0")]
998 pub fn iter(&self) -> Iter<T> {
1002 /// Returns an iterator that will attempt to yield all pending values.
1003 /// It will return `None` if there are no more pending values or if the
1004 /// channel has hung up. The iterator will never `panic!` or block the
1005 /// user by waiting for values.
1006 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1007 pub fn try_iter(&self) -> TryIter<T> {
1008 TryIter { rx: self }
1013 impl<T> select::Packet for Receiver<T> {
1014 fn can_recv(&self) -> bool {
1016 let new_port = match *unsafe { self.inner() } {
1017 Flavor::Oneshot(ref p) => {
1018 match p.can_recv() {
1019 Ok(ret) => return ret,
1020 Err(upgrade) => upgrade,
1023 Flavor::Stream(ref p) => {
1024 match p.can_recv() {
1025 Ok(ret) => return ret,
1026 Err(upgrade) => upgrade,
1029 Flavor::Shared(ref p) => return p.can_recv(),
1030 Flavor::Sync(ref p) => return p.can_recv(),
1033 mem::swap(self.inner_mut(),
1034 new_port.inner_mut());
1039 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1041 let (t, new_port) = match *unsafe { self.inner() } {
1042 Flavor::Oneshot(ref p) => {
1043 match p.start_selection(token) {
1044 oneshot::SelSuccess => return Installed,
1045 oneshot::SelCanceled => return Abort,
1046 oneshot::SelUpgraded(t, rx) => (t, rx),
1049 Flavor::Stream(ref p) => {
1050 match p.start_selection(token) {
1051 stream::SelSuccess => return Installed,
1052 stream::SelCanceled => return Abort,
1053 stream::SelUpgraded(t, rx) => (t, rx),
1056 Flavor::Shared(ref p) => return p.start_selection(token),
1057 Flavor::Sync(ref p) => return p.start_selection(token),
1061 mem::swap(self.inner_mut(), new_port.inner_mut());
1066 fn abort_selection(&self) -> bool {
1067 let mut was_upgrade = false;
1069 let result = match *unsafe { self.inner() } {
1070 Flavor::Oneshot(ref p) => p.abort_selection(),
1071 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1072 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1073 Flavor::Sync(ref p) => return p.abort_selection(),
1075 let new_port = match result { Ok(b) => return b, Err(p) => p };
1078 mem::swap(self.inner_mut(),
1079 new_port.inner_mut());
1085 #[stable(feature = "rust1", since = "1.0.0")]
1086 impl<'a, T> Iterator for Iter<'a, T> {
1089 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1092 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1093 impl<'a, T> Iterator for TryIter<'a, T> {
1096 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1099 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1100 impl<'a, T> IntoIterator for &'a Receiver<T> {
1102 type IntoIter = Iter<'a, T>;
1104 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1107 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1108 impl<T> Iterator for IntoIter<T> {
1110 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1113 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1114 impl <T> IntoIterator for Receiver<T> {
1116 type IntoIter = IntoIter<T>;
1118 fn into_iter(self) -> IntoIter<T> {
1119 IntoIter { rx: self }
1123 #[stable(feature = "rust1", since = "1.0.0")]
1124 impl<T> Drop for Receiver<T> {
1125 fn drop(&mut self) {
1126 match *unsafe { self.inner() } {
1127 Flavor::Oneshot(ref p) => p.drop_port(),
1128 Flavor::Stream(ref p) => p.drop_port(),
1129 Flavor::Shared(ref p) => p.drop_port(),
1130 Flavor::Sync(ref p) => p.drop_port(),
1135 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1136 impl<T> fmt::Debug for Receiver<T> {
1137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1138 write!(f, "Receiver {{ .. }}")
1142 #[stable(feature = "rust1", since = "1.0.0")]
1143 impl<T> fmt::Debug for SendError<T> {
1144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1145 "SendError(..)".fmt(f)
1149 #[stable(feature = "rust1", since = "1.0.0")]
1150 impl<T> fmt::Display for SendError<T> {
1151 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1152 "sending on a closed channel".fmt(f)
1156 #[stable(feature = "rust1", since = "1.0.0")]
1157 impl<T: Send> error::Error for SendError<T> {
1158 fn description(&self) -> &str {
1159 "sending on a closed channel"
1162 fn cause(&self) -> Option<&error::Error> {
1167 #[stable(feature = "rust1", since = "1.0.0")]
1168 impl<T> fmt::Debug for TrySendError<T> {
1169 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1171 TrySendError::Full(..) => "Full(..)".fmt(f),
1172 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1177 #[stable(feature = "rust1", since = "1.0.0")]
1178 impl<T> fmt::Display for TrySendError<T> {
1179 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1181 TrySendError::Full(..) => {
1182 "sending on a full channel".fmt(f)
1184 TrySendError::Disconnected(..) => {
1185 "sending on a closed channel".fmt(f)
1191 #[stable(feature = "rust1", since = "1.0.0")]
1192 impl<T: Send> error::Error for TrySendError<T> {
1194 fn description(&self) -> &str {
1196 TrySendError::Full(..) => {
1197 "sending on a full channel"
1199 TrySendError::Disconnected(..) => {
1200 "sending on a closed channel"
1205 fn cause(&self) -> Option<&error::Error> {
1210 #[stable(feature = "rust1", since = "1.0.0")]
1211 impl fmt::Display for RecvError {
1212 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1213 "receiving on a closed channel".fmt(f)
1217 #[stable(feature = "rust1", since = "1.0.0")]
1218 impl error::Error for RecvError {
1220 fn description(&self) -> &str {
1221 "receiving on a closed channel"
1224 fn cause(&self) -> Option<&error::Error> {
1229 #[stable(feature = "rust1", since = "1.0.0")]
1230 impl fmt::Display for TryRecvError {
1231 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1233 TryRecvError::Empty => {
1234 "receiving on an empty channel".fmt(f)
1236 TryRecvError::Disconnected => {
1237 "receiving on a closed channel".fmt(f)
1243 #[stable(feature = "rust1", since = "1.0.0")]
1244 impl error::Error for TryRecvError {
1246 fn description(&self) -> &str {
1248 TryRecvError::Empty => {
1249 "receiving on an empty channel"
1251 TryRecvError::Disconnected => {
1252 "receiving on a closed channel"
1257 fn cause(&self) -> Option<&error::Error> {
1262 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1263 impl fmt::Display for RecvTimeoutError {
1264 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1266 RecvTimeoutError::Timeout => {
1267 "timed out waiting on channel".fmt(f)
1269 RecvTimeoutError::Disconnected => {
1270 "channel is empty and sending half is closed".fmt(f)
1276 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1277 impl error::Error for RecvTimeoutError {
1278 fn description(&self) -> &str {
1280 RecvTimeoutError::Timeout => {
1281 "timed out waiting on channel"
1283 RecvTimeoutError::Disconnected => {
1284 "channel is empty and sending half is closed"
1289 fn cause(&self) -> Option<&error::Error> {
1294 #[cfg(all(test, not(target_os = "emscripten")))]
1299 use time::{Duration, Instant};
1301 pub fn stress_factor() -> usize {
1302 match env::var("RUST_TEST_STRESS") {
1303 Ok(val) => val.parse().unwrap(),
1310 let (tx, rx) = channel::<i32>();
1311 tx.send(1).unwrap();
1312 assert_eq!(rx.recv().unwrap(), 1);
1317 let (tx, _rx) = channel::<Box<isize>>();
1318 tx.send(box 1).unwrap();
1322 fn drop_full_shared() {
1323 let (tx, _rx) = channel::<Box<isize>>();
1326 tx.send(box 1).unwrap();
1331 let (tx, rx) = channel::<i32>();
1332 tx.send(1).unwrap();
1333 assert_eq!(rx.recv().unwrap(), 1);
1334 let tx = tx.clone();
1335 tx.send(1).unwrap();
1336 assert_eq!(rx.recv().unwrap(), 1);
1340 fn smoke_threads() {
1341 let (tx, rx) = channel::<i32>();
1342 let _t = thread::spawn(move|| {
1343 tx.send(1).unwrap();
1345 assert_eq!(rx.recv().unwrap(), 1);
1349 fn smoke_port_gone() {
1350 let (tx, rx) = channel::<i32>();
1352 assert!(tx.send(1).is_err());
1356 fn smoke_shared_port_gone() {
1357 let (tx, rx) = channel::<i32>();
1359 assert!(tx.send(1).is_err())
1363 fn smoke_shared_port_gone2() {
1364 let (tx, rx) = channel::<i32>();
1366 let tx2 = tx.clone();
1368 assert!(tx2.send(1).is_err());
1372 fn port_gone_concurrent() {
1373 let (tx, rx) = channel::<i32>();
1374 let _t = thread::spawn(move|| {
1377 while tx.send(1).is_ok() {}
1381 fn port_gone_concurrent_shared() {
1382 let (tx, rx) = channel::<i32>();
1383 let tx2 = tx.clone();
1384 let _t = thread::spawn(move|| {
1387 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1391 fn smoke_chan_gone() {
1392 let (tx, rx) = channel::<i32>();
1394 assert!(rx.recv().is_err());
1398 fn smoke_chan_gone_shared() {
1399 let (tx, rx) = channel::<()>();
1400 let tx2 = tx.clone();
1403 assert!(rx.recv().is_err());
1407 fn chan_gone_concurrent() {
1408 let (tx, rx) = channel::<i32>();
1409 let _t = thread::spawn(move|| {
1410 tx.send(1).unwrap();
1411 tx.send(1).unwrap();
1413 while rx.recv().is_ok() {}
1418 let (tx, rx) = channel::<i32>();
1419 let t = thread::spawn(move|| {
1420 for _ in 0..10000 { tx.send(1).unwrap(); }
1423 assert_eq!(rx.recv().unwrap(), 1);
1425 t.join().ok().unwrap();
1429 fn stress_shared() {
1430 const AMT: u32 = 10000;
1431 const NTHREADS: u32 = 8;
1432 let (tx, rx) = channel::<i32>();
1434 let t = thread::spawn(move|| {
1435 for _ in 0..AMT * NTHREADS {
1436 assert_eq!(rx.recv().unwrap(), 1);
1438 match rx.try_recv() {
1444 for _ in 0..NTHREADS {
1445 let tx = tx.clone();
1446 thread::spawn(move|| {
1447 for _ in 0..AMT { tx.send(1).unwrap(); }
1451 t.join().ok().unwrap();
1455 fn send_from_outside_runtime() {
1456 let (tx1, rx1) = channel::<()>();
1457 let (tx2, rx2) = channel::<i32>();
1458 let t1 = thread::spawn(move|| {
1459 tx1.send(()).unwrap();
1461 assert_eq!(rx2.recv().unwrap(), 1);
1464 rx1.recv().unwrap();
1465 let t2 = thread::spawn(move|| {
1467 tx2.send(1).unwrap();
1470 t1.join().ok().unwrap();
1471 t2.join().ok().unwrap();
1475 fn recv_from_outside_runtime() {
1476 let (tx, rx) = channel::<i32>();
1477 let t = thread::spawn(move|| {
1479 assert_eq!(rx.recv().unwrap(), 1);
1483 tx.send(1).unwrap();
1485 t.join().ok().unwrap();
1490 let (tx1, rx1) = channel::<i32>();
1491 let (tx2, rx2) = channel::<i32>();
1492 let t1 = thread::spawn(move|| {
1493 assert_eq!(rx1.recv().unwrap(), 1);
1494 tx2.send(2).unwrap();
1496 let t2 = thread::spawn(move|| {
1497 tx1.send(1).unwrap();
1498 assert_eq!(rx2.recv().unwrap(), 2);
1500 t1.join().ok().unwrap();
1501 t2.join().ok().unwrap();
1505 fn oneshot_single_thread_close_port_first() {
1506 // Simple test of closing without sending
1507 let (_tx, rx) = channel::<i32>();
1512 fn oneshot_single_thread_close_chan_first() {
1513 // Simple test of closing without sending
1514 let (tx, _rx) = channel::<i32>();
1519 fn oneshot_single_thread_send_port_close() {
1520 // Testing that the sender cleans up the payload if receiver is closed
1521 let (tx, rx) = channel::<Box<i32>>();
1523 assert!(tx.send(box 0).is_err());
1527 fn oneshot_single_thread_recv_chan_close() {
1528 // Receiving on a closed chan will panic
1529 let res = thread::spawn(move|| {
1530 let (tx, rx) = channel::<i32>();
1535 assert!(res.is_err());
1539 fn oneshot_single_thread_send_then_recv() {
1540 let (tx, rx) = channel::<Box<i32>>();
1541 tx.send(box 10).unwrap();
1542 assert!(rx.recv().unwrap() == box 10);
1546 fn oneshot_single_thread_try_send_open() {
1547 let (tx, rx) = channel::<i32>();
1548 assert!(tx.send(10).is_ok());
1549 assert!(rx.recv().unwrap() == 10);
1553 fn oneshot_single_thread_try_send_closed() {
1554 let (tx, rx) = channel::<i32>();
1556 assert!(tx.send(10).is_err());
1560 fn oneshot_single_thread_try_recv_open() {
1561 let (tx, rx) = channel::<i32>();
1562 tx.send(10).unwrap();
1563 assert!(rx.recv() == Ok(10));
1567 fn oneshot_single_thread_try_recv_closed() {
1568 let (tx, rx) = channel::<i32>();
1570 assert!(rx.recv().is_err());
1574 fn oneshot_single_thread_peek_data() {
1575 let (tx, rx) = channel::<i32>();
1576 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1577 tx.send(10).unwrap();
1578 assert_eq!(rx.try_recv(), Ok(10));
1582 fn oneshot_single_thread_peek_close() {
1583 let (tx, rx) = channel::<i32>();
1585 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1586 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1590 fn oneshot_single_thread_peek_open() {
1591 let (_tx, rx) = channel::<i32>();
1592 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1596 fn oneshot_multi_task_recv_then_send() {
1597 let (tx, rx) = channel::<Box<i32>>();
1598 let _t = thread::spawn(move|| {
1599 assert!(rx.recv().unwrap() == box 10);
1602 tx.send(box 10).unwrap();
1606 fn oneshot_multi_task_recv_then_close() {
1607 let (tx, rx) = channel::<Box<i32>>();
1608 let _t = thread::spawn(move|| {
1611 let res = thread::spawn(move|| {
1612 assert!(rx.recv().unwrap() == box 10);
1614 assert!(res.is_err());
1618 fn oneshot_multi_thread_close_stress() {
1619 for _ in 0..stress_factor() {
1620 let (tx, rx) = channel::<i32>();
1621 let _t = thread::spawn(move|| {
1629 fn oneshot_multi_thread_send_close_stress() {
1630 for _ in 0..stress_factor() {
1631 let (tx, rx) = channel::<i32>();
1632 let _t = thread::spawn(move|| {
1635 let _ = thread::spawn(move|| {
1636 tx.send(1).unwrap();
1642 fn oneshot_multi_thread_recv_close_stress() {
1643 for _ in 0..stress_factor() {
1644 let (tx, rx) = channel::<i32>();
1645 thread::spawn(move|| {
1646 let res = thread::spawn(move|| {
1649 assert!(res.is_err());
1651 let _t = thread::spawn(move|| {
1652 thread::spawn(move|| {
1660 fn oneshot_multi_thread_send_recv_stress() {
1661 for _ in 0..stress_factor() {
1662 let (tx, rx) = channel::<Box<isize>>();
1663 let _t = thread::spawn(move|| {
1664 tx.send(box 10).unwrap();
1666 assert!(rx.recv().unwrap() == box 10);
1671 fn stream_send_recv_stress() {
1672 for _ in 0..stress_factor() {
1673 let (tx, rx) = channel();
1678 fn send(tx: Sender<Box<i32>>, i: i32) {
1679 if i == 10 { return }
1681 thread::spawn(move|| {
1682 tx.send(box i).unwrap();
1687 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1688 if i == 10 { return }
1690 thread::spawn(move|| {
1691 assert!(rx.recv().unwrap() == box i);
1699 fn oneshot_single_thread_recv_timeout() {
1700 let (tx, rx) = channel();
1701 tx.send(()).unwrap();
1702 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1703 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1704 tx.send(()).unwrap();
1705 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1709 fn stress_recv_timeout_two_threads() {
1710 let (tx, rx) = channel();
1711 let stress = stress_factor() + 100;
1712 let timeout = Duration::from_millis(100);
1714 thread::spawn(move || {
1715 for i in 0..stress {
1717 thread::sleep(timeout * 2);
1719 tx.send(1usize).unwrap();
1723 let mut recv_count = 0;
1725 match rx.recv_timeout(timeout) {
1727 assert_eq!(n, 1usize);
1730 Err(RecvTimeoutError::Timeout) => continue,
1731 Err(RecvTimeoutError::Disconnected) => break,
1735 assert_eq!(recv_count, stress);
1739 fn recv_timeout_upgrade() {
1740 let (tx, rx) = channel::<()>();
1741 let timeout = Duration::from_millis(1);
1742 let _tx_clone = tx.clone();
1744 let start = Instant::now();
1745 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1746 assert!(Instant::now() >= start + timeout);
1750 fn stress_recv_timeout_shared() {
1751 let (tx, rx) = channel();
1752 let stress = stress_factor() + 100;
1754 for i in 0..stress {
1755 let tx = tx.clone();
1756 thread::spawn(move || {
1757 thread::sleep(Duration::from_millis(i as u64 * 10));
1758 tx.send(1usize).unwrap();
1764 let mut recv_count = 0;
1766 match rx.recv_timeout(Duration::from_millis(10)) {
1768 assert_eq!(n, 1usize);
1771 Err(RecvTimeoutError::Timeout) => continue,
1772 Err(RecvTimeoutError::Disconnected) => break,
1776 assert_eq!(recv_count, stress);
1781 // Regression test that we don't run out of stack in scheduler context
1782 let (tx, rx) = channel();
1783 for _ in 0..10000 { tx.send(()).unwrap(); }
1784 for _ in 0..10000 { rx.recv().unwrap(); }
1788 fn shared_recv_timeout() {
1789 let (tx, rx) = channel();
1792 let tx = tx.clone();
1793 thread::spawn(move|| {
1794 tx.send(()).unwrap();
1798 for _ in 0..total { rx.recv().unwrap(); }
1800 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1801 tx.send(()).unwrap();
1802 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1806 fn shared_chan_stress() {
1807 let (tx, rx) = channel();
1808 let total = stress_factor() + 100;
1810 let tx = tx.clone();
1811 thread::spawn(move|| {
1812 tx.send(()).unwrap();
1822 fn test_nested_recv_iter() {
1823 let (tx, rx) = channel::<i32>();
1824 let (total_tx, total_rx) = channel::<i32>();
1826 let _t = thread::spawn(move|| {
1828 for x in rx.iter() {
1831 total_tx.send(acc).unwrap();
1834 tx.send(3).unwrap();
1835 tx.send(1).unwrap();
1836 tx.send(2).unwrap();
1838 assert_eq!(total_rx.recv().unwrap(), 6);
1842 fn test_recv_iter_break() {
1843 let (tx, rx) = channel::<i32>();
1844 let (count_tx, count_rx) = channel();
1846 let _t = thread::spawn(move|| {
1848 for x in rx.iter() {
1855 count_tx.send(count).unwrap();
1858 tx.send(2).unwrap();
1859 tx.send(2).unwrap();
1860 tx.send(2).unwrap();
1863 assert_eq!(count_rx.recv().unwrap(), 4);
1867 fn test_recv_try_iter() {
1868 let (request_tx, request_rx) = channel();
1869 let (response_tx, response_rx) = channel();
1871 // Request `x`s until we have `6`.
1872 let t = thread::spawn(move|| {
1875 for x in response_rx.try_iter() {
1881 request_tx.send(()).unwrap();
1885 for _ in request_rx.iter() {
1886 if response_tx.send(2).is_err() {
1891 assert_eq!(t.join().unwrap(), 6);
1895 fn test_recv_into_iter_owned() {
1897 let (tx, rx) = channel::<i32>();
1898 tx.send(1).unwrap();
1899 tx.send(2).unwrap();
1903 assert_eq!(iter.next().unwrap(), 1);
1904 assert_eq!(iter.next().unwrap(), 2);
1905 assert_eq!(iter.next().is_none(), true);
1909 fn test_recv_into_iter_borrowed() {
1910 let (tx, rx) = channel::<i32>();
1911 tx.send(1).unwrap();
1912 tx.send(2).unwrap();
1914 let mut iter = (&rx).into_iter();
1915 assert_eq!(iter.next().unwrap(), 1);
1916 assert_eq!(iter.next().unwrap(), 2);
1917 assert_eq!(iter.next().is_none(), true);
1921 fn try_recv_states() {
1922 let (tx1, rx1) = channel::<i32>();
1923 let (tx2, rx2) = channel::<()>();
1924 let (tx3, rx3) = channel::<()>();
1925 let _t = thread::spawn(move|| {
1926 rx2.recv().unwrap();
1927 tx1.send(1).unwrap();
1928 tx3.send(()).unwrap();
1929 rx2.recv().unwrap();
1931 tx3.send(()).unwrap();
1934 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1935 tx2.send(()).unwrap();
1936 rx3.recv().unwrap();
1937 assert_eq!(rx1.try_recv(), Ok(1));
1938 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1939 tx2.send(()).unwrap();
1940 rx3.recv().unwrap();
1941 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1944 // This bug used to end up in a livelock inside of the Receiver destructor
1945 // because the internal state of the Shared packet was corrupted
1947 fn destroy_upgraded_shared_port_when_sender_still_active() {
1948 let (tx, rx) = channel();
1949 let (tx2, rx2) = channel();
1950 let _t = thread::spawn(move|| {
1951 rx.recv().unwrap(); // wait on a oneshot
1952 drop(rx); // destroy a shared
1953 tx2.send(()).unwrap();
1955 // make sure the other thread has gone to sleep
1956 for _ in 0..5000 { thread::yield_now(); }
1958 // upgrade to a shared chan and send a message
1961 t.send(()).unwrap();
1963 // wait for the child thread to exit before we exit
1964 rx2.recv().unwrap();
1969 let (tx, _) = channel();
1970 let _ = tx.send(123);
1971 assert_eq!(tx.send(123), Err(SendError(123)));
1975 #[cfg(all(test, not(target_os = "emscripten")))]
1982 pub fn stress_factor() -> usize {
1983 match env::var("RUST_TEST_STRESS") {
1984 Ok(val) => val.parse().unwrap(),
1991 let (tx, rx) = sync_channel::<i32>(1);
1992 tx.send(1).unwrap();
1993 assert_eq!(rx.recv().unwrap(), 1);
1998 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1999 tx.send(box 1).unwrap();
2004 let (tx, rx) = sync_channel::<i32>(1);
2005 tx.send(1).unwrap();
2006 assert_eq!(rx.recv().unwrap(), 1);
2007 let tx = tx.clone();
2008 tx.send(1).unwrap();
2009 assert_eq!(rx.recv().unwrap(), 1);
2014 let (tx, rx) = sync_channel::<i32>(1);
2015 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2016 tx.send(1).unwrap();
2017 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2021 fn smoke_threads() {
2022 let (tx, rx) = sync_channel::<i32>(0);
2023 let _t = thread::spawn(move|| {
2024 tx.send(1).unwrap();
2026 assert_eq!(rx.recv().unwrap(), 1);
2030 fn smoke_port_gone() {
2031 let (tx, rx) = sync_channel::<i32>(0);
2033 assert!(tx.send(1).is_err());
2037 fn smoke_shared_port_gone2() {
2038 let (tx, rx) = sync_channel::<i32>(0);
2040 let tx2 = tx.clone();
2042 assert!(tx2.send(1).is_err());
2046 fn port_gone_concurrent() {
2047 let (tx, rx) = sync_channel::<i32>(0);
2048 let _t = thread::spawn(move|| {
2051 while tx.send(1).is_ok() {}
2055 fn port_gone_concurrent_shared() {
2056 let (tx, rx) = sync_channel::<i32>(0);
2057 let tx2 = tx.clone();
2058 let _t = thread::spawn(move|| {
2061 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2065 fn smoke_chan_gone() {
2066 let (tx, rx) = sync_channel::<i32>(0);
2068 assert!(rx.recv().is_err());
2072 fn smoke_chan_gone_shared() {
2073 let (tx, rx) = sync_channel::<()>(0);
2074 let tx2 = tx.clone();
2077 assert!(rx.recv().is_err());
2081 fn chan_gone_concurrent() {
2082 let (tx, rx) = sync_channel::<i32>(0);
2083 thread::spawn(move|| {
2084 tx.send(1).unwrap();
2085 tx.send(1).unwrap();
2087 while rx.recv().is_ok() {}
2092 let (tx, rx) = sync_channel::<i32>(0);
2093 thread::spawn(move|| {
2094 for _ in 0..10000 { tx.send(1).unwrap(); }
2097 assert_eq!(rx.recv().unwrap(), 1);
2102 fn stress_recv_timeout_two_threads() {
2103 let (tx, rx) = sync_channel::<i32>(0);
2105 thread::spawn(move|| {
2106 for _ in 0..10000 { tx.send(1).unwrap(); }
2109 let mut recv_count = 0;
2111 match rx.recv_timeout(Duration::from_millis(1)) {
2116 Err(RecvTimeoutError::Timeout) => continue,
2117 Err(RecvTimeoutError::Disconnected) => break,
2121 assert_eq!(recv_count, 10000);
2125 fn stress_recv_timeout_shared() {
2126 const AMT: u32 = 1000;
2127 const NTHREADS: u32 = 8;
2128 let (tx, rx) = sync_channel::<i32>(0);
2129 let (dtx, drx) = sync_channel::<()>(0);
2131 thread::spawn(move|| {
2132 let mut recv_count = 0;
2134 match rx.recv_timeout(Duration::from_millis(10)) {
2139 Err(RecvTimeoutError::Timeout) => continue,
2140 Err(RecvTimeoutError::Disconnected) => break,
2144 assert_eq!(recv_count, AMT * NTHREADS);
2145 assert!(rx.try_recv().is_err());
2147 dtx.send(()).unwrap();
2150 for _ in 0..NTHREADS {
2151 let tx = tx.clone();
2152 thread::spawn(move|| {
2153 for _ in 0..AMT { tx.send(1).unwrap(); }
2159 drx.recv().unwrap();
2163 fn stress_shared() {
2164 const AMT: u32 = 1000;
2165 const NTHREADS: u32 = 8;
2166 let (tx, rx) = sync_channel::<i32>(0);
2167 let (dtx, drx) = sync_channel::<()>(0);
2169 thread::spawn(move|| {
2170 for _ in 0..AMT * NTHREADS {
2171 assert_eq!(rx.recv().unwrap(), 1);
2173 match rx.try_recv() {
2177 dtx.send(()).unwrap();
2180 for _ in 0..NTHREADS {
2181 let tx = tx.clone();
2182 thread::spawn(move|| {
2183 for _ in 0..AMT { tx.send(1).unwrap(); }
2187 drx.recv().unwrap();
2191 fn oneshot_single_thread_close_port_first() {
2192 // Simple test of closing without sending
2193 let (_tx, rx) = sync_channel::<i32>(0);
2198 fn oneshot_single_thread_close_chan_first() {
2199 // Simple test of closing without sending
2200 let (tx, _rx) = sync_channel::<i32>(0);
2205 fn oneshot_single_thread_send_port_close() {
2206 // Testing that the sender cleans up the payload if receiver is closed
2207 let (tx, rx) = sync_channel::<Box<i32>>(0);
2209 assert!(tx.send(box 0).is_err());
2213 fn oneshot_single_thread_recv_chan_close() {
2214 // Receiving on a closed chan will panic
2215 let res = thread::spawn(move|| {
2216 let (tx, rx) = sync_channel::<i32>(0);
2221 assert!(res.is_err());
2225 fn oneshot_single_thread_send_then_recv() {
2226 let (tx, rx) = sync_channel::<Box<i32>>(1);
2227 tx.send(box 10).unwrap();
2228 assert!(rx.recv().unwrap() == box 10);
2232 fn oneshot_single_thread_try_send_open() {
2233 let (tx, rx) = sync_channel::<i32>(1);
2234 assert_eq!(tx.try_send(10), Ok(()));
2235 assert!(rx.recv().unwrap() == 10);
2239 fn oneshot_single_thread_try_send_closed() {
2240 let (tx, rx) = sync_channel::<i32>(0);
2242 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2246 fn oneshot_single_thread_try_send_closed2() {
2247 let (tx, _rx) = sync_channel::<i32>(0);
2248 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2252 fn oneshot_single_thread_try_recv_open() {
2253 let (tx, rx) = sync_channel::<i32>(1);
2254 tx.send(10).unwrap();
2255 assert!(rx.recv() == Ok(10));
2259 fn oneshot_single_thread_try_recv_closed() {
2260 let (tx, rx) = sync_channel::<i32>(0);
2262 assert!(rx.recv().is_err());
2266 fn oneshot_single_thread_try_recv_closed_with_data() {
2267 let (tx, rx) = sync_channel::<i32>(1);
2268 tx.send(10).unwrap();
2270 assert_eq!(rx.try_recv(), Ok(10));
2271 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2275 fn oneshot_single_thread_peek_data() {
2276 let (tx, rx) = sync_channel::<i32>(1);
2277 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2278 tx.send(10).unwrap();
2279 assert_eq!(rx.try_recv(), Ok(10));
2283 fn oneshot_single_thread_peek_close() {
2284 let (tx, rx) = sync_channel::<i32>(0);
2286 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2287 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2291 fn oneshot_single_thread_peek_open() {
2292 let (_tx, rx) = sync_channel::<i32>(0);
2293 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2297 fn oneshot_multi_task_recv_then_send() {
2298 let (tx, rx) = sync_channel::<Box<i32>>(0);
2299 let _t = thread::spawn(move|| {
2300 assert!(rx.recv().unwrap() == box 10);
2303 tx.send(box 10).unwrap();
2307 fn oneshot_multi_task_recv_then_close() {
2308 let (tx, rx) = sync_channel::<Box<i32>>(0);
2309 let _t = thread::spawn(move|| {
2312 let res = thread::spawn(move|| {
2313 assert!(rx.recv().unwrap() == box 10);
2315 assert!(res.is_err());
2319 fn oneshot_multi_thread_close_stress() {
2320 for _ in 0..stress_factor() {
2321 let (tx, rx) = sync_channel::<i32>(0);
2322 let _t = thread::spawn(move|| {
2330 fn oneshot_multi_thread_send_close_stress() {
2331 for _ in 0..stress_factor() {
2332 let (tx, rx) = sync_channel::<i32>(0);
2333 let _t = thread::spawn(move|| {
2336 let _ = thread::spawn(move || {
2337 tx.send(1).unwrap();
2343 fn oneshot_multi_thread_recv_close_stress() {
2344 for _ in 0..stress_factor() {
2345 let (tx, rx) = sync_channel::<i32>(0);
2346 let _t = thread::spawn(move|| {
2347 let res = thread::spawn(move|| {
2350 assert!(res.is_err());
2352 let _t = thread::spawn(move|| {
2353 thread::spawn(move|| {
2361 fn oneshot_multi_thread_send_recv_stress() {
2362 for _ in 0..stress_factor() {
2363 let (tx, rx) = sync_channel::<Box<i32>>(0);
2364 let _t = thread::spawn(move|| {
2365 tx.send(box 10).unwrap();
2367 assert!(rx.recv().unwrap() == box 10);
2372 fn stream_send_recv_stress() {
2373 for _ in 0..stress_factor() {
2374 let (tx, rx) = sync_channel::<Box<i32>>(0);
2379 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2380 if i == 10 { return }
2382 thread::spawn(move|| {
2383 tx.send(box i).unwrap();
2388 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2389 if i == 10 { return }
2391 thread::spawn(move|| {
2392 assert!(rx.recv().unwrap() == box i);
2401 // Regression test that we don't run out of stack in scheduler context
2402 let (tx, rx) = sync_channel(10000);
2403 for _ in 0..10000 { tx.send(()).unwrap(); }
2404 for _ in 0..10000 { rx.recv().unwrap(); }
2408 fn shared_chan_stress() {
2409 let (tx, rx) = sync_channel(0);
2410 let total = stress_factor() + 100;
2412 let tx = tx.clone();
2413 thread::spawn(move|| {
2414 tx.send(()).unwrap();
2424 fn test_nested_recv_iter() {
2425 let (tx, rx) = sync_channel::<i32>(0);
2426 let (total_tx, total_rx) = sync_channel::<i32>(0);
2428 let _t = thread::spawn(move|| {
2430 for x in rx.iter() {
2433 total_tx.send(acc).unwrap();
2436 tx.send(3).unwrap();
2437 tx.send(1).unwrap();
2438 tx.send(2).unwrap();
2440 assert_eq!(total_rx.recv().unwrap(), 6);
2444 fn test_recv_iter_break() {
2445 let (tx, rx) = sync_channel::<i32>(0);
2446 let (count_tx, count_rx) = sync_channel(0);
2448 let _t = thread::spawn(move|| {
2450 for x in rx.iter() {
2457 count_tx.send(count).unwrap();
2460 tx.send(2).unwrap();
2461 tx.send(2).unwrap();
2462 tx.send(2).unwrap();
2463 let _ = tx.try_send(2);
2465 assert_eq!(count_rx.recv().unwrap(), 4);
2469 fn try_recv_states() {
2470 let (tx1, rx1) = sync_channel::<i32>(1);
2471 let (tx2, rx2) = sync_channel::<()>(1);
2472 let (tx3, rx3) = sync_channel::<()>(1);
2473 let _t = thread::spawn(move|| {
2474 rx2.recv().unwrap();
2475 tx1.send(1).unwrap();
2476 tx3.send(()).unwrap();
2477 rx2.recv().unwrap();
2479 tx3.send(()).unwrap();
2482 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2483 tx2.send(()).unwrap();
2484 rx3.recv().unwrap();
2485 assert_eq!(rx1.try_recv(), Ok(1));
2486 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2487 tx2.send(()).unwrap();
2488 rx3.recv().unwrap();
2489 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2492 // This bug used to end up in a livelock inside of the Receiver destructor
2493 // because the internal state of the Shared packet was corrupted
2495 fn destroy_upgraded_shared_port_when_sender_still_active() {
2496 let (tx, rx) = sync_channel::<()>(0);
2497 let (tx2, rx2) = sync_channel::<()>(0);
2498 let _t = thread::spawn(move|| {
2499 rx.recv().unwrap(); // wait on a oneshot
2500 drop(rx); // destroy a shared
2501 tx2.send(()).unwrap();
2503 // make sure the other thread has gone to sleep
2504 for _ in 0..5000 { thread::yield_now(); }
2506 // upgrade to a shared chan and send a message
2509 t.send(()).unwrap();
2511 // wait for the child thread to exit before we exit
2512 rx2.recv().unwrap();
2517 let (tx, rx) = sync_channel::<i32>(0);
2518 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2519 assert_eq!(tx.send(1), Ok(()));
2524 let (tx, rx) = sync_channel::<i32>(0);
2525 let _t = thread::spawn(move|| { drop(rx); });
2526 assert!(tx.send(1).is_err());
2531 let (tx, rx) = sync_channel::<i32>(1);
2532 assert_eq!(tx.send(1), Ok(()));
2533 let _t =thread::spawn(move|| { drop(rx); });
2534 assert!(tx.send(1).is_err());
2539 let (tx, rx) = sync_channel::<i32>(0);
2540 let tx2 = tx.clone();
2541 let (done, donerx) = channel();
2542 let done2 = done.clone();
2543 let _t = thread::spawn(move|| {
2544 assert!(tx.send(1).is_err());
2545 done.send(()).unwrap();
2547 let _t = thread::spawn(move|| {
2548 assert!(tx2.send(2).is_err());
2549 done2.send(()).unwrap();
2552 donerx.recv().unwrap();
2553 donerx.recv().unwrap();
2558 let (tx, _rx) = sync_channel::<i32>(0);
2559 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2564 let (tx, _rx) = sync_channel::<i32>(1);
2565 assert_eq!(tx.try_send(1), Ok(()));
2566 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2571 let (tx, rx) = sync_channel::<i32>(1);
2572 assert_eq!(tx.try_send(1), Ok(()));
2574 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2580 let (tx1, rx1) = sync_channel::<()>(3);
2581 let (tx2, rx2) = sync_channel::<()>(3);
2583 let _t = thread::spawn(move|| {
2584 rx1.recv().unwrap();
2585 tx2.try_send(()).unwrap();
2588 tx1.try_send(()).unwrap();
2589 rx2.recv().unwrap();
2598 fn fmt_debug_sender() {
2599 let (tx, _) = channel::<i32>();
2600 assert_eq!(format!("{:?}", tx), "Sender { .. }");
2604 fn fmt_debug_recv() {
2605 let (_, rx) = channel::<i32>();
2606 assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2610 fn fmt_debug_sync_sender() {
2611 let (tx, _) = sync_channel::<i32>(1);
2612 assert_eq!(format!("{:?}", tx), "SyncSender { .. }");