1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //! messages is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //! channel where each sender atomically hands off a message to a receiver.
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
67 //! use std::sync::mpsc::channel;
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //! tx.send(10).unwrap();
74 //! assert_eq!(rx.recv().unwrap(), 10);
81 //! use std::sync::mpsc::channel;
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
88 //! let tx = tx.clone();
89 //! thread::spawn(move|| {
90 //! tx.send(i).unwrap();
95 //! let j = rx.recv().unwrap();
96 //! assert!(0 <= j && j < 10);
100 //! Propagating panics:
103 //! use std::sync::mpsc::channel;
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
109 //! assert!(rx.recv().is_err());
112 //! Synchronous channels:
116 //! use std::sync::mpsc::sync_channel;
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //! // This will wait for the parent thread to start receiving
121 //! tx.send(53).unwrap();
123 //! rx.recv().unwrap();
126 #![stable(feature = "rust1", since = "1.0.0")]
128 // A description of how Rust's channel implementation works
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
140 // ## Flavors of channels
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
147 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
148 // case. They contain as few atomics as possible and
149 // involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 // use a different concurrent queue that is more tailored for this
152 // use case. The initial allocation of this flavor of channel is not
154 // * Shared - this is the most general form of channel that this module offers,
155 // a channel with multiple senders. This type is as optimized as it
156 // can be, but the previous two types mentioned are much faster for
159 // ## Concurrent queues
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
171 // ### SPSC optimizations
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
190 // ### MPSC optimizations
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
200 // ## Overview of the Implementation
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
208 // queue.push(t) return if queue.pop()
209 // if increment() == -1 deschedule {
210 // wakeup() if decrement() > 0
211 // cancel_deschedule()
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
218 // ### The internal atomic counter
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
243 // ## Native Implementation
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as its woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
300 /// The receiving-half of Rust's channel type. This half can only be owned by
303 /// Messages sent to the channel can be retrieved using [`recv`].
305 /// [`recv`]: ../../../std/sync/mpsc/struct.Receiver.html#method.recv
310 /// use std::sync::mpsc::channel;
312 /// use std::time::Duration;
314 /// let (send, recv) = channel();
316 /// thread::spawn(move || {
317 /// send.send("Hello world!").unwrap();
318 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
319 /// send.send("Delayed for 2 seconds").unwrap();
322 /// println!("{}", recv.recv().unwrap()); // Received immediately
323 /// println!("Waiting...");
324 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
326 #[stable(feature = "rust1", since = "1.0.0")]
327 pub struct Receiver<T> {
328 inner: UnsafeCell<Flavor<T>>,
331 // The receiver port can be sent from place to place, so long as it
332 // is not used to receive non-sendable things.
333 #[stable(feature = "rust1", since = "1.0.0")]
334 unsafe impl<T: Send> Send for Receiver<T> { }
336 #[stable(feature = "rust1", since = "1.0.0")]
337 impl<T> !Sync for Receiver<T> { }
339 /// An iterator over messages on a receiver, this iterator will block whenever
340 /// [`next`] is called, waiting for a new message, and [`None`] will be returned
341 /// when the corresponding channel has hung up.
343 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
344 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
345 #[stable(feature = "rust1", since = "1.0.0")]
347 pub struct Iter<'a, T: 'a> {
351 /// An iterator that attempts to yield all pending values for a receiver.
352 /// [`None`] will be returned when there are no pending values remaining or if
353 /// the corresponding channel has hung up.
355 /// This Iterator will never block the caller in order to wait for data to
356 /// become available. Instead, it will return [`None`].
358 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
359 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
361 pub struct TryIter<'a, T: 'a> {
365 /// An owning iterator over messages on a receiver, this iterator will block
366 /// whenever [`next`] is called, waiting for a new message, and [`None`] will be
367 /// returned when the corresponding channel has hung up.
369 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
370 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
372 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
374 pub struct IntoIter<T> {
378 /// The sending-half of Rust's asynchronous channel type. This half can only be
379 /// owned by one thread, but it can be cloned to send to other threads.
381 /// Messages can be sent through this channel with [`send`].
383 /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
388 /// use std::sync::mpsc::channel;
391 /// let (sender, receiver) = channel();
392 /// let sender2 = sender.clone();
394 /// // First thread owns sender
395 /// thread::spawn(move || {
396 /// sender.send(1).unwrap();
399 /// // Second thread owns sender2
400 /// thread::spawn(move || {
401 /// sender2.send(2).unwrap();
404 /// let msg = receiver.recv().unwrap();
405 /// let msg2 = receiver.recv().unwrap();
407 /// assert_eq!(3, msg + msg2);
409 #[stable(feature = "rust1", since = "1.0.0")]
410 pub struct Sender<T> {
411 inner: UnsafeCell<Flavor<T>>,
414 // The send port can be sent from place to place, so long as it
415 // is not used to send non-sendable things.
416 #[stable(feature = "rust1", since = "1.0.0")]
417 unsafe impl<T: Send> Send for Sender<T> { }
419 #[stable(feature = "rust1", since = "1.0.0")]
420 impl<T> !Sync for Sender<T> { }
422 /// The sending-half of Rust's synchronous channel type. This half can only be
423 /// owned by one thread, but it can be cloned to send to other threads.
425 /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
426 /// [`SyncSender::send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
428 #[stable(feature = "rust1", since = "1.0.0")]
429 pub struct SyncSender<T> {
430 inner: Arc<sync::Packet<T>>,
433 #[stable(feature = "rust1", since = "1.0.0")]
434 unsafe impl<T: Send> Send for SyncSender<T> {}
436 #[stable(feature = "rust1", since = "1.0.0")]
437 impl<T> !Sync for SyncSender<T> {}
439 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
440 /// function on **channel**s.
442 /// A **send** operation can only fail if the receiving end of a channel is
443 /// disconnected, implying that the data could never be received. The error
444 /// contains the data being sent as a payload so it can be recovered.
446 /// [`Sender::send`]: struct.Sender.html#method.send
447 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
448 #[stable(feature = "rust1", since = "1.0.0")]
449 #[derive(PartialEq, Eq, Clone, Copy)]
450 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
452 /// An error returned from the [`recv`] function on a [`Receiver`].
454 /// The [`recv`] operation can only fail if the sending half of a
455 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
456 /// messages will ever be received.
458 /// [`recv`]: struct.Receiver.html#method.recv
459 /// [`Receiver`]: struct.Receiver.html
460 /// [`channel`]: fn.channel.html
461 /// [`sync_channel`]: fn.sync_channel.html
462 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
463 #[stable(feature = "rust1", since = "1.0.0")]
464 pub struct RecvError;
466 /// This enumeration is the list of the possible reasons that [`try_recv`] could
467 /// not return data when called. This can occur with both a [`channel`] and
468 /// a [`sync_channel`].
470 /// [`try_recv`]: struct.Receiver.html#method.try_recv
471 /// [`channel`]: fn.channel.html
472 /// [`sync_channel`]: fn.sync_channel.html
473 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
474 #[stable(feature = "rust1", since = "1.0.0")]
475 pub enum TryRecvError {
476 /// This **channel** is currently empty, but the **Sender**(s) have not yet
477 /// disconnected, so data may yet become available.
478 #[stable(feature = "rust1", since = "1.0.0")]
481 /// The **channel**'s sending half has become disconnected, and there will
482 /// never be any more data received on it.
483 #[stable(feature = "rust1", since = "1.0.0")]
487 /// This enumeration is the list of possible errors that made [`recv_timeout`]
488 /// unable to return data when called. This can occur with both a [`channel`] and
489 /// a [`sync_channel`].
491 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
492 /// [`channel`]: fn.channel.html
493 /// [`sync_channel`]: fn.sync_channel.html
494 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
495 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
496 pub enum RecvTimeoutError {
497 /// This **channel** is currently empty, but the **Sender**(s) have not yet
498 /// disconnected, so data may yet become available.
499 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
501 /// The **channel**'s sending half has become disconnected, and there will
502 /// never be any more data received on it.
503 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
507 /// This enumeration is the list of the possible error outcomes for the
508 /// [`try_send`] method.
510 /// [`try_send`]: struct.SyncSender.html#method.try_send
511 #[stable(feature = "rust1", since = "1.0.0")]
512 #[derive(PartialEq, Eq, Clone, Copy)]
513 pub enum TrySendError<T> {
514 /// The data could not be sent on the [`sync_channel`] because it would require that
515 /// the callee block to send the data.
517 /// If this is a buffered channel, then the buffer is full at this time. If
518 /// this is not a buffered channel, then there is no [`Receiver`] available to
519 /// acquire the data.
521 /// [`sync_channel`]: fn.sync_channel.html
522 /// [`Receiver`]: struct.Receiver.html
523 #[stable(feature = "rust1", since = "1.0.0")]
524 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
526 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
527 /// sent. The data is returned back to the callee in this case.
529 /// [`sync_channel`]: fn.sync_channel.html
530 #[stable(feature = "rust1", since = "1.0.0")]
531 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
535 Oneshot(Arc<oneshot::Packet<T>>),
536 Stream(Arc<stream::Packet<T>>),
537 Shared(Arc<shared::Packet<T>>),
538 Sync(Arc<sync::Packet<T>>),
542 trait UnsafeFlavor<T> {
543 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
544 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
545 &mut *self.inner_unsafe().get()
547 unsafe fn inner(&self) -> &Flavor<T> {
548 &*self.inner_unsafe().get()
551 impl<T> UnsafeFlavor<T> for Sender<T> {
552 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
556 impl<T> UnsafeFlavor<T> for Receiver<T> {
557 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
562 /// Creates a new asynchronous channel, returning the sender/receiver halves.
563 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
564 /// the same order as it was sent, and no [`send`] will block the calling thread
565 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
566 /// block after its buffer limit is reached). [`recv`] will block until a message
569 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
570 /// only one [`Receiver`] is supported.
572 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
573 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
574 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
575 /// return a [`RecvError`].
577 /// [`send`]: struct.Sender.html#method.send
578 /// [`recv`]: struct.Receiver.html#method.recv
579 /// [`Sender`]: struct.Sender.html
580 /// [`Receiver`]: struct.Receiver.html
581 /// [`sync_channel`]: fn.sync_channel.html
582 /// [`SendError`]: struct.SendError.html
583 /// [`RecvError`]: struct.RecvError.html
588 /// use std::sync::mpsc::channel;
591 /// let (sender, receiver) = channel();
593 /// // Spawn off an expensive computation
594 /// thread::spawn(move|| {
595 /// # fn expensive_computation() {}
596 /// sender.send(expensive_computation()).unwrap();
599 /// // Do some useful work for awhile
601 /// // Let's see what that answer was
602 /// println!("{:?}", receiver.recv().unwrap());
604 #[stable(feature = "rust1", since = "1.0.0")]
605 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
606 let a = Arc::new(oneshot::Packet::new());
607 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
610 /// Creates a new synchronous, bounded channel.
611 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
612 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
613 /// [`Receiver`] will block until a message becomes available. `sync_channel`
614 /// differs greatly in the semantics of the sender, however.
616 /// This channel has an internal buffer on which messages will be queued.
617 /// `bound` specifies the buffer size. When the internal buffer becomes full,
618 /// future sends will *block* waiting for the buffer to open up. Note that a
619 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
620 /// where each [`send`] will not return until a [`recv`] is paired with it.
622 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
623 /// times, but only one [`Receiver`] is supported.
625 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
626 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
627 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
628 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
630 /// [`channel`]: fn.channel.html
631 /// [`send`]: struct.SyncSender.html#method.send
632 /// [`recv`]: struct.Receiver.html#method.recv
633 /// [`SyncSender`]: struct.SyncSender.html
634 /// [`Receiver`]: struct.Receiver.html
635 /// [`SendError`]: struct.SendError.html
636 /// [`RecvError`]: struct.RecvError.html
641 /// use std::sync::mpsc::sync_channel;
644 /// let (sender, receiver) = sync_channel(1);
646 /// // this returns immediately
647 /// sender.send(1).unwrap();
649 /// thread::spawn(move|| {
650 /// // this will block until the previous message has been received
651 /// sender.send(2).unwrap();
654 /// assert_eq!(receiver.recv().unwrap(), 1);
655 /// assert_eq!(receiver.recv().unwrap(), 2);
657 #[stable(feature = "rust1", since = "1.0.0")]
658 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
659 let a = Arc::new(sync::Packet::new(bound));
660 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
663 ////////////////////////////////////////////////////////////////////////////////
665 ////////////////////////////////////////////////////////////////////////////////
668 fn new(inner: Flavor<T>) -> Sender<T> {
670 inner: UnsafeCell::new(inner),
674 /// Attempts to send a value on this channel, returning it back if it could
677 /// A successful send occurs when it is determined that the other end of
678 /// the channel has not hung up already. An unsuccessful send would be one
679 /// where the corresponding receiver has already been deallocated. Note
680 /// that a return value of [`Err`] means that the data will never be
681 /// received, but a return value of [`Ok`] does *not* mean that the data
682 /// will be received. It is possible for the corresponding receiver to
683 /// hang up immediately after this function returns [`Ok`].
685 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
686 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
688 /// This method will never block the current thread.
693 /// use std::sync::mpsc::channel;
695 /// let (tx, rx) = channel();
697 /// // This send is always successful
698 /// tx.send(1).unwrap();
700 /// // This send will fail because the receiver is gone
702 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
704 #[stable(feature = "rust1", since = "1.0.0")]
705 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
706 let (new_inner, ret) = match *unsafe { self.inner() } {
707 Flavor::Oneshot(ref p) => {
709 return p.send(t).map_err(SendError);
711 let a = Arc::new(stream::Packet::new());
712 let rx = Receiver::new(Flavor::Stream(a.clone()));
713 match p.upgrade(rx) {
714 oneshot::UpSuccess => {
718 oneshot::UpDisconnected => (a, Err(t)),
719 oneshot::UpWoke(token) => {
720 // This send cannot panic because the thread is
721 // asleep (we're looking at it), so the receiver
723 a.send(t).ok().unwrap();
730 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
731 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
732 Flavor::Sync(..) => unreachable!(),
736 let tmp = Sender::new(Flavor::Stream(new_inner));
737 mem::swap(self.inner_mut(), tmp.inner_mut());
739 ret.map_err(SendError)
743 #[stable(feature = "rust1", since = "1.0.0")]
744 impl<T> Clone for Sender<T> {
745 fn clone(&self) -> Sender<T> {
746 let packet = match *unsafe { self.inner() } {
747 Flavor::Oneshot(ref p) => {
748 let a = Arc::new(shared::Packet::new());
750 let guard = a.postinit_lock();
751 let rx = Receiver::new(Flavor::Shared(a.clone()));
752 let sleeper = match p.upgrade(rx) {
754 oneshot::UpDisconnected => None,
755 oneshot::UpWoke(task) => Some(task),
757 a.inherit_blocker(sleeper, guard);
761 Flavor::Stream(ref p) => {
762 let a = Arc::new(shared::Packet::new());
764 let guard = a.postinit_lock();
765 let rx = Receiver::new(Flavor::Shared(a.clone()));
766 let sleeper = match p.upgrade(rx) {
768 stream::UpDisconnected => None,
769 stream::UpWoke(task) => Some(task),
771 a.inherit_blocker(sleeper, guard);
775 Flavor::Shared(ref p) => {
777 return Sender::new(Flavor::Shared(p.clone()));
779 Flavor::Sync(..) => unreachable!(),
783 let tmp = Sender::new(Flavor::Shared(packet.clone()));
784 mem::swap(self.inner_mut(), tmp.inner_mut());
786 Sender::new(Flavor::Shared(packet))
790 #[stable(feature = "rust1", since = "1.0.0")]
791 impl<T> Drop for Sender<T> {
793 match *unsafe { self.inner() } {
794 Flavor::Oneshot(ref p) => p.drop_chan(),
795 Flavor::Stream(ref p) => p.drop_chan(),
796 Flavor::Shared(ref p) => p.drop_chan(),
797 Flavor::Sync(..) => unreachable!(),
802 #[stable(feature = "mpsc_debug", since = "1.7.0")]
803 impl<T> fmt::Debug for Sender<T> {
804 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
805 write!(f, "Sender {{ .. }}")
809 ////////////////////////////////////////////////////////////////////////////////
811 ////////////////////////////////////////////////////////////////////////////////
813 impl<T> SyncSender<T> {
814 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
815 SyncSender { inner: inner }
818 /// Sends a value on this synchronous channel.
820 /// This function will *block* until space in the internal buffer becomes
821 /// available or a receiver is available to hand off the message to.
823 /// Note that a successful send does *not* guarantee that the receiver will
824 /// ever see the data if there is a buffer on this channel. Items may be
825 /// enqueued in the internal buffer for the receiver to receive at a later
826 /// time. If the buffer size is 0, however, it can be guaranteed that the
827 /// receiver has indeed received the data if this function returns success.
829 /// This function will never panic, but it may return [`Err`] if the
830 /// [`Receiver`] has disconnected and is no longer able to receive
833 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
834 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
835 #[stable(feature = "rust1", since = "1.0.0")]
836 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
837 self.inner.send(t).map_err(SendError)
840 /// Attempts to send a value on this channel without blocking.
842 /// This method differs from [`send`] by returning immediately if the
843 /// channel's buffer is full or no receiver is waiting to acquire some
844 /// data. Compared with [`send`], this function has two failure cases
845 /// instead of one (one for disconnection, one for a full buffer).
847 /// See [`SyncSender::send`] for notes about guarantees of whether the
848 /// receiver has received the data or not if this function is successful.
850 /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
851 /// [`SyncSender::send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
852 #[stable(feature = "rust1", since = "1.0.0")]
853 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
854 self.inner.try_send(t)
858 #[stable(feature = "rust1", since = "1.0.0")]
859 impl<T> Clone for SyncSender<T> {
860 fn clone(&self) -> SyncSender<T> {
861 self.inner.clone_chan();
862 SyncSender::new(self.inner.clone())
866 #[stable(feature = "rust1", since = "1.0.0")]
867 impl<T> Drop for SyncSender<T> {
869 self.inner.drop_chan();
873 #[stable(feature = "mpsc_debug", since = "1.7.0")]
874 impl<T> fmt::Debug for SyncSender<T> {
875 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
876 write!(f, "SyncSender {{ .. }}")
880 ////////////////////////////////////////////////////////////////////////////////
882 ////////////////////////////////////////////////////////////////////////////////
884 impl<T> Receiver<T> {
885 fn new(inner: Flavor<T>) -> Receiver<T> {
886 Receiver { inner: UnsafeCell::new(inner) }
889 /// Attempts to return a pending value on this receiver without blocking
891 /// This method will never block the caller in order to wait for data to
892 /// become available. Instead, this will always return immediately with a
893 /// possible option of pending data on the channel.
895 /// This is useful for a flavor of "optimistic check" before deciding to
896 /// block on a receiver.
897 #[stable(feature = "rust1", since = "1.0.0")]
898 pub fn try_recv(&self) -> Result<T, TryRecvError> {
900 let new_port = match *unsafe { self.inner() } {
901 Flavor::Oneshot(ref p) => {
903 Ok(t) => return Ok(t),
904 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
905 Err(oneshot::Disconnected) => {
906 return Err(TryRecvError::Disconnected)
908 Err(oneshot::Upgraded(rx)) => rx,
911 Flavor::Stream(ref p) => {
913 Ok(t) => return Ok(t),
914 Err(stream::Empty) => return Err(TryRecvError::Empty),
915 Err(stream::Disconnected) => {
916 return Err(TryRecvError::Disconnected)
918 Err(stream::Upgraded(rx)) => rx,
921 Flavor::Shared(ref p) => {
923 Ok(t) => return Ok(t),
924 Err(shared::Empty) => return Err(TryRecvError::Empty),
925 Err(shared::Disconnected) => {
926 return Err(TryRecvError::Disconnected)
930 Flavor::Sync(ref p) => {
932 Ok(t) => return Ok(t),
933 Err(sync::Empty) => return Err(TryRecvError::Empty),
934 Err(sync::Disconnected) => {
935 return Err(TryRecvError::Disconnected)
941 mem::swap(self.inner_mut(),
942 new_port.inner_mut());
947 /// Attempts to wait for a value on this receiver, returning an error if the
948 /// corresponding channel has hung up.
950 /// This function will always block the current thread if there is no data
951 /// available and it's possible for more data to be sent. Once a message is
952 /// sent to the corresponding [`Sender`], then this receiver will wake up and
953 /// return that message.
955 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
956 /// this call is blocking, this call will wake up and return [`Err`] to
957 /// indicate that no more messages can ever be received on this channel.
958 /// However, since channels are buffered, messages sent before the disconnect
959 /// will still be properly received.
961 /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
962 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
967 /// use std::sync::mpsc;
970 /// let (send, recv) = mpsc::channel();
971 /// let handle = thread::spawn(move || {
972 /// send.send(1u8).unwrap();
975 /// handle.join().unwrap();
977 /// assert_eq!(Ok(1), recv.recv());
980 /// Buffering behavior:
983 /// use std::sync::mpsc;
985 /// use std::sync::mpsc::RecvError;
987 /// let (send, recv) = mpsc::channel();
988 /// let handle = thread::spawn(move || {
989 /// send.send(1u8).unwrap();
990 /// send.send(2).unwrap();
991 /// send.send(3).unwrap();
995 /// // wait for the thread to join so we ensure the sender is dropped
996 /// handle.join().unwrap();
998 /// assert_eq!(Ok(1), recv.recv());
999 /// assert_eq!(Ok(2), recv.recv());
1000 /// assert_eq!(Ok(3), recv.recv());
1001 /// assert_eq!(Err(RecvError), recv.recv());
1003 #[stable(feature = "rust1", since = "1.0.0")]
1004 pub fn recv(&self) -> Result<T, RecvError> {
1006 let new_port = match *unsafe { self.inner() } {
1007 Flavor::Oneshot(ref p) => {
1008 match p.recv(None) {
1009 Ok(t) => return Ok(t),
1010 Err(oneshot::Disconnected) => return Err(RecvError),
1011 Err(oneshot::Upgraded(rx)) => rx,
1012 Err(oneshot::Empty) => unreachable!(),
1015 Flavor::Stream(ref p) => {
1016 match p.recv(None) {
1017 Ok(t) => return Ok(t),
1018 Err(stream::Disconnected) => return Err(RecvError),
1019 Err(stream::Upgraded(rx)) => rx,
1020 Err(stream::Empty) => unreachable!(),
1023 Flavor::Shared(ref p) => {
1024 match p.recv(None) {
1025 Ok(t) => return Ok(t),
1026 Err(shared::Disconnected) => return Err(RecvError),
1027 Err(shared::Empty) => unreachable!(),
1030 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1033 mem::swap(self.inner_mut(), new_port.inner_mut());
1038 /// Attempts to wait for a value on this receiver, returning an error if the
1039 /// corresponding channel has hung up, or if it waits more than `timeout`.
1041 /// This function will always block the current thread if there is no data
1042 /// available and it's possible for more data to be sent. Once a message is
1043 /// sent to the corresponding [`Sender`], then this receiver will wake up and
1044 /// return that message.
1046 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1047 /// this call is blocking, this call will wake up and return [`Err`] to
1048 /// indicate that no more messages can ever be received on this channel.
1049 /// However, since channels are buffered, messages sent before the disconnect
1050 /// will still be properly received.
1052 /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
1053 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1058 /// use std::sync::mpsc::{self, RecvTimeoutError};
1059 /// use std::time::Duration;
1061 /// let (send, recv) = mpsc::channel::<()>();
1063 /// let timeout = Duration::from_millis(100);
1064 /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
1066 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1067 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1068 // Do an optimistic try_recv to avoid the performance impact of
1069 // Instant::now() in the full-channel case.
1070 match self.try_recv() {
1073 Err(TryRecvError::Disconnected)
1074 => Err(RecvTimeoutError::Disconnected),
1075 Err(TryRecvError::Empty)
1076 => self.recv_max_until(Instant::now() + timeout)
1080 fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1081 use self::RecvTimeoutError::*;
1084 let port_or_empty = match *unsafe { self.inner() } {
1085 Flavor::Oneshot(ref p) => {
1086 match p.recv(Some(deadline)) {
1087 Ok(t) => return Ok(t),
1088 Err(oneshot::Disconnected) => return Err(Disconnected),
1089 Err(oneshot::Upgraded(rx)) => Some(rx),
1090 Err(oneshot::Empty) => None,
1093 Flavor::Stream(ref p) => {
1094 match p.recv(Some(deadline)) {
1095 Ok(t) => return Ok(t),
1096 Err(stream::Disconnected) => return Err(Disconnected),
1097 Err(stream::Upgraded(rx)) => Some(rx),
1098 Err(stream::Empty) => None,
1101 Flavor::Shared(ref p) => {
1102 match p.recv(Some(deadline)) {
1103 Ok(t) => return Ok(t),
1104 Err(shared::Disconnected) => return Err(Disconnected),
1105 Err(shared::Empty) => None,
1108 Flavor::Sync(ref p) => {
1109 match p.recv(Some(deadline)) {
1110 Ok(t) => return Ok(t),
1111 Err(sync::Disconnected) => return Err(Disconnected),
1112 Err(sync::Empty) => None,
1117 if let Some(new_port) = port_or_empty {
1119 mem::swap(self.inner_mut(), new_port.inner_mut());
1123 // If we're already passed the deadline, and we're here without
1124 // data, return a timeout, else try again.
1125 if Instant::now() >= deadline {
1126 return Err(Timeout);
1131 /// Returns an iterator that will block waiting for messages, but never
1132 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1134 /// [`panic!`]: ../../../std/macro.panic.html
1135 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1140 /// use std::sync::mpsc::channel;
1141 /// use std::thread;
1143 /// let (send, recv) = channel();
1145 /// thread::spawn(move || {
1146 /// send.send(1u8).unwrap();
1147 /// send.send(2u8).unwrap();
1148 /// send.send(3u8).unwrap();
1151 /// for x in recv.iter() {
1152 /// println!("Got: {}", x);
1155 #[stable(feature = "rust1", since = "1.0.0")]
1156 pub fn iter(&self) -> Iter<T> {
1160 /// Returns an iterator that will attempt to yield all pending values.
1161 /// It will return `None` if there are no more pending values or if the
1162 /// channel has hung up. The iterator will never [`panic!`] or block the
1163 /// user by waiting for values.
1165 /// [`panic!`]: ../../../std/macro.panic.html
1166 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1167 pub fn try_iter(&self) -> TryIter<T> {
1168 TryIter { rx: self }
1173 impl<T> select::Packet for Receiver<T> {
1174 fn can_recv(&self) -> bool {
1176 let new_port = match *unsafe { self.inner() } {
1177 Flavor::Oneshot(ref p) => {
1178 match p.can_recv() {
1179 Ok(ret) => return ret,
1180 Err(upgrade) => upgrade,
1183 Flavor::Stream(ref p) => {
1184 match p.can_recv() {
1185 Ok(ret) => return ret,
1186 Err(upgrade) => upgrade,
1189 Flavor::Shared(ref p) => return p.can_recv(),
1190 Flavor::Sync(ref p) => return p.can_recv(),
1193 mem::swap(self.inner_mut(),
1194 new_port.inner_mut());
1199 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1201 let (t, new_port) = match *unsafe { self.inner() } {
1202 Flavor::Oneshot(ref p) => {
1203 match p.start_selection(token) {
1204 oneshot::SelSuccess => return Installed,
1205 oneshot::SelCanceled => return Abort,
1206 oneshot::SelUpgraded(t, rx) => (t, rx),
1209 Flavor::Stream(ref p) => {
1210 match p.start_selection(token) {
1211 stream::SelSuccess => return Installed,
1212 stream::SelCanceled => return Abort,
1213 stream::SelUpgraded(t, rx) => (t, rx),
1216 Flavor::Shared(ref p) => return p.start_selection(token),
1217 Flavor::Sync(ref p) => return p.start_selection(token),
1221 mem::swap(self.inner_mut(), new_port.inner_mut());
1226 fn abort_selection(&self) -> bool {
1227 let mut was_upgrade = false;
1229 let result = match *unsafe { self.inner() } {
1230 Flavor::Oneshot(ref p) => p.abort_selection(),
1231 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1232 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1233 Flavor::Sync(ref p) => return p.abort_selection(),
1235 let new_port = match result { Ok(b) => return b, Err(p) => p };
1238 mem::swap(self.inner_mut(),
1239 new_port.inner_mut());
1245 #[stable(feature = "rust1", since = "1.0.0")]
1246 impl<'a, T> Iterator for Iter<'a, T> {
1249 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1252 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1253 impl<'a, T> Iterator for TryIter<'a, T> {
1256 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1259 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1260 impl<'a, T> IntoIterator for &'a Receiver<T> {
1262 type IntoIter = Iter<'a, T>;
1264 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1267 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1268 impl<T> Iterator for IntoIter<T> {
1270 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1273 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1274 impl <T> IntoIterator for Receiver<T> {
1276 type IntoIter = IntoIter<T>;
1278 fn into_iter(self) -> IntoIter<T> {
1279 IntoIter { rx: self }
1283 #[stable(feature = "rust1", since = "1.0.0")]
1284 impl<T> Drop for Receiver<T> {
1285 fn drop(&mut self) {
1286 match *unsafe { self.inner() } {
1287 Flavor::Oneshot(ref p) => p.drop_port(),
1288 Flavor::Stream(ref p) => p.drop_port(),
1289 Flavor::Shared(ref p) => p.drop_port(),
1290 Flavor::Sync(ref p) => p.drop_port(),
1295 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1296 impl<T> fmt::Debug for Receiver<T> {
1297 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1298 write!(f, "Receiver {{ .. }}")
1302 #[stable(feature = "rust1", since = "1.0.0")]
1303 impl<T> fmt::Debug for SendError<T> {
1304 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1305 "SendError(..)".fmt(f)
1309 #[stable(feature = "rust1", since = "1.0.0")]
1310 impl<T> fmt::Display for SendError<T> {
1311 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1312 "sending on a closed channel".fmt(f)
1316 #[stable(feature = "rust1", since = "1.0.0")]
1317 impl<T: Send> error::Error for SendError<T> {
1318 fn description(&self) -> &str {
1319 "sending on a closed channel"
1322 fn cause(&self) -> Option<&error::Error> {
1327 #[stable(feature = "rust1", since = "1.0.0")]
1328 impl<T> fmt::Debug for TrySendError<T> {
1329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1331 TrySendError::Full(..) => "Full(..)".fmt(f),
1332 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1337 #[stable(feature = "rust1", since = "1.0.0")]
1338 impl<T> fmt::Display for TrySendError<T> {
1339 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1341 TrySendError::Full(..) => {
1342 "sending on a full channel".fmt(f)
1344 TrySendError::Disconnected(..) => {
1345 "sending on a closed channel".fmt(f)
1351 #[stable(feature = "rust1", since = "1.0.0")]
1352 impl<T: Send> error::Error for TrySendError<T> {
1354 fn description(&self) -> &str {
1356 TrySendError::Full(..) => {
1357 "sending on a full channel"
1359 TrySendError::Disconnected(..) => {
1360 "sending on a closed channel"
1365 fn cause(&self) -> Option<&error::Error> {
1370 #[stable(feature = "rust1", since = "1.0.0")]
1371 impl fmt::Display for RecvError {
1372 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1373 "receiving on a closed channel".fmt(f)
1377 #[stable(feature = "rust1", since = "1.0.0")]
1378 impl error::Error for RecvError {
1380 fn description(&self) -> &str {
1381 "receiving on a closed channel"
1384 fn cause(&self) -> Option<&error::Error> {
1389 #[stable(feature = "rust1", since = "1.0.0")]
1390 impl fmt::Display for TryRecvError {
1391 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1393 TryRecvError::Empty => {
1394 "receiving on an empty channel".fmt(f)
1396 TryRecvError::Disconnected => {
1397 "receiving on a closed channel".fmt(f)
1403 #[stable(feature = "rust1", since = "1.0.0")]
1404 impl error::Error for TryRecvError {
1406 fn description(&self) -> &str {
1408 TryRecvError::Empty => {
1409 "receiving on an empty channel"
1411 TryRecvError::Disconnected => {
1412 "receiving on a closed channel"
1417 fn cause(&self) -> Option<&error::Error> {
1422 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1423 impl fmt::Display for RecvTimeoutError {
1424 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1426 RecvTimeoutError::Timeout => {
1427 "timed out waiting on channel".fmt(f)
1429 RecvTimeoutError::Disconnected => {
1430 "channel is empty and sending half is closed".fmt(f)
1436 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1437 impl error::Error for RecvTimeoutError {
1438 fn description(&self) -> &str {
1440 RecvTimeoutError::Timeout => {
1441 "timed out waiting on channel"
1443 RecvTimeoutError::Disconnected => {
1444 "channel is empty and sending half is closed"
1449 fn cause(&self) -> Option<&error::Error> {
1454 #[cfg(all(test, not(target_os = "emscripten")))]
1459 use time::{Duration, Instant};
1461 pub fn stress_factor() -> usize {
1462 match env::var("RUST_TEST_STRESS") {
1463 Ok(val) => val.parse().unwrap(),
1470 let (tx, rx) = channel::<i32>();
1471 tx.send(1).unwrap();
1472 assert_eq!(rx.recv().unwrap(), 1);
1477 let (tx, _rx) = channel::<Box<isize>>();
1478 tx.send(box 1).unwrap();
1482 fn drop_full_shared() {
1483 let (tx, _rx) = channel::<Box<isize>>();
1486 tx.send(box 1).unwrap();
1491 let (tx, rx) = channel::<i32>();
1492 tx.send(1).unwrap();
1493 assert_eq!(rx.recv().unwrap(), 1);
1494 let tx = tx.clone();
1495 tx.send(1).unwrap();
1496 assert_eq!(rx.recv().unwrap(), 1);
1500 fn smoke_threads() {
1501 let (tx, rx) = channel::<i32>();
1502 let _t = thread::spawn(move|| {
1503 tx.send(1).unwrap();
1505 assert_eq!(rx.recv().unwrap(), 1);
1509 fn smoke_port_gone() {
1510 let (tx, rx) = channel::<i32>();
1512 assert!(tx.send(1).is_err());
1516 fn smoke_shared_port_gone() {
1517 let (tx, rx) = channel::<i32>();
1519 assert!(tx.send(1).is_err())
1523 fn smoke_shared_port_gone2() {
1524 let (tx, rx) = channel::<i32>();
1526 let tx2 = tx.clone();
1528 assert!(tx2.send(1).is_err());
1532 fn port_gone_concurrent() {
1533 let (tx, rx) = channel::<i32>();
1534 let _t = thread::spawn(move|| {
1537 while tx.send(1).is_ok() {}
1541 fn port_gone_concurrent_shared() {
1542 let (tx, rx) = channel::<i32>();
1543 let tx2 = tx.clone();
1544 let _t = thread::spawn(move|| {
1547 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1551 fn smoke_chan_gone() {
1552 let (tx, rx) = channel::<i32>();
1554 assert!(rx.recv().is_err());
1558 fn smoke_chan_gone_shared() {
1559 let (tx, rx) = channel::<()>();
1560 let tx2 = tx.clone();
1563 assert!(rx.recv().is_err());
1567 fn chan_gone_concurrent() {
1568 let (tx, rx) = channel::<i32>();
1569 let _t = thread::spawn(move|| {
1570 tx.send(1).unwrap();
1571 tx.send(1).unwrap();
1573 while rx.recv().is_ok() {}
1578 let (tx, rx) = channel::<i32>();
1579 let t = thread::spawn(move|| {
1580 for _ in 0..10000 { tx.send(1).unwrap(); }
1583 assert_eq!(rx.recv().unwrap(), 1);
1585 t.join().ok().unwrap();
1589 fn stress_shared() {
1590 const AMT: u32 = 10000;
1591 const NTHREADS: u32 = 8;
1592 let (tx, rx) = channel::<i32>();
1594 let t = thread::spawn(move|| {
1595 for _ in 0..AMT * NTHREADS {
1596 assert_eq!(rx.recv().unwrap(), 1);
1598 match rx.try_recv() {
1604 for _ in 0..NTHREADS {
1605 let tx = tx.clone();
1606 thread::spawn(move|| {
1607 for _ in 0..AMT { tx.send(1).unwrap(); }
1611 t.join().ok().unwrap();
1615 fn send_from_outside_runtime() {
1616 let (tx1, rx1) = channel::<()>();
1617 let (tx2, rx2) = channel::<i32>();
1618 let t1 = thread::spawn(move|| {
1619 tx1.send(()).unwrap();
1621 assert_eq!(rx2.recv().unwrap(), 1);
1624 rx1.recv().unwrap();
1625 let t2 = thread::spawn(move|| {
1627 tx2.send(1).unwrap();
1630 t1.join().ok().unwrap();
1631 t2.join().ok().unwrap();
1635 fn recv_from_outside_runtime() {
1636 let (tx, rx) = channel::<i32>();
1637 let t = thread::spawn(move|| {
1639 assert_eq!(rx.recv().unwrap(), 1);
1643 tx.send(1).unwrap();
1645 t.join().ok().unwrap();
1650 let (tx1, rx1) = channel::<i32>();
1651 let (tx2, rx2) = channel::<i32>();
1652 let t1 = thread::spawn(move|| {
1653 assert_eq!(rx1.recv().unwrap(), 1);
1654 tx2.send(2).unwrap();
1656 let t2 = thread::spawn(move|| {
1657 tx1.send(1).unwrap();
1658 assert_eq!(rx2.recv().unwrap(), 2);
1660 t1.join().ok().unwrap();
1661 t2.join().ok().unwrap();
1665 fn oneshot_single_thread_close_port_first() {
1666 // Simple test of closing without sending
1667 let (_tx, rx) = channel::<i32>();
1672 fn oneshot_single_thread_close_chan_first() {
1673 // Simple test of closing without sending
1674 let (tx, _rx) = channel::<i32>();
1679 fn oneshot_single_thread_send_port_close() {
1680 // Testing that the sender cleans up the payload if receiver is closed
1681 let (tx, rx) = channel::<Box<i32>>();
1683 assert!(tx.send(box 0).is_err());
1687 fn oneshot_single_thread_recv_chan_close() {
1688 // Receiving on a closed chan will panic
1689 let res = thread::spawn(move|| {
1690 let (tx, rx) = channel::<i32>();
1695 assert!(res.is_err());
1699 fn oneshot_single_thread_send_then_recv() {
1700 let (tx, rx) = channel::<Box<i32>>();
1701 tx.send(box 10).unwrap();
1702 assert!(rx.recv().unwrap() == box 10);
1706 fn oneshot_single_thread_try_send_open() {
1707 let (tx, rx) = channel::<i32>();
1708 assert!(tx.send(10).is_ok());
1709 assert!(rx.recv().unwrap() == 10);
1713 fn oneshot_single_thread_try_send_closed() {
1714 let (tx, rx) = channel::<i32>();
1716 assert!(tx.send(10).is_err());
1720 fn oneshot_single_thread_try_recv_open() {
1721 let (tx, rx) = channel::<i32>();
1722 tx.send(10).unwrap();
1723 assert!(rx.recv() == Ok(10));
1727 fn oneshot_single_thread_try_recv_closed() {
1728 let (tx, rx) = channel::<i32>();
1730 assert!(rx.recv().is_err());
1734 fn oneshot_single_thread_peek_data() {
1735 let (tx, rx) = channel::<i32>();
1736 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1737 tx.send(10).unwrap();
1738 assert_eq!(rx.try_recv(), Ok(10));
1742 fn oneshot_single_thread_peek_close() {
1743 let (tx, rx) = channel::<i32>();
1745 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1746 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1750 fn oneshot_single_thread_peek_open() {
1751 let (_tx, rx) = channel::<i32>();
1752 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1756 fn oneshot_multi_task_recv_then_send() {
1757 let (tx, rx) = channel::<Box<i32>>();
1758 let _t = thread::spawn(move|| {
1759 assert!(rx.recv().unwrap() == box 10);
1762 tx.send(box 10).unwrap();
1766 fn oneshot_multi_task_recv_then_close() {
1767 let (tx, rx) = channel::<Box<i32>>();
1768 let _t = thread::spawn(move|| {
1771 let res = thread::spawn(move|| {
1772 assert!(rx.recv().unwrap() == box 10);
1774 assert!(res.is_err());
1778 fn oneshot_multi_thread_close_stress() {
1779 for _ in 0..stress_factor() {
1780 let (tx, rx) = channel::<i32>();
1781 let _t = thread::spawn(move|| {
1789 fn oneshot_multi_thread_send_close_stress() {
1790 for _ in 0..stress_factor() {
1791 let (tx, rx) = channel::<i32>();
1792 let _t = thread::spawn(move|| {
1795 let _ = thread::spawn(move|| {
1796 tx.send(1).unwrap();
1802 fn oneshot_multi_thread_recv_close_stress() {
1803 for _ in 0..stress_factor() {
1804 let (tx, rx) = channel::<i32>();
1805 thread::spawn(move|| {
1806 let res = thread::spawn(move|| {
1809 assert!(res.is_err());
1811 let _t = thread::spawn(move|| {
1812 thread::spawn(move|| {
1820 fn oneshot_multi_thread_send_recv_stress() {
1821 for _ in 0..stress_factor() {
1822 let (tx, rx) = channel::<Box<isize>>();
1823 let _t = thread::spawn(move|| {
1824 tx.send(box 10).unwrap();
1826 assert!(rx.recv().unwrap() == box 10);
1831 fn stream_send_recv_stress() {
1832 for _ in 0..stress_factor() {
1833 let (tx, rx) = channel();
1838 fn send(tx: Sender<Box<i32>>, i: i32) {
1839 if i == 10 { return }
1841 thread::spawn(move|| {
1842 tx.send(box i).unwrap();
1847 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1848 if i == 10 { return }
1850 thread::spawn(move|| {
1851 assert!(rx.recv().unwrap() == box i);
1859 fn oneshot_single_thread_recv_timeout() {
1860 let (tx, rx) = channel();
1861 tx.send(()).unwrap();
1862 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1863 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1864 tx.send(()).unwrap();
1865 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1869 fn stress_recv_timeout_two_threads() {
1870 let (tx, rx) = channel();
1871 let stress = stress_factor() + 100;
1872 let timeout = Duration::from_millis(100);
1874 thread::spawn(move || {
1875 for i in 0..stress {
1877 thread::sleep(timeout * 2);
1879 tx.send(1usize).unwrap();
1883 let mut recv_count = 0;
1885 match rx.recv_timeout(timeout) {
1887 assert_eq!(n, 1usize);
1890 Err(RecvTimeoutError::Timeout) => continue,
1891 Err(RecvTimeoutError::Disconnected) => break,
1895 assert_eq!(recv_count, stress);
1899 fn recv_timeout_upgrade() {
1900 let (tx, rx) = channel::<()>();
1901 let timeout = Duration::from_millis(1);
1902 let _tx_clone = tx.clone();
1904 let start = Instant::now();
1905 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1906 assert!(Instant::now() >= start + timeout);
1910 fn stress_recv_timeout_shared() {
1911 let (tx, rx) = channel();
1912 let stress = stress_factor() + 100;
1914 for i in 0..stress {
1915 let tx = tx.clone();
1916 thread::spawn(move || {
1917 thread::sleep(Duration::from_millis(i as u64 * 10));
1918 tx.send(1usize).unwrap();
1924 let mut recv_count = 0;
1926 match rx.recv_timeout(Duration::from_millis(10)) {
1928 assert_eq!(n, 1usize);
1931 Err(RecvTimeoutError::Timeout) => continue,
1932 Err(RecvTimeoutError::Disconnected) => break,
1936 assert_eq!(recv_count, stress);
1941 // Regression test that we don't run out of stack in scheduler context
1942 let (tx, rx) = channel();
1943 for _ in 0..10000 { tx.send(()).unwrap(); }
1944 for _ in 0..10000 { rx.recv().unwrap(); }
1948 fn shared_recv_timeout() {
1949 let (tx, rx) = channel();
1952 let tx = tx.clone();
1953 thread::spawn(move|| {
1954 tx.send(()).unwrap();
1958 for _ in 0..total { rx.recv().unwrap(); }
1960 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1961 tx.send(()).unwrap();
1962 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1966 fn shared_chan_stress() {
1967 let (tx, rx) = channel();
1968 let total = stress_factor() + 100;
1970 let tx = tx.clone();
1971 thread::spawn(move|| {
1972 tx.send(()).unwrap();
1982 fn test_nested_recv_iter() {
1983 let (tx, rx) = channel::<i32>();
1984 let (total_tx, total_rx) = channel::<i32>();
1986 let _t = thread::spawn(move|| {
1988 for x in rx.iter() {
1991 total_tx.send(acc).unwrap();
1994 tx.send(3).unwrap();
1995 tx.send(1).unwrap();
1996 tx.send(2).unwrap();
1998 assert_eq!(total_rx.recv().unwrap(), 6);
2002 fn test_recv_iter_break() {
2003 let (tx, rx) = channel::<i32>();
2004 let (count_tx, count_rx) = channel();
2006 let _t = thread::spawn(move|| {
2008 for x in rx.iter() {
2015 count_tx.send(count).unwrap();
2018 tx.send(2).unwrap();
2019 tx.send(2).unwrap();
2020 tx.send(2).unwrap();
2023 assert_eq!(count_rx.recv().unwrap(), 4);
2027 fn test_recv_try_iter() {
2028 let (request_tx, request_rx) = channel();
2029 let (response_tx, response_rx) = channel();
2031 // Request `x`s until we have `6`.
2032 let t = thread::spawn(move|| {
2035 for x in response_rx.try_iter() {
2041 request_tx.send(()).unwrap();
2045 for _ in request_rx.iter() {
2046 if response_tx.send(2).is_err() {
2051 assert_eq!(t.join().unwrap(), 6);
2055 fn test_recv_into_iter_owned() {
2057 let (tx, rx) = channel::<i32>();
2058 tx.send(1).unwrap();
2059 tx.send(2).unwrap();
2063 assert_eq!(iter.next().unwrap(), 1);
2064 assert_eq!(iter.next().unwrap(), 2);
2065 assert_eq!(iter.next().is_none(), true);
2069 fn test_recv_into_iter_borrowed() {
2070 let (tx, rx) = channel::<i32>();
2071 tx.send(1).unwrap();
2072 tx.send(2).unwrap();
2074 let mut iter = (&rx).into_iter();
2075 assert_eq!(iter.next().unwrap(), 1);
2076 assert_eq!(iter.next().unwrap(), 2);
2077 assert_eq!(iter.next().is_none(), true);
2081 fn try_recv_states() {
2082 let (tx1, rx1) = channel::<i32>();
2083 let (tx2, rx2) = channel::<()>();
2084 let (tx3, rx3) = channel::<()>();
2085 let _t = thread::spawn(move|| {
2086 rx2.recv().unwrap();
2087 tx1.send(1).unwrap();
2088 tx3.send(()).unwrap();
2089 rx2.recv().unwrap();
2091 tx3.send(()).unwrap();
2094 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2095 tx2.send(()).unwrap();
2096 rx3.recv().unwrap();
2097 assert_eq!(rx1.try_recv(), Ok(1));
2098 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2099 tx2.send(()).unwrap();
2100 rx3.recv().unwrap();
2101 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2104 // This bug used to end up in a livelock inside of the Receiver destructor
2105 // because the internal state of the Shared packet was corrupted
2107 fn destroy_upgraded_shared_port_when_sender_still_active() {
2108 let (tx, rx) = channel();
2109 let (tx2, rx2) = channel();
2110 let _t = thread::spawn(move|| {
2111 rx.recv().unwrap(); // wait on a oneshot
2112 drop(rx); // destroy a shared
2113 tx2.send(()).unwrap();
2115 // make sure the other thread has gone to sleep
2116 for _ in 0..5000 { thread::yield_now(); }
2118 // upgrade to a shared chan and send a message
2121 t.send(()).unwrap();
2123 // wait for the child thread to exit before we exit
2124 rx2.recv().unwrap();
2129 let (tx, _) = channel();
2130 let _ = tx.send(123);
2131 assert_eq!(tx.send(123), Err(SendError(123)));
2135 #[cfg(all(test, not(target_os = "emscripten")))]
2142 pub fn stress_factor() -> usize {
2143 match env::var("RUST_TEST_STRESS") {
2144 Ok(val) => val.parse().unwrap(),
2151 let (tx, rx) = sync_channel::<i32>(1);
2152 tx.send(1).unwrap();
2153 assert_eq!(rx.recv().unwrap(), 1);
2158 let (tx, _rx) = sync_channel::<Box<isize>>(1);
2159 tx.send(box 1).unwrap();
2164 let (tx, rx) = sync_channel::<i32>(1);
2165 tx.send(1).unwrap();
2166 assert_eq!(rx.recv().unwrap(), 1);
2167 let tx = tx.clone();
2168 tx.send(1).unwrap();
2169 assert_eq!(rx.recv().unwrap(), 1);
2174 let (tx, rx) = sync_channel::<i32>(1);
2175 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2176 tx.send(1).unwrap();
2177 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2181 fn smoke_threads() {
2182 let (tx, rx) = sync_channel::<i32>(0);
2183 let _t = thread::spawn(move|| {
2184 tx.send(1).unwrap();
2186 assert_eq!(rx.recv().unwrap(), 1);
2190 fn smoke_port_gone() {
2191 let (tx, rx) = sync_channel::<i32>(0);
2193 assert!(tx.send(1).is_err());
2197 fn smoke_shared_port_gone2() {
2198 let (tx, rx) = sync_channel::<i32>(0);
2200 let tx2 = tx.clone();
2202 assert!(tx2.send(1).is_err());
2206 fn port_gone_concurrent() {
2207 let (tx, rx) = sync_channel::<i32>(0);
2208 let _t = thread::spawn(move|| {
2211 while tx.send(1).is_ok() {}
2215 fn port_gone_concurrent_shared() {
2216 let (tx, rx) = sync_channel::<i32>(0);
2217 let tx2 = tx.clone();
2218 let _t = thread::spawn(move|| {
2221 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2225 fn smoke_chan_gone() {
2226 let (tx, rx) = sync_channel::<i32>(0);
2228 assert!(rx.recv().is_err());
2232 fn smoke_chan_gone_shared() {
2233 let (tx, rx) = sync_channel::<()>(0);
2234 let tx2 = tx.clone();
2237 assert!(rx.recv().is_err());
2241 fn chan_gone_concurrent() {
2242 let (tx, rx) = sync_channel::<i32>(0);
2243 thread::spawn(move|| {
2244 tx.send(1).unwrap();
2245 tx.send(1).unwrap();
2247 while rx.recv().is_ok() {}
2252 let (tx, rx) = sync_channel::<i32>(0);
2253 thread::spawn(move|| {
2254 for _ in 0..10000 { tx.send(1).unwrap(); }
2257 assert_eq!(rx.recv().unwrap(), 1);
2262 fn stress_recv_timeout_two_threads() {
2263 let (tx, rx) = sync_channel::<i32>(0);
2265 thread::spawn(move|| {
2266 for _ in 0..10000 { tx.send(1).unwrap(); }
2269 let mut recv_count = 0;
2271 match rx.recv_timeout(Duration::from_millis(1)) {
2276 Err(RecvTimeoutError::Timeout) => continue,
2277 Err(RecvTimeoutError::Disconnected) => break,
2281 assert_eq!(recv_count, 10000);
2285 fn stress_recv_timeout_shared() {
2286 const AMT: u32 = 1000;
2287 const NTHREADS: u32 = 8;
2288 let (tx, rx) = sync_channel::<i32>(0);
2289 let (dtx, drx) = sync_channel::<()>(0);
2291 thread::spawn(move|| {
2292 let mut recv_count = 0;
2294 match rx.recv_timeout(Duration::from_millis(10)) {
2299 Err(RecvTimeoutError::Timeout) => continue,
2300 Err(RecvTimeoutError::Disconnected) => break,
2304 assert_eq!(recv_count, AMT * NTHREADS);
2305 assert!(rx.try_recv().is_err());
2307 dtx.send(()).unwrap();
2310 for _ in 0..NTHREADS {
2311 let tx = tx.clone();
2312 thread::spawn(move|| {
2313 for _ in 0..AMT { tx.send(1).unwrap(); }
2319 drx.recv().unwrap();
2323 fn stress_shared() {
2324 const AMT: u32 = 1000;
2325 const NTHREADS: u32 = 8;
2326 let (tx, rx) = sync_channel::<i32>(0);
2327 let (dtx, drx) = sync_channel::<()>(0);
2329 thread::spawn(move|| {
2330 for _ in 0..AMT * NTHREADS {
2331 assert_eq!(rx.recv().unwrap(), 1);
2333 match rx.try_recv() {
2337 dtx.send(()).unwrap();
2340 for _ in 0..NTHREADS {
2341 let tx = tx.clone();
2342 thread::spawn(move|| {
2343 for _ in 0..AMT { tx.send(1).unwrap(); }
2347 drx.recv().unwrap();
2351 fn oneshot_single_thread_close_port_first() {
2352 // Simple test of closing without sending
2353 let (_tx, rx) = sync_channel::<i32>(0);
2358 fn oneshot_single_thread_close_chan_first() {
2359 // Simple test of closing without sending
2360 let (tx, _rx) = sync_channel::<i32>(0);
2365 fn oneshot_single_thread_send_port_close() {
2366 // Testing that the sender cleans up the payload if receiver is closed
2367 let (tx, rx) = sync_channel::<Box<i32>>(0);
2369 assert!(tx.send(box 0).is_err());
2373 fn oneshot_single_thread_recv_chan_close() {
2374 // Receiving on a closed chan will panic
2375 let res = thread::spawn(move|| {
2376 let (tx, rx) = sync_channel::<i32>(0);
2381 assert!(res.is_err());
2385 fn oneshot_single_thread_send_then_recv() {
2386 let (tx, rx) = sync_channel::<Box<i32>>(1);
2387 tx.send(box 10).unwrap();
2388 assert!(rx.recv().unwrap() == box 10);
2392 fn oneshot_single_thread_try_send_open() {
2393 let (tx, rx) = sync_channel::<i32>(1);
2394 assert_eq!(tx.try_send(10), Ok(()));
2395 assert!(rx.recv().unwrap() == 10);
2399 fn oneshot_single_thread_try_send_closed() {
2400 let (tx, rx) = sync_channel::<i32>(0);
2402 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2406 fn oneshot_single_thread_try_send_closed2() {
2407 let (tx, _rx) = sync_channel::<i32>(0);
2408 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2412 fn oneshot_single_thread_try_recv_open() {
2413 let (tx, rx) = sync_channel::<i32>(1);
2414 tx.send(10).unwrap();
2415 assert!(rx.recv() == Ok(10));
2419 fn oneshot_single_thread_try_recv_closed() {
2420 let (tx, rx) = sync_channel::<i32>(0);
2422 assert!(rx.recv().is_err());
2426 fn oneshot_single_thread_try_recv_closed_with_data() {
2427 let (tx, rx) = sync_channel::<i32>(1);
2428 tx.send(10).unwrap();
2430 assert_eq!(rx.try_recv(), Ok(10));
2431 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2435 fn oneshot_single_thread_peek_data() {
2436 let (tx, rx) = sync_channel::<i32>(1);
2437 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2438 tx.send(10).unwrap();
2439 assert_eq!(rx.try_recv(), Ok(10));
2443 fn oneshot_single_thread_peek_close() {
2444 let (tx, rx) = sync_channel::<i32>(0);
2446 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2447 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2451 fn oneshot_single_thread_peek_open() {
2452 let (_tx, rx) = sync_channel::<i32>(0);
2453 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2457 fn oneshot_multi_task_recv_then_send() {
2458 let (tx, rx) = sync_channel::<Box<i32>>(0);
2459 let _t = thread::spawn(move|| {
2460 assert!(rx.recv().unwrap() == box 10);
2463 tx.send(box 10).unwrap();
2467 fn oneshot_multi_task_recv_then_close() {
2468 let (tx, rx) = sync_channel::<Box<i32>>(0);
2469 let _t = thread::spawn(move|| {
2472 let res = thread::spawn(move|| {
2473 assert!(rx.recv().unwrap() == box 10);
2475 assert!(res.is_err());
2479 fn oneshot_multi_thread_close_stress() {
2480 for _ in 0..stress_factor() {
2481 let (tx, rx) = sync_channel::<i32>(0);
2482 let _t = thread::spawn(move|| {
2490 fn oneshot_multi_thread_send_close_stress() {
2491 for _ in 0..stress_factor() {
2492 let (tx, rx) = sync_channel::<i32>(0);
2493 let _t = thread::spawn(move|| {
2496 let _ = thread::spawn(move || {
2497 tx.send(1).unwrap();
2503 fn oneshot_multi_thread_recv_close_stress() {
2504 for _ in 0..stress_factor() {
2505 let (tx, rx) = sync_channel::<i32>(0);
2506 let _t = thread::spawn(move|| {
2507 let res = thread::spawn(move|| {
2510 assert!(res.is_err());
2512 let _t = thread::spawn(move|| {
2513 thread::spawn(move|| {
2521 fn oneshot_multi_thread_send_recv_stress() {
2522 for _ in 0..stress_factor() {
2523 let (tx, rx) = sync_channel::<Box<i32>>(0);
2524 let _t = thread::spawn(move|| {
2525 tx.send(box 10).unwrap();
2527 assert!(rx.recv().unwrap() == box 10);
2532 fn stream_send_recv_stress() {
2533 for _ in 0..stress_factor() {
2534 let (tx, rx) = sync_channel::<Box<i32>>(0);
2539 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2540 if i == 10 { return }
2542 thread::spawn(move|| {
2543 tx.send(box i).unwrap();
2548 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2549 if i == 10 { return }
2551 thread::spawn(move|| {
2552 assert!(rx.recv().unwrap() == box i);
2561 // Regression test that we don't run out of stack in scheduler context
2562 let (tx, rx) = sync_channel(10000);
2563 for _ in 0..10000 { tx.send(()).unwrap(); }
2564 for _ in 0..10000 { rx.recv().unwrap(); }
2568 fn shared_chan_stress() {
2569 let (tx, rx) = sync_channel(0);
2570 let total = stress_factor() + 100;
2572 let tx = tx.clone();
2573 thread::spawn(move|| {
2574 tx.send(()).unwrap();
2584 fn test_nested_recv_iter() {
2585 let (tx, rx) = sync_channel::<i32>(0);
2586 let (total_tx, total_rx) = sync_channel::<i32>(0);
2588 let _t = thread::spawn(move|| {
2590 for x in rx.iter() {
2593 total_tx.send(acc).unwrap();
2596 tx.send(3).unwrap();
2597 tx.send(1).unwrap();
2598 tx.send(2).unwrap();
2600 assert_eq!(total_rx.recv().unwrap(), 6);
2604 fn test_recv_iter_break() {
2605 let (tx, rx) = sync_channel::<i32>(0);
2606 let (count_tx, count_rx) = sync_channel(0);
2608 let _t = thread::spawn(move|| {
2610 for x in rx.iter() {
2617 count_tx.send(count).unwrap();
2620 tx.send(2).unwrap();
2621 tx.send(2).unwrap();
2622 tx.send(2).unwrap();
2623 let _ = tx.try_send(2);
2625 assert_eq!(count_rx.recv().unwrap(), 4);
2629 fn try_recv_states() {
2630 let (tx1, rx1) = sync_channel::<i32>(1);
2631 let (tx2, rx2) = sync_channel::<()>(1);
2632 let (tx3, rx3) = sync_channel::<()>(1);
2633 let _t = thread::spawn(move|| {
2634 rx2.recv().unwrap();
2635 tx1.send(1).unwrap();
2636 tx3.send(()).unwrap();
2637 rx2.recv().unwrap();
2639 tx3.send(()).unwrap();
2642 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2643 tx2.send(()).unwrap();
2644 rx3.recv().unwrap();
2645 assert_eq!(rx1.try_recv(), Ok(1));
2646 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2647 tx2.send(()).unwrap();
2648 rx3.recv().unwrap();
2649 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2652 // This bug used to end up in a livelock inside of the Receiver destructor
2653 // because the internal state of the Shared packet was corrupted
2655 fn destroy_upgraded_shared_port_when_sender_still_active() {
2656 let (tx, rx) = sync_channel::<()>(0);
2657 let (tx2, rx2) = sync_channel::<()>(0);
2658 let _t = thread::spawn(move|| {
2659 rx.recv().unwrap(); // wait on a oneshot
2660 drop(rx); // destroy a shared
2661 tx2.send(()).unwrap();
2663 // make sure the other thread has gone to sleep
2664 for _ in 0..5000 { thread::yield_now(); }
2666 // upgrade to a shared chan and send a message
2669 t.send(()).unwrap();
2671 // wait for the child thread to exit before we exit
2672 rx2.recv().unwrap();
2677 let (tx, rx) = sync_channel::<i32>(0);
2678 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2679 assert_eq!(tx.send(1), Ok(()));
2684 let (tx, rx) = sync_channel::<i32>(0);
2685 let _t = thread::spawn(move|| { drop(rx); });
2686 assert!(tx.send(1).is_err());
2691 let (tx, rx) = sync_channel::<i32>(1);
2692 assert_eq!(tx.send(1), Ok(()));
2693 let _t =thread::spawn(move|| { drop(rx); });
2694 assert!(tx.send(1).is_err());
2699 let (tx, rx) = sync_channel::<i32>(0);
2700 let tx2 = tx.clone();
2701 let (done, donerx) = channel();
2702 let done2 = done.clone();
2703 let _t = thread::spawn(move|| {
2704 assert!(tx.send(1).is_err());
2705 done.send(()).unwrap();
2707 let _t = thread::spawn(move|| {
2708 assert!(tx2.send(2).is_err());
2709 done2.send(()).unwrap();
2712 donerx.recv().unwrap();
2713 donerx.recv().unwrap();
2718 let (tx, _rx) = sync_channel::<i32>(0);
2719 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2724 let (tx, _rx) = sync_channel::<i32>(1);
2725 assert_eq!(tx.try_send(1), Ok(()));
2726 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2731 let (tx, rx) = sync_channel::<i32>(1);
2732 assert_eq!(tx.try_send(1), Ok(()));
2734 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2740 let (tx1, rx1) = sync_channel::<()>(3);
2741 let (tx2, rx2) = sync_channel::<()>(3);
2743 let _t = thread::spawn(move|| {
2744 rx1.recv().unwrap();
2745 tx2.try_send(()).unwrap();
2748 tx1.try_send(()).unwrap();
2749 rx2.recv().unwrap();
2758 fn fmt_debug_sender() {
2759 let (tx, _) = channel::<i32>();
2760 assert_eq!(format!("{:?}", tx), "Sender { .. }");
2764 fn fmt_debug_recv() {
2765 let (_, rx) = channel::<i32>();
2766 assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2770 fn fmt_debug_sync_sender() {
2771 let (tx, _) = sync_channel::<i32>(1);
2772 assert_eq!(format!("{:?}", tx), "SyncSender { .. }");