]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Rollup merge of #64603 - gilescope:unused-lifetime-warning, r=matthewjasper
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // ignore-tidy-filelength
2
3 //! Multi-producer, single-consumer FIFO queue communication primitives.
4 //!
5 //! This module provides message-based communication over channels, concretely
6 //! defined among three types:
7 //!
8 //! * [`Sender`]
9 //! * [`SyncSender`]
10 //! * [`Receiver`]
11 //!
12 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
13 //! senders are clone-able (multi-producer) such that many threads can send
14 //! simultaneously to one receiver (single-consumer).
15 //!
16 //! These channels come in two flavors:
17 //!
18 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
19 //!    will return a `(Sender, Receiver)` tuple where all sends will be
20 //!    **asynchronous** (they never block). The channel conceptually has an
21 //!    infinite buffer.
22 //!
23 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
24 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
25 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
26 //!    **synchronous** by blocking until there is buffer space available. Note
27 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
28 //!    channel where each sender atomically hands off a message to a receiver.
29 //!
30 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
31 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
32 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
33 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
34 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
35 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
36 //!
37 //! ## Disconnection
38 //!
39 //! The send and receive operations on channels will all return a [`Result`]
40 //! indicating whether the operation succeeded or not. An unsuccessful operation
41 //! is normally indicative of the other half of a channel having "hung up" by
42 //! being dropped in its corresponding thread.
43 //!
44 //! Once half of a channel has been deallocated, most operations can no longer
45 //! continue to make progress, so [`Err`] will be returned. Many applications
46 //! will continue to [`unwrap`] the results returned from this module,
47 //! instigating a propagation of failure among threads if one unexpectedly dies.
48 //!
49 //! [`Result`]: ../../../std/result/enum.Result.html
50 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
51 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
52 //!
53 //! # Examples
54 //!
55 //! Simple usage:
56 //!
57 //! ```
58 //! use std::thread;
59 //! use std::sync::mpsc::channel;
60 //!
61 //! // Create a simple streaming channel
62 //! let (tx, rx) = channel();
63 //! thread::spawn(move|| {
64 //!     tx.send(10).unwrap();
65 //! });
66 //! assert_eq!(rx.recv().unwrap(), 10);
67 //! ```
68 //!
69 //! Shared usage:
70 //!
71 //! ```
72 //! use std::thread;
73 //! use std::sync::mpsc::channel;
74 //!
75 //! // Create a shared channel that can be sent along from many threads
76 //! // where tx is the sending half (tx for transmission), and rx is the receiving
77 //! // half (rx for receiving).
78 //! let (tx, rx) = channel();
79 //! for i in 0..10 {
80 //!     let tx = tx.clone();
81 //!     thread::spawn(move|| {
82 //!         tx.send(i).unwrap();
83 //!     });
84 //! }
85 //!
86 //! for _ in 0..10 {
87 //!     let j = rx.recv().unwrap();
88 //!     assert!(0 <= j && j < 10);
89 //! }
90 //! ```
91 //!
92 //! Propagating panics:
93 //!
94 //! ```
95 //! use std::sync::mpsc::channel;
96 //!
97 //! // The call to recv() will return an error because the channel has already
98 //! // hung up (or been deallocated)
99 //! let (tx, rx) = channel::<i32>();
100 //! drop(tx);
101 //! assert!(rx.recv().is_err());
102 //! ```
103 //!
104 //! Synchronous channels:
105 //!
106 //! ```
107 //! use std::thread;
108 //! use std::sync::mpsc::sync_channel;
109 //!
110 //! let (tx, rx) = sync_channel::<i32>(0);
111 //! thread::spawn(move|| {
112 //!     // This will wait for the parent thread to start receiving
113 //!     tx.send(53).unwrap();
114 //! });
115 //! rx.recv().unwrap();
116 //! ```
117
118 #![stable(feature = "rust1", since = "1.0.0")]
119
120 // A description of how Rust's channel implementation works
121 //
122 // Channels are supposed to be the basic building block for all other
123 // concurrent primitives that are used in Rust. As a result, the channel type
124 // needs to be highly optimized, flexible, and broad enough for use everywhere.
125 //
126 // The choice of implementation of all channels is to be built on lock-free data
127 // structures. The channels themselves are then consequently also lock-free data
128 // structures. As always with lock-free code, this is a very "here be dragons"
129 // territory, especially because I'm unaware of any academic papers that have
130 // gone into great length about channels of these flavors.
131 //
132 // ## Flavors of channels
133 //
134 // From the perspective of a consumer of this library, there is only one flavor
135 // of channel. This channel can be used as a stream and cloned to allow multiple
136 // senders. Under the hood, however, there are actually three flavors of
137 // channels in play.
138 //
139 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
140 //                      case. They contain as few atomics as possible and
141 //                      involve one and exactly one allocation.
142 // * Streams - these channels are optimized for the non-shared use case. They
143 //             use a different concurrent queue that is more tailored for this
144 //             use case. The initial allocation of this flavor of channel is not
145 //             optimized.
146 // * Shared - this is the most general form of channel that this module offers,
147 //            a channel with multiple senders. This type is as optimized as it
148 //            can be, but the previous two types mentioned are much faster for
149 //            their use-cases.
150 //
151 // ## Concurrent queues
152 //
153 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
154 // but recv() obviously blocks. This means that under the hood there must be
155 // some shared and concurrent queue holding all of the actual data.
156 //
157 // With two flavors of channels, two flavors of queues are also used. We have
158 // chosen to use queues from a well-known author that are abbreviated as SPSC
159 // and MPSC (single producer, single consumer and multiple producer, single
160 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // shared channels.
162 //
163 // ### SPSC optimizations
164 //
165 // The SPSC queue found online is essentially a linked list of nodes where one
166 // half of the nodes are the "queue of data" and the other half of nodes are a
167 // cache of unused nodes. The unused nodes are used such that an allocation is
168 // not required on every push() and a free doesn't need to happen on every
169 // pop().
170 //
171 // As found online, however, the cache of nodes is of an infinite size. This
172 // means that if a channel at one point in its life had 50k items in the queue,
173 // then the queue will always have the capacity for 50k items. I believed that
174 // this was an unnecessary limitation of the implementation, so I have altered
175 // the queue to optionally have a bound on the cache size.
176 //
177 // By default, streams will have an unbounded SPSC queue with a small-ish cache
178 // size. The hope is that the cache is still large enough to have very fast
179 // send() operations while not too large such that millions of channels can
180 // coexist at once.
181 //
182 // ### MPSC optimizations
183 //
184 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
185 // a linked list under the hood to earn its unboundedness, but I have not put
186 // forth much effort into having a cache of nodes similar to the SPSC queue.
187 //
188 // For now, I believe that this is "ok" because shared channels are not the most
189 // common type, but soon we may wish to revisit this queue choice and determine
190 // another candidate for backend storage of shared channels.
191 //
192 // ## Overview of the Implementation
193 //
194 // Now that there's a little background on the concurrent queues used, it's
195 // worth going into much more detail about the channels themselves. The basic
196 // pseudocode for a send/recv are:
197 //
198 //
199 //      send(t)                             recv()
200 //        queue.push(t)                       return if queue.pop()
201 //        if increment() == -1                deschedule {
202 //          wakeup()                            if decrement() > 0
203 //                                                cancel_deschedule()
204 //                                            }
205 //                                            queue.pop()
206 //
207 // As mentioned before, there are no locks in this implementation, only atomic
208 // instructions are used.
209 //
210 // ### The internal atomic counter
211 //
212 // Every channel has a shared counter with each half to keep track of the size
213 // of the queue. This counter is used to abort descheduling by the receiver and
214 // to know when to wake up on the sending side.
215 //
216 // As seen in the pseudocode, senders will increment this count and receivers
217 // will decrement the count. The theory behind this is that if a sender sees a
218 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
219 // then it doesn't need to block.
220 //
221 // The recv() method has a beginning call to pop(), and if successful, it needs
222 // to decrement the count. It is a crucial implementation detail that this
223 // decrement does *not* happen to the shared counter. If this were the case,
224 // then it would be possible for the counter to be very negative when there were
225 // no receivers waiting, in which case the senders would have to determine when
226 // it was actually appropriate to wake up a receiver.
227 //
228 // Instead, the "steal count" is kept track of separately (not atomically
229 // because it's only used by receivers), and then the decrement() call when
230 // descheduling will lump in all of the recent steals into one large decrement.
231 //
232 // The implication of this is that if a sender sees a -1 count, then there's
233 // guaranteed to be a waiter waiting!
234 //
235 // ## Native Implementation
236 //
237 // A major goal of these channels is to work seamlessly on and off the runtime.
238 // All of the previous race conditions have been worded in terms of
239 // scheduler-isms (which is obviously not available without the runtime).
240 //
241 // For now, native usage of channels (off the runtime) will fall back onto
242 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
243 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
244 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
245 // condition variable.
246 //
247 // ## Select
248 //
249 // Being able to support selection over channels has greatly influenced this
250 // design, and not only does selection need to work inside the runtime, but also
251 // outside the runtime.
252 //
253 // The implementation is fairly straightforward. The goal of select() is not to
254 // return some data, but only to return which channel can receive data without
255 // blocking. The implementation is essentially the entire blocking procedure
256 // followed by an increment as soon as its woken up. The cancellation procedure
257 // involves an increment and swapping out of to_wake to acquire ownership of the
258 // thread to unblock.
259 //
260 // Sadly this current implementation requires multiple allocations, so I have
261 // seen the throughput of select() be much worse than it should be. I do not
262 // believe that there is anything fundamental that needs to change about these
263 // channels, however, in order to support a more efficient select().
264 //
265 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
266 //
267 // # Conclusion
268 //
269 // And now that you've seen all the races that I found and attempted to fix,
270 // here's the code for you to find some more!
271
272 use crate::sync::Arc;
273 use crate::error;
274 use crate::fmt;
275 use crate::mem;
276 use crate::cell::UnsafeCell;
277 use crate::time::{Duration, Instant};
278
279 mod blocking;
280 mod oneshot;
281 mod shared;
282 mod stream;
283 mod sync;
284 mod mpsc_queue;
285 mod spsc_queue;
286
287 mod cache_aligned;
288
289 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
290 /// This half can only be owned by one thread.
291 ///
292 /// Messages sent to the channel can be retrieved using [`recv`].
293 ///
294 /// [`channel`]: fn.channel.html
295 /// [`sync_channel`]: fn.sync_channel.html
296 /// [`recv`]: struct.Receiver.html#method.recv
297 ///
298 /// # Examples
299 ///
300 /// ```rust
301 /// use std::sync::mpsc::channel;
302 /// use std::thread;
303 /// use std::time::Duration;
304 ///
305 /// let (send, recv) = channel();
306 ///
307 /// thread::spawn(move || {
308 ///     send.send("Hello world!").unwrap();
309 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
310 ///     send.send("Delayed for 2 seconds").unwrap();
311 /// });
312 ///
313 /// println!("{}", recv.recv().unwrap()); // Received immediately
314 /// println!("Waiting...");
315 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
316 /// ```
317 #[stable(feature = "rust1", since = "1.0.0")]
318 pub struct Receiver<T> {
319     inner: UnsafeCell<Flavor<T>>,
320 }
321
322 // The receiver port can be sent from place to place, so long as it
323 // is not used to receive non-sendable things.
324 #[stable(feature = "rust1", since = "1.0.0")]
325 unsafe impl<T: Send> Send for Receiver<T> { }
326
327 #[stable(feature = "rust1", since = "1.0.0")]
328 impl<T> !Sync for Receiver<T> { }
329
330 /// An iterator over messages on a [`Receiver`], created by [`iter`].
331 ///
332 /// This iterator will block whenever [`next`] is called,
333 /// waiting for a new message, and [`None`] will be returned
334 /// when the corresponding channel has hung up.
335 ///
336 /// [`iter`]: struct.Receiver.html#method.iter
337 /// [`Receiver`]: struct.Receiver.html
338 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
339 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
340 ///
341 /// # Examples
342 ///
343 /// ```rust
344 /// use std::sync::mpsc::channel;
345 /// use std::thread;
346 ///
347 /// let (send, recv) = channel();
348 ///
349 /// thread::spawn(move || {
350 ///     send.send(1u8).unwrap();
351 ///     send.send(2u8).unwrap();
352 ///     send.send(3u8).unwrap();
353 /// });
354 ///
355 /// for x in recv.iter() {
356 ///     println!("Got: {}", x);
357 /// }
358 /// ```
359 #[stable(feature = "rust1", since = "1.0.0")]
360 #[derive(Debug)]
361 pub struct Iter<'a, T: 'a> {
362     rx: &'a Receiver<T>
363 }
364
365 /// An iterator that attempts to yield all pending values for a [`Receiver`],
366 /// created by [`try_iter`].
367 ///
368 /// [`None`] will be returned when there are no pending values remaining or
369 /// if the corresponding channel has hung up.
370 ///
371 /// This iterator will never block the caller in order to wait for data to
372 /// become available. Instead, it will return [`None`].
373 ///
374 /// [`Receiver`]: struct.Receiver.html
375 /// [`try_iter`]: struct.Receiver.html#method.try_iter
376 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
377 ///
378 /// # Examples
379 ///
380 /// ```rust
381 /// use std::sync::mpsc::channel;
382 /// use std::thread;
383 /// use std::time::Duration;
384 ///
385 /// let (sender, receiver) = channel();
386 ///
387 /// // Nothing is in the buffer yet
388 /// assert!(receiver.try_iter().next().is_none());
389 /// println!("Nothing in the buffer...");
390 ///
391 /// thread::spawn(move || {
392 ///     sender.send(1).unwrap();
393 ///     sender.send(2).unwrap();
394 ///     sender.send(3).unwrap();
395 /// });
396 ///
397 /// println!("Going to sleep...");
398 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
399 ///
400 /// for x in receiver.try_iter() {
401 ///     println!("Got: {}", x);
402 /// }
403 /// ```
404 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
405 #[derive(Debug)]
406 pub struct TryIter<'a, T: 'a> {
407     rx: &'a Receiver<T>
408 }
409
410 /// An owning iterator over messages on a [`Receiver`],
411 /// created by **Receiver::into_iter**.
412 ///
413 /// This iterator will block whenever [`next`]
414 /// is called, waiting for a new message, and [`None`] will be
415 /// returned if the corresponding channel has hung up.
416 ///
417 /// [`Receiver`]: struct.Receiver.html
418 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
419 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
420 ///
421 /// # Examples
422 ///
423 /// ```rust
424 /// use std::sync::mpsc::channel;
425 /// use std::thread;
426 ///
427 /// let (send, recv) = channel();
428 ///
429 /// thread::spawn(move || {
430 ///     send.send(1u8).unwrap();
431 ///     send.send(2u8).unwrap();
432 ///     send.send(3u8).unwrap();
433 /// });
434 ///
435 /// for x in recv.into_iter() {
436 ///     println!("Got: {}", x);
437 /// }
438 /// ```
439 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
440 #[derive(Debug)]
441 pub struct IntoIter<T> {
442     rx: Receiver<T>
443 }
444
445 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
446 /// owned by one thread, but it can be cloned to send to other threads.
447 ///
448 /// Messages can be sent through this channel with [`send`].
449 ///
450 /// [`channel`]: fn.channel.html
451 /// [`send`]: struct.Sender.html#method.send
452 ///
453 /// # Examples
454 ///
455 /// ```rust
456 /// use std::sync::mpsc::channel;
457 /// use std::thread;
458 ///
459 /// let (sender, receiver) = channel();
460 /// let sender2 = sender.clone();
461 ///
462 /// // First thread owns sender
463 /// thread::spawn(move || {
464 ///     sender.send(1).unwrap();
465 /// });
466 ///
467 /// // Second thread owns sender2
468 /// thread::spawn(move || {
469 ///     sender2.send(2).unwrap();
470 /// });
471 ///
472 /// let msg = receiver.recv().unwrap();
473 /// let msg2 = receiver.recv().unwrap();
474 ///
475 /// assert_eq!(3, msg + msg2);
476 /// ```
477 #[stable(feature = "rust1", since = "1.0.0")]
478 pub struct Sender<T> {
479     inner: UnsafeCell<Flavor<T>>,
480 }
481
482 // The send port can be sent from place to place, so long as it
483 // is not used to send non-sendable things.
484 #[stable(feature = "rust1", since = "1.0.0")]
485 unsafe impl<T: Send> Send for Sender<T> { }
486
487 #[stable(feature = "rust1", since = "1.0.0")]
488 impl<T> !Sync for Sender<T> { }
489
490 /// The sending-half of Rust's synchronous [`sync_channel`] type.
491 ///
492 /// Messages can be sent through this channel with [`send`] or [`try_send`].
493 ///
494 /// [`send`] will block if there is no space in the internal buffer.
495 ///
496 /// [`sync_channel`]: fn.sync_channel.html
497 /// [`send`]: struct.SyncSender.html#method.send
498 /// [`try_send`]: struct.SyncSender.html#method.try_send
499 ///
500 /// # Examples
501 ///
502 /// ```rust
503 /// use std::sync::mpsc::sync_channel;
504 /// use std::thread;
505 ///
506 /// // Create a sync_channel with buffer size 2
507 /// let (sync_sender, receiver) = sync_channel(2);
508 /// let sync_sender2 = sync_sender.clone();
509 ///
510 /// // First thread owns sync_sender
511 /// thread::spawn(move || {
512 ///     sync_sender.send(1).unwrap();
513 ///     sync_sender.send(2).unwrap();
514 /// });
515 ///
516 /// // Second thread owns sync_sender2
517 /// thread::spawn(move || {
518 ///     sync_sender2.send(3).unwrap();
519 ///     // thread will now block since the buffer is full
520 ///     println!("Thread unblocked!");
521 /// });
522 ///
523 /// let mut msg;
524 ///
525 /// msg = receiver.recv().unwrap();
526 /// println!("message {} received", msg);
527 ///
528 /// // "Thread unblocked!" will be printed now
529 ///
530 /// msg = receiver.recv().unwrap();
531 /// println!("message {} received", msg);
532 ///
533 /// msg = receiver.recv().unwrap();
534 ///
535 /// println!("message {} received", msg);
536 /// ```
537 #[stable(feature = "rust1", since = "1.0.0")]
538 pub struct SyncSender<T> {
539     inner: Arc<sync::Packet<T>>,
540 }
541
542 #[stable(feature = "rust1", since = "1.0.0")]
543 unsafe impl<T: Send> Send for SyncSender<T> {}
544
545 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
546 /// function on **channel**s.
547 ///
548 /// A **send** operation can only fail if the receiving end of a channel is
549 /// disconnected, implying that the data could never be received. The error
550 /// contains the data being sent as a payload so it can be recovered.
551 ///
552 /// [`Sender::send`]: struct.Sender.html#method.send
553 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
554 #[stable(feature = "rust1", since = "1.0.0")]
555 #[derive(PartialEq, Eq, Clone, Copy)]
556 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
557
558 /// An error returned from the [`recv`] function on a [`Receiver`].
559 ///
560 /// The [`recv`] operation can only fail if the sending half of a
561 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
562 /// messages will ever be received.
563 ///
564 /// [`recv`]: struct.Receiver.html#method.recv
565 /// [`Receiver`]: struct.Receiver.html
566 /// [`channel`]: fn.channel.html
567 /// [`sync_channel`]: fn.sync_channel.html
568 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
569 #[stable(feature = "rust1", since = "1.0.0")]
570 pub struct RecvError;
571
572 /// This enumeration is the list of the possible reasons that [`try_recv`] could
573 /// not return data when called. This can occur with both a [`channel`] and
574 /// a [`sync_channel`].
575 ///
576 /// [`try_recv`]: struct.Receiver.html#method.try_recv
577 /// [`channel`]: fn.channel.html
578 /// [`sync_channel`]: fn.sync_channel.html
579 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
580 #[stable(feature = "rust1", since = "1.0.0")]
581 pub enum TryRecvError {
582     /// This **channel** is currently empty, but the **Sender**(s) have not yet
583     /// disconnected, so data may yet become available.
584     #[stable(feature = "rust1", since = "1.0.0")]
585     Empty,
586
587     /// The **channel**'s sending half has become disconnected, and there will
588     /// never be any more data received on it.
589     #[stable(feature = "rust1", since = "1.0.0")]
590     Disconnected,
591 }
592
593 /// This enumeration is the list of possible errors that made [`recv_timeout`]
594 /// unable to return data when called. This can occur with both a [`channel`] and
595 /// a [`sync_channel`].
596 ///
597 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
598 /// [`channel`]: fn.channel.html
599 /// [`sync_channel`]: fn.sync_channel.html
600 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
601 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
602 pub enum RecvTimeoutError {
603     /// This **channel** is currently empty, but the **Sender**(s) have not yet
604     /// disconnected, so data may yet become available.
605     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
606     Timeout,
607     /// The **channel**'s sending half has become disconnected, and there will
608     /// never be any more data received on it.
609     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
610     Disconnected,
611 }
612
613 /// This enumeration is the list of the possible error outcomes for the
614 /// [`try_send`] method.
615 ///
616 /// [`try_send`]: struct.SyncSender.html#method.try_send
617 #[stable(feature = "rust1", since = "1.0.0")]
618 #[derive(PartialEq, Eq, Clone, Copy)]
619 pub enum TrySendError<T> {
620     /// The data could not be sent on the [`sync_channel`] because it would require that
621     /// the callee block to send the data.
622     ///
623     /// If this is a buffered channel, then the buffer is full at this time. If
624     /// this is not a buffered channel, then there is no [`Receiver`] available to
625     /// acquire the data.
626     ///
627     /// [`sync_channel`]: fn.sync_channel.html
628     /// [`Receiver`]: struct.Receiver.html
629     #[stable(feature = "rust1", since = "1.0.0")]
630     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
631
632     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
633     /// sent. The data is returned back to the callee in this case.
634     ///
635     /// [`sync_channel`]: fn.sync_channel.html
636     #[stable(feature = "rust1", since = "1.0.0")]
637     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
638 }
639
640 enum Flavor<T> {
641     Oneshot(Arc<oneshot::Packet<T>>),
642     Stream(Arc<stream::Packet<T>>),
643     Shared(Arc<shared::Packet<T>>),
644     Sync(Arc<sync::Packet<T>>),
645 }
646
647 #[doc(hidden)]
648 trait UnsafeFlavor<T> {
649     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
650     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
651         &mut *self.inner_unsafe().get()
652     }
653     unsafe fn inner(&self) -> &Flavor<T> {
654         &*self.inner_unsafe().get()
655     }
656 }
657 impl<T> UnsafeFlavor<T> for Sender<T> {
658     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
659         &self.inner
660     }
661 }
662 impl<T> UnsafeFlavor<T> for Receiver<T> {
663     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
664         &self.inner
665     }
666 }
667
668 /// Creates a new asynchronous channel, returning the sender/receiver halves.
669 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
670 /// the same order as it was sent, and no [`send`] will block the calling thread
671 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
672 /// block after its buffer limit is reached). [`recv`] will block until a message
673 /// is available.
674 ///
675 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
676 /// only one [`Receiver`] is supported.
677 ///
678 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
679 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
680 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
681 /// return a [`RecvError`].
682 ///
683 /// [`send`]: struct.Sender.html#method.send
684 /// [`recv`]: struct.Receiver.html#method.recv
685 /// [`Sender`]: struct.Sender.html
686 /// [`Receiver`]: struct.Receiver.html
687 /// [`sync_channel`]: fn.sync_channel.html
688 /// [`SendError`]: struct.SendError.html
689 /// [`RecvError`]: struct.RecvError.html
690 ///
691 /// # Examples
692 ///
693 /// ```
694 /// use std::sync::mpsc::channel;
695 /// use std::thread;
696 ///
697 /// let (sender, receiver) = channel();
698 ///
699 /// // Spawn off an expensive computation
700 /// thread::spawn(move|| {
701 /// #   fn expensive_computation() {}
702 ///     sender.send(expensive_computation()).unwrap();
703 /// });
704 ///
705 /// // Do some useful work for awhile
706 ///
707 /// // Let's see what that answer was
708 /// println!("{:?}", receiver.recv().unwrap());
709 /// ```
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
712     let a = Arc::new(oneshot::Packet::new());
713     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
714 }
715
716 /// Creates a new synchronous, bounded channel.
717 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
718 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
719 /// [`Receiver`] will block until a message becomes available. `sync_channel`
720 /// differs greatly in the semantics of the sender, however.
721 ///
722 /// This channel has an internal buffer on which messages will be queued.
723 /// `bound` specifies the buffer size. When the internal buffer becomes full,
724 /// future sends will *block* waiting for the buffer to open up. Note that a
725 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
726 /// where each [`send`] will not return until a [`recv`] is paired with it.
727 ///
728 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
729 /// times, but only one [`Receiver`] is supported.
730 ///
731 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
732 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
733 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
734 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
735 ///
736 /// [`channel`]: fn.channel.html
737 /// [`send`]: struct.SyncSender.html#method.send
738 /// [`recv`]: struct.Receiver.html#method.recv
739 /// [`SyncSender`]: struct.SyncSender.html
740 /// [`Receiver`]: struct.Receiver.html
741 /// [`SendError`]: struct.SendError.html
742 /// [`RecvError`]: struct.RecvError.html
743 ///
744 /// # Examples
745 ///
746 /// ```
747 /// use std::sync::mpsc::sync_channel;
748 /// use std::thread;
749 ///
750 /// let (sender, receiver) = sync_channel(1);
751 ///
752 /// // this returns immediately
753 /// sender.send(1).unwrap();
754 ///
755 /// thread::spawn(move|| {
756 ///     // this will block until the previous message has been received
757 ///     sender.send(2).unwrap();
758 /// });
759 ///
760 /// assert_eq!(receiver.recv().unwrap(), 1);
761 /// assert_eq!(receiver.recv().unwrap(), 2);
762 /// ```
763 #[stable(feature = "rust1", since = "1.0.0")]
764 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
765     let a = Arc::new(sync::Packet::new(bound));
766     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
767 }
768
769 ////////////////////////////////////////////////////////////////////////////////
770 // Sender
771 ////////////////////////////////////////////////////////////////////////////////
772
773 impl<T> Sender<T> {
774     fn new(inner: Flavor<T>) -> Sender<T> {
775         Sender {
776             inner: UnsafeCell::new(inner),
777         }
778     }
779
780     /// Attempts to send a value on this channel, returning it back if it could
781     /// not be sent.
782     ///
783     /// A successful send occurs when it is determined that the other end of
784     /// the channel has not hung up already. An unsuccessful send would be one
785     /// where the corresponding receiver has already been deallocated. Note
786     /// that a return value of [`Err`] means that the data will never be
787     /// received, but a return value of [`Ok`] does *not* mean that the data
788     /// will be received. It is possible for the corresponding receiver to
789     /// hang up immediately after this function returns [`Ok`].
790     ///
791     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
792     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
793     ///
794     /// This method will never block the current thread.
795     ///
796     /// # Examples
797     ///
798     /// ```
799     /// use std::sync::mpsc::channel;
800     ///
801     /// let (tx, rx) = channel();
802     ///
803     /// // This send is always successful
804     /// tx.send(1).unwrap();
805     ///
806     /// // This send will fail because the receiver is gone
807     /// drop(rx);
808     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
809     /// ```
810     #[stable(feature = "rust1", since = "1.0.0")]
811     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
812         let (new_inner, ret) = match *unsafe { self.inner() } {
813             Flavor::Oneshot(ref p) => {
814                 if !p.sent() {
815                     return p.send(t).map_err(SendError);
816                 } else {
817                     let a = Arc::new(stream::Packet::new());
818                     let rx = Receiver::new(Flavor::Stream(a.clone()));
819                     match p.upgrade(rx) {
820                         oneshot::UpSuccess => {
821                             let ret = a.send(t);
822                             (a, ret)
823                         }
824                         oneshot::UpDisconnected => (a, Err(t)),
825                         oneshot::UpWoke(token) => {
826                             // This send cannot panic because the thread is
827                             // asleep (we're looking at it), so the receiver
828                             // can't go away.
829                             a.send(t).ok().unwrap();
830                             token.signal();
831                             (a, Ok(()))
832                         }
833                     }
834                 }
835             }
836             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
837             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
838             Flavor::Sync(..) => unreachable!(),
839         };
840
841         unsafe {
842             let tmp = Sender::new(Flavor::Stream(new_inner));
843             mem::swap(self.inner_mut(), tmp.inner_mut());
844         }
845         ret.map_err(SendError)
846     }
847 }
848
849 #[stable(feature = "rust1", since = "1.0.0")]
850 impl<T> Clone for Sender<T> {
851     fn clone(&self) -> Sender<T> {
852         let packet = match *unsafe { self.inner() } {
853             Flavor::Oneshot(ref p) => {
854                 let a = Arc::new(shared::Packet::new());
855                 {
856                     let guard = a.postinit_lock();
857                     let rx = Receiver::new(Flavor::Shared(a.clone()));
858                     let sleeper = match p.upgrade(rx) {
859                         oneshot::UpSuccess |
860                         oneshot::UpDisconnected => None,
861                         oneshot::UpWoke(task) => Some(task),
862                     };
863                     a.inherit_blocker(sleeper, guard);
864                 }
865                 a
866             }
867             Flavor::Stream(ref p) => {
868                 let a = Arc::new(shared::Packet::new());
869                 {
870                     let guard = a.postinit_lock();
871                     let rx = Receiver::new(Flavor::Shared(a.clone()));
872                     let sleeper = match p.upgrade(rx) {
873                         stream::UpSuccess |
874                         stream::UpDisconnected => None,
875                         stream::UpWoke(task) => Some(task),
876                     };
877                     a.inherit_blocker(sleeper, guard);
878                 }
879                 a
880             }
881             Flavor::Shared(ref p) => {
882                 p.clone_chan();
883                 return Sender::new(Flavor::Shared(p.clone()));
884             }
885             Flavor::Sync(..) => unreachable!(),
886         };
887
888         unsafe {
889             let tmp = Sender::new(Flavor::Shared(packet.clone()));
890             mem::swap(self.inner_mut(), tmp.inner_mut());
891         }
892         Sender::new(Flavor::Shared(packet))
893     }
894 }
895
896 #[stable(feature = "rust1", since = "1.0.0")]
897 impl<T> Drop for Sender<T> {
898     fn drop(&mut self) {
899         match *unsafe { self.inner() } {
900             Flavor::Oneshot(ref p) => p.drop_chan(),
901             Flavor::Stream(ref p) => p.drop_chan(),
902             Flavor::Shared(ref p) => p.drop_chan(),
903             Flavor::Sync(..) => unreachable!(),
904         }
905     }
906 }
907
908 #[stable(feature = "mpsc_debug", since = "1.8.0")]
909 impl<T> fmt::Debug for Sender<T> {
910     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911         f.debug_struct("Sender").finish()
912     }
913 }
914
915 ////////////////////////////////////////////////////////////////////////////////
916 // SyncSender
917 ////////////////////////////////////////////////////////////////////////////////
918
919 impl<T> SyncSender<T> {
920     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
921         SyncSender { inner }
922     }
923
924     /// Sends a value on this synchronous channel.
925     ///
926     /// This function will *block* until space in the internal buffer becomes
927     /// available or a receiver is available to hand off the message to.
928     ///
929     /// Note that a successful send does *not* guarantee that the receiver will
930     /// ever see the data if there is a buffer on this channel. Items may be
931     /// enqueued in the internal buffer for the receiver to receive at a later
932     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
933     /// channel and it guarantees that the receiver has indeed received
934     /// the data if this function returns success.
935     ///
936     /// This function will never panic, but it may return [`Err`] if the
937     /// [`Receiver`] has disconnected and is no longer able to receive
938     /// information.
939     ///
940     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
941     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
942     ///
943     /// # Examples
944     ///
945     /// ```rust
946     /// use std::sync::mpsc::sync_channel;
947     /// use std::thread;
948     ///
949     /// // Create a rendezvous sync_channel with buffer size 0
950     /// let (sync_sender, receiver) = sync_channel(0);
951     ///
952     /// thread::spawn(move || {
953     ///    println!("sending message...");
954     ///    sync_sender.send(1).unwrap();
955     ///    // Thread is now blocked until the message is received
956     ///
957     ///    println!("...message received!");
958     /// });
959     ///
960     /// let msg = receiver.recv().unwrap();
961     /// assert_eq!(1, msg);
962     /// ```
963     #[stable(feature = "rust1", since = "1.0.0")]
964     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
965         self.inner.send(t).map_err(SendError)
966     }
967
968     /// Attempts to send a value on this channel without blocking.
969     ///
970     /// This method differs from [`send`] by returning immediately if the
971     /// channel's buffer is full or no receiver is waiting to acquire some
972     /// data. Compared with [`send`], this function has two failure cases
973     /// instead of one (one for disconnection, one for a full buffer).
974     ///
975     /// See [`send`] for notes about guarantees of whether the
976     /// receiver has received the data or not if this function is successful.
977     ///
978     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
979     ///
980     /// # Examples
981     ///
982     /// ```rust
983     /// use std::sync::mpsc::sync_channel;
984     /// use std::thread;
985     ///
986     /// // Create a sync_channel with buffer size 1
987     /// let (sync_sender, receiver) = sync_channel(1);
988     /// let sync_sender2 = sync_sender.clone();
989     ///
990     /// // First thread owns sync_sender
991     /// thread::spawn(move || {
992     ///     sync_sender.send(1).unwrap();
993     ///     sync_sender.send(2).unwrap();
994     ///     // Thread blocked
995     /// });
996     ///
997     /// // Second thread owns sync_sender2
998     /// thread::spawn(move || {
999     ///     // This will return an error and send
1000     ///     // no message if the buffer is full
1001     ///     let _ = sync_sender2.try_send(3);
1002     /// });
1003     ///
1004     /// let mut msg;
1005     /// msg = receiver.recv().unwrap();
1006     /// println!("message {} received", msg);
1007     ///
1008     /// msg = receiver.recv().unwrap();
1009     /// println!("message {} received", msg);
1010     ///
1011     /// // Third message may have never been sent
1012     /// match receiver.try_recv() {
1013     ///     Ok(msg) => println!("message {} received", msg),
1014     ///     Err(_) => println!("the third message was never sent"),
1015     /// }
1016     /// ```
1017     #[stable(feature = "rust1", since = "1.0.0")]
1018     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1019         self.inner.try_send(t)
1020     }
1021 }
1022
1023 #[stable(feature = "rust1", since = "1.0.0")]
1024 impl<T> Clone for SyncSender<T> {
1025     fn clone(&self) -> SyncSender<T> {
1026         self.inner.clone_chan();
1027         SyncSender::new(self.inner.clone())
1028     }
1029 }
1030
1031 #[stable(feature = "rust1", since = "1.0.0")]
1032 impl<T> Drop for SyncSender<T> {
1033     fn drop(&mut self) {
1034         self.inner.drop_chan();
1035     }
1036 }
1037
1038 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1039 impl<T> fmt::Debug for SyncSender<T> {
1040     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1041         f.debug_struct("SyncSender").finish()
1042     }
1043 }
1044
1045 ////////////////////////////////////////////////////////////////////////////////
1046 // Receiver
1047 ////////////////////////////////////////////////////////////////////////////////
1048
1049 impl<T> Receiver<T> {
1050     fn new(inner: Flavor<T>) -> Receiver<T> {
1051         Receiver { inner: UnsafeCell::new(inner) }
1052     }
1053
1054     /// Attempts to return a pending value on this receiver without blocking.
1055     ///
1056     /// This method will never block the caller in order to wait for data to
1057     /// become available. Instead, this will always return immediately with a
1058     /// possible option of pending data on the channel.
1059     ///
1060     /// This is useful for a flavor of "optimistic check" before deciding to
1061     /// block on a receiver.
1062     ///
1063     /// Compared with [`recv`], this function has two failure cases instead of one
1064     /// (one for disconnection, one for an empty buffer).
1065     ///
1066     /// [`recv`]: struct.Receiver.html#method.recv
1067     ///
1068     /// # Examples
1069     ///
1070     /// ```rust
1071     /// use std::sync::mpsc::{Receiver, channel};
1072     ///
1073     /// let (_, receiver): (_, Receiver<i32>) = channel();
1074     ///
1075     /// assert!(receiver.try_recv().is_err());
1076     /// ```
1077     #[stable(feature = "rust1", since = "1.0.0")]
1078     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1079         loop {
1080             let new_port = match *unsafe { self.inner() } {
1081                 Flavor::Oneshot(ref p) => {
1082                     match p.try_recv() {
1083                         Ok(t) => return Ok(t),
1084                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1085                         Err(oneshot::Disconnected) => {
1086                             return Err(TryRecvError::Disconnected)
1087                         }
1088                         Err(oneshot::Upgraded(rx)) => rx,
1089                     }
1090                 }
1091                 Flavor::Stream(ref p) => {
1092                     match p.try_recv() {
1093                         Ok(t) => return Ok(t),
1094                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1095                         Err(stream::Disconnected) => {
1096                             return Err(TryRecvError::Disconnected)
1097                         }
1098                         Err(stream::Upgraded(rx)) => rx,
1099                     }
1100                 }
1101                 Flavor::Shared(ref p) => {
1102                     match p.try_recv() {
1103                         Ok(t) => return Ok(t),
1104                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1105                         Err(shared::Disconnected) => {
1106                             return Err(TryRecvError::Disconnected)
1107                         }
1108                     }
1109                 }
1110                 Flavor::Sync(ref p) => {
1111                     match p.try_recv() {
1112                         Ok(t) => return Ok(t),
1113                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1114                         Err(sync::Disconnected) => {
1115                             return Err(TryRecvError::Disconnected)
1116                         }
1117                     }
1118                 }
1119             };
1120             unsafe {
1121                 mem::swap(self.inner_mut(),
1122                           new_port.inner_mut());
1123             }
1124         }
1125     }
1126
1127     /// Attempts to wait for a value on this receiver, returning an error if the
1128     /// corresponding channel has hung up.
1129     ///
1130     /// This function will always block the current thread if there is no data
1131     /// available and it's possible for more data to be sent. Once a message is
1132     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1133     /// receiver will wake up and return that message.
1134     ///
1135     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1136     /// this call is blocking, this call will wake up and return [`Err`] to
1137     /// indicate that no more messages can ever be received on this channel.
1138     /// However, since channels are buffered, messages sent before the disconnect
1139     /// will still be properly received.
1140     ///
1141     /// [`Sender`]: struct.Sender.html
1142     /// [`SyncSender`]: struct.SyncSender.html
1143     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1144     ///
1145     /// # Examples
1146     ///
1147     /// ```
1148     /// use std::sync::mpsc;
1149     /// use std::thread;
1150     ///
1151     /// let (send, recv) = mpsc::channel();
1152     /// let handle = thread::spawn(move || {
1153     ///     send.send(1u8).unwrap();
1154     /// });
1155     ///
1156     /// handle.join().unwrap();
1157     ///
1158     /// assert_eq!(Ok(1), recv.recv());
1159     /// ```
1160     ///
1161     /// Buffering behavior:
1162     ///
1163     /// ```
1164     /// use std::sync::mpsc;
1165     /// use std::thread;
1166     /// use std::sync::mpsc::RecvError;
1167     ///
1168     /// let (send, recv) = mpsc::channel();
1169     /// let handle = thread::spawn(move || {
1170     ///     send.send(1u8).unwrap();
1171     ///     send.send(2).unwrap();
1172     ///     send.send(3).unwrap();
1173     ///     drop(send);
1174     /// });
1175     ///
1176     /// // wait for the thread to join so we ensure the sender is dropped
1177     /// handle.join().unwrap();
1178     ///
1179     /// assert_eq!(Ok(1), recv.recv());
1180     /// assert_eq!(Ok(2), recv.recv());
1181     /// assert_eq!(Ok(3), recv.recv());
1182     /// assert_eq!(Err(RecvError), recv.recv());
1183     /// ```
1184     #[stable(feature = "rust1", since = "1.0.0")]
1185     pub fn recv(&self) -> Result<T, RecvError> {
1186         loop {
1187             let new_port = match *unsafe { self.inner() } {
1188                 Flavor::Oneshot(ref p) => {
1189                     match p.recv(None) {
1190                         Ok(t) => return Ok(t),
1191                         Err(oneshot::Disconnected) => return Err(RecvError),
1192                         Err(oneshot::Upgraded(rx)) => rx,
1193                         Err(oneshot::Empty) => unreachable!(),
1194                     }
1195                 }
1196                 Flavor::Stream(ref p) => {
1197                     match p.recv(None) {
1198                         Ok(t) => return Ok(t),
1199                         Err(stream::Disconnected) => return Err(RecvError),
1200                         Err(stream::Upgraded(rx)) => rx,
1201                         Err(stream::Empty) => unreachable!(),
1202                     }
1203                 }
1204                 Flavor::Shared(ref p) => {
1205                     match p.recv(None) {
1206                         Ok(t) => return Ok(t),
1207                         Err(shared::Disconnected) => return Err(RecvError),
1208                         Err(shared::Empty) => unreachable!(),
1209                     }
1210                 }
1211                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1212             };
1213             unsafe {
1214                 mem::swap(self.inner_mut(), new_port.inner_mut());
1215             }
1216         }
1217     }
1218
1219     /// Attempts to wait for a value on this receiver, returning an error if the
1220     /// corresponding channel has hung up, or if it waits more than `timeout`.
1221     ///
1222     /// This function will always block the current thread if there is no data
1223     /// available and it's possible for more data to be sent. Once a message is
1224     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1225     /// receiver will wake up and return that message.
1226     ///
1227     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1228     /// this call is blocking, this call will wake up and return [`Err`] to
1229     /// indicate that no more messages can ever be received on this channel.
1230     /// However, since channels are buffered, messages sent before the disconnect
1231     /// will still be properly received.
1232     ///
1233     /// [`Sender`]: struct.Sender.html
1234     /// [`SyncSender`]: struct.SyncSender.html
1235     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1236     ///
1237     /// # Known Issues
1238     ///
1239     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1240     /// to panic unexpectedly with the following example:
1241     ///
1242     /// ```no_run
1243     /// use std::sync::mpsc::channel;
1244     /// use std::thread;
1245     /// use std::time::Duration;
1246     ///
1247     /// let (tx, rx) = channel::<String>();
1248     ///
1249     /// thread::spawn(move || {
1250     ///     let d = Duration::from_millis(10);
1251     ///     loop {
1252     ///         println!("recv");
1253     ///         let _r = rx.recv_timeout(d);
1254     ///     }
1255     /// });
1256     ///
1257     /// thread::sleep(Duration::from_millis(100));
1258     /// let _c1 = tx.clone();
1259     ///
1260     /// thread::sleep(Duration::from_secs(1));
1261     /// ```
1262     ///
1263     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1264     ///
1265     /// # Examples
1266     ///
1267     /// Successfully receiving value before encountering timeout:
1268     ///
1269     /// ```no_run
1270     /// use std::thread;
1271     /// use std::time::Duration;
1272     /// use std::sync::mpsc;
1273     ///
1274     /// let (send, recv) = mpsc::channel();
1275     ///
1276     /// thread::spawn(move || {
1277     ///     send.send('a').unwrap();
1278     /// });
1279     ///
1280     /// assert_eq!(
1281     ///     recv.recv_timeout(Duration::from_millis(400)),
1282     ///     Ok('a')
1283     /// );
1284     /// ```
1285     ///
1286     /// Receiving an error upon reaching timeout:
1287     ///
1288     /// ```no_run
1289     /// use std::thread;
1290     /// use std::time::Duration;
1291     /// use std::sync::mpsc;
1292     ///
1293     /// let (send, recv) = mpsc::channel();
1294     ///
1295     /// thread::spawn(move || {
1296     ///     thread::sleep(Duration::from_millis(800));
1297     ///     send.send('a').unwrap();
1298     /// });
1299     ///
1300     /// assert_eq!(
1301     ///     recv.recv_timeout(Duration::from_millis(400)),
1302     ///     Err(mpsc::RecvTimeoutError::Timeout)
1303     /// );
1304     /// ```
1305     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1306     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1307         // Do an optimistic try_recv to avoid the performance impact of
1308         // Instant::now() in the full-channel case.
1309         match self.try_recv() {
1310             Ok(result) => Ok(result),
1311             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1312             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1313                 Some(deadline) => self.recv_deadline(deadline),
1314                 // So far in the future that it's practically the same as waiting indefinitely.
1315                 None => self.recv().map_err(RecvTimeoutError::from),
1316             },
1317         }
1318     }
1319
1320     /// Attempts to wait for a value on this receiver, returning an error if the
1321     /// corresponding channel has hung up, or if `deadline` is reached.
1322     ///
1323     /// This function will always block the current thread if there is no data
1324     /// available and it's possible for more data to be sent. Once a message is
1325     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1326     /// receiver will wake up and return that message.
1327     ///
1328     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1329     /// this call is blocking, this call will wake up and return [`Err`] to
1330     /// indicate that no more messages can ever be received on this channel.
1331     /// However, since channels are buffered, messages sent before the disconnect
1332     /// will still be properly received.
1333     ///
1334     /// [`Sender`]: struct.Sender.html
1335     /// [`SyncSender`]: struct.SyncSender.html
1336     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1337     ///
1338     /// # Examples
1339     ///
1340     /// Successfully receiving value before reaching deadline:
1341     ///
1342     /// ```no_run
1343     /// #![feature(deadline_api)]
1344     /// use std::thread;
1345     /// use std::time::{Duration, Instant};
1346     /// use std::sync::mpsc;
1347     ///
1348     /// let (send, recv) = mpsc::channel();
1349     ///
1350     /// thread::spawn(move || {
1351     ///     send.send('a').unwrap();
1352     /// });
1353     ///
1354     /// assert_eq!(
1355     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1356     ///     Ok('a')
1357     /// );
1358     /// ```
1359     ///
1360     /// Receiving an error upon reaching deadline:
1361     ///
1362     /// ```no_run
1363     /// #![feature(deadline_api)]
1364     /// use std::thread;
1365     /// use std::time::{Duration, Instant};
1366     /// use std::sync::mpsc;
1367     ///
1368     /// let (send, recv) = mpsc::channel();
1369     ///
1370     /// thread::spawn(move || {
1371     ///     thread::sleep(Duration::from_millis(800));
1372     ///     send.send('a').unwrap();
1373     /// });
1374     ///
1375     /// assert_eq!(
1376     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1377     ///     Err(mpsc::RecvTimeoutError::Timeout)
1378     /// );
1379     /// ```
1380     #[unstable(feature = "deadline_api", issue = "46316")]
1381     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1382         use self::RecvTimeoutError::*;
1383
1384         loop {
1385             let port_or_empty = match *unsafe { self.inner() } {
1386                 Flavor::Oneshot(ref p) => {
1387                     match p.recv(Some(deadline)) {
1388                         Ok(t) => return Ok(t),
1389                         Err(oneshot::Disconnected) => return Err(Disconnected),
1390                         Err(oneshot::Upgraded(rx)) => Some(rx),
1391                         Err(oneshot::Empty) => None,
1392                     }
1393                 }
1394                 Flavor::Stream(ref p) => {
1395                     match p.recv(Some(deadline)) {
1396                         Ok(t) => return Ok(t),
1397                         Err(stream::Disconnected) => return Err(Disconnected),
1398                         Err(stream::Upgraded(rx)) => Some(rx),
1399                         Err(stream::Empty) => None,
1400                     }
1401                 }
1402                 Flavor::Shared(ref p) => {
1403                     match p.recv(Some(deadline)) {
1404                         Ok(t) => return Ok(t),
1405                         Err(shared::Disconnected) => return Err(Disconnected),
1406                         Err(shared::Empty) => None,
1407                     }
1408                 }
1409                 Flavor::Sync(ref p) => {
1410                     match p.recv(Some(deadline)) {
1411                         Ok(t) => return Ok(t),
1412                         Err(sync::Disconnected) => return Err(Disconnected),
1413                         Err(sync::Empty) => None,
1414                     }
1415                 }
1416             };
1417
1418             if let Some(new_port) = port_or_empty {
1419                 unsafe {
1420                     mem::swap(self.inner_mut(), new_port.inner_mut());
1421                 }
1422             }
1423
1424             // If we're already passed the deadline, and we're here without
1425             // data, return a timeout, else try again.
1426             if Instant::now() >= deadline {
1427                 return Err(Timeout);
1428             }
1429         }
1430     }
1431
1432     /// Returns an iterator that will block waiting for messages, but never
1433     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1434     ///
1435     /// [`panic!`]: ../../../std/macro.panic.html
1436     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1437     ///
1438     /// # Examples
1439     ///
1440     /// ```rust
1441     /// use std::sync::mpsc::channel;
1442     /// use std::thread;
1443     ///
1444     /// let (send, recv) = channel();
1445     ///
1446     /// thread::spawn(move || {
1447     ///     send.send(1).unwrap();
1448     ///     send.send(2).unwrap();
1449     ///     send.send(3).unwrap();
1450     /// });
1451     ///
1452     /// let mut iter = recv.iter();
1453     /// assert_eq!(iter.next(), Some(1));
1454     /// assert_eq!(iter.next(), Some(2));
1455     /// assert_eq!(iter.next(), Some(3));
1456     /// assert_eq!(iter.next(), None);
1457     /// ```
1458     #[stable(feature = "rust1", since = "1.0.0")]
1459     pub fn iter(&self) -> Iter<'_, T> {
1460         Iter { rx: self }
1461     }
1462
1463     /// Returns an iterator that will attempt to yield all pending values.
1464     /// It will return `None` if there are no more pending values or if the
1465     /// channel has hung up. The iterator will never [`panic!`] or block the
1466     /// user by waiting for values.
1467     ///
1468     /// [`panic!`]: ../../../std/macro.panic.html
1469     ///
1470     /// # Examples
1471     ///
1472     /// ```no_run
1473     /// use std::sync::mpsc::channel;
1474     /// use std::thread;
1475     /// use std::time::Duration;
1476     ///
1477     /// let (sender, receiver) = channel();
1478     ///
1479     /// // nothing is in the buffer yet
1480     /// assert!(receiver.try_iter().next().is_none());
1481     ///
1482     /// thread::spawn(move || {
1483     ///     thread::sleep(Duration::from_secs(1));
1484     ///     sender.send(1).unwrap();
1485     ///     sender.send(2).unwrap();
1486     ///     sender.send(3).unwrap();
1487     /// });
1488     ///
1489     /// // nothing is in the buffer yet
1490     /// assert!(receiver.try_iter().next().is_none());
1491     ///
1492     /// // block for two seconds
1493     /// thread::sleep(Duration::from_secs(2));
1494     ///
1495     /// let mut iter = receiver.try_iter();
1496     /// assert_eq!(iter.next(), Some(1));
1497     /// assert_eq!(iter.next(), Some(2));
1498     /// assert_eq!(iter.next(), Some(3));
1499     /// assert_eq!(iter.next(), None);
1500     /// ```
1501     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1502     pub fn try_iter(&self) -> TryIter<'_, T> {
1503         TryIter { rx: self }
1504     }
1505
1506 }
1507
1508 #[stable(feature = "rust1", since = "1.0.0")]
1509 impl<'a, T> Iterator for Iter<'a, T> {
1510     type Item = T;
1511
1512     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1513 }
1514
1515 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1516 impl<'a, T> Iterator for TryIter<'a, T> {
1517     type Item = T;
1518
1519     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1520 }
1521
1522 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1523 impl<'a, T> IntoIterator for &'a Receiver<T> {
1524     type Item = T;
1525     type IntoIter = Iter<'a, T>;
1526
1527     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1528 }
1529
1530 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1531 impl<T> Iterator for IntoIter<T> {
1532     type Item = T;
1533     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1534 }
1535
1536 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1537 impl <T> IntoIterator for Receiver<T> {
1538     type Item = T;
1539     type IntoIter = IntoIter<T>;
1540
1541     fn into_iter(self) -> IntoIter<T> {
1542         IntoIter { rx: self }
1543     }
1544 }
1545
1546 #[stable(feature = "rust1", since = "1.0.0")]
1547 impl<T> Drop for Receiver<T> {
1548     fn drop(&mut self) {
1549         match *unsafe { self.inner() } {
1550             Flavor::Oneshot(ref p) => p.drop_port(),
1551             Flavor::Stream(ref p) => p.drop_port(),
1552             Flavor::Shared(ref p) => p.drop_port(),
1553             Flavor::Sync(ref p) => p.drop_port(),
1554         }
1555     }
1556 }
1557
1558 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1559 impl<T> fmt::Debug for Receiver<T> {
1560     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561         f.debug_struct("Receiver").finish()
1562     }
1563 }
1564
1565 #[stable(feature = "rust1", since = "1.0.0")]
1566 impl<T> fmt::Debug for SendError<T> {
1567     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1568         "SendError(..)".fmt(f)
1569     }
1570 }
1571
1572 #[stable(feature = "rust1", since = "1.0.0")]
1573 impl<T> fmt::Display for SendError<T> {
1574     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1575         "sending on a closed channel".fmt(f)
1576     }
1577 }
1578
1579 #[stable(feature = "rust1", since = "1.0.0")]
1580 impl<T: Send> error::Error for SendError<T> {
1581     fn description(&self) -> &str {
1582         "sending on a closed channel"
1583     }
1584 }
1585
1586 #[stable(feature = "rust1", since = "1.0.0")]
1587 impl<T> fmt::Debug for TrySendError<T> {
1588     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1589         match *self {
1590             TrySendError::Full(..) => "Full(..)".fmt(f),
1591             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1592         }
1593     }
1594 }
1595
1596 #[stable(feature = "rust1", since = "1.0.0")]
1597 impl<T> fmt::Display for TrySendError<T> {
1598     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1599         match *self {
1600             TrySendError::Full(..) => {
1601                 "sending on a full channel".fmt(f)
1602             }
1603             TrySendError::Disconnected(..) => {
1604                 "sending on a closed channel".fmt(f)
1605             }
1606         }
1607     }
1608 }
1609
1610 #[stable(feature = "rust1", since = "1.0.0")]
1611 impl<T: Send> error::Error for TrySendError<T> {
1612
1613     fn description(&self) -> &str {
1614         match *self {
1615             TrySendError::Full(..) => {
1616                 "sending on a full channel"
1617             }
1618             TrySendError::Disconnected(..) => {
1619                 "sending on a closed channel"
1620             }
1621         }
1622     }
1623 }
1624
1625 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1626 impl<T> From<SendError<T>> for TrySendError<T> {
1627     fn from(err: SendError<T>) -> TrySendError<T> {
1628         match err {
1629             SendError(t) => TrySendError::Disconnected(t),
1630         }
1631     }
1632 }
1633
1634 #[stable(feature = "rust1", since = "1.0.0")]
1635 impl fmt::Display for RecvError {
1636     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1637         "receiving on a closed channel".fmt(f)
1638     }
1639 }
1640
1641 #[stable(feature = "rust1", since = "1.0.0")]
1642 impl error::Error for RecvError {
1643
1644     fn description(&self) -> &str {
1645         "receiving on a closed channel"
1646     }
1647 }
1648
1649 #[stable(feature = "rust1", since = "1.0.0")]
1650 impl fmt::Display for TryRecvError {
1651     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1652         match *self {
1653             TryRecvError::Empty => {
1654                 "receiving on an empty channel".fmt(f)
1655             }
1656             TryRecvError::Disconnected => {
1657                 "receiving on a closed channel".fmt(f)
1658             }
1659         }
1660     }
1661 }
1662
1663 #[stable(feature = "rust1", since = "1.0.0")]
1664 impl error::Error for TryRecvError {
1665
1666     fn description(&self) -> &str {
1667         match *self {
1668             TryRecvError::Empty => {
1669                 "receiving on an empty channel"
1670             }
1671             TryRecvError::Disconnected => {
1672                 "receiving on a closed channel"
1673             }
1674         }
1675     }
1676 }
1677
1678 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1679 impl From<RecvError> for TryRecvError {
1680     fn from(err: RecvError) -> TryRecvError {
1681         match err {
1682             RecvError => TryRecvError::Disconnected,
1683         }
1684     }
1685 }
1686
1687 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1688 impl fmt::Display for RecvTimeoutError {
1689     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1690         match *self {
1691             RecvTimeoutError::Timeout => {
1692                 "timed out waiting on channel".fmt(f)
1693             }
1694             RecvTimeoutError::Disconnected => {
1695                 "channel is empty and sending half is closed".fmt(f)
1696             }
1697         }
1698     }
1699 }
1700
1701 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1702 impl error::Error for RecvTimeoutError {
1703     fn description(&self) -> &str {
1704         match *self {
1705             RecvTimeoutError::Timeout => {
1706                 "timed out waiting on channel"
1707             }
1708             RecvTimeoutError::Disconnected => {
1709                 "channel is empty and sending half is closed"
1710             }
1711         }
1712     }
1713 }
1714
1715 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1716 impl From<RecvError> for RecvTimeoutError {
1717     fn from(err: RecvError) -> RecvTimeoutError {
1718         match err {
1719             RecvError => RecvTimeoutError::Disconnected,
1720         }
1721     }
1722 }
1723
1724 #[cfg(all(test, not(target_os = "emscripten")))]
1725 mod tests {
1726     use super::*;
1727     use crate::env;
1728     use crate::thread;
1729     use crate::time::{Duration, Instant};
1730
1731     pub fn stress_factor() -> usize {
1732         match env::var("RUST_TEST_STRESS") {
1733             Ok(val) => val.parse().unwrap(),
1734             Err(..) => 1,
1735         }
1736     }
1737
1738     #[test]
1739     fn smoke() {
1740         let (tx, rx) = channel::<i32>();
1741         tx.send(1).unwrap();
1742         assert_eq!(rx.recv().unwrap(), 1);
1743     }
1744
1745     #[test]
1746     fn drop_full() {
1747         let (tx, _rx) = channel::<Box<isize>>();
1748         tx.send(box 1).unwrap();
1749     }
1750
1751     #[test]
1752     fn drop_full_shared() {
1753         let (tx, _rx) = channel::<Box<isize>>();
1754         drop(tx.clone());
1755         drop(tx.clone());
1756         tx.send(box 1).unwrap();
1757     }
1758
1759     #[test]
1760     fn smoke_shared() {
1761         let (tx, rx) = channel::<i32>();
1762         tx.send(1).unwrap();
1763         assert_eq!(rx.recv().unwrap(), 1);
1764         let tx = tx.clone();
1765         tx.send(1).unwrap();
1766         assert_eq!(rx.recv().unwrap(), 1);
1767     }
1768
1769     #[test]
1770     fn smoke_threads() {
1771         let (tx, rx) = channel::<i32>();
1772         let _t = thread::spawn(move|| {
1773             tx.send(1).unwrap();
1774         });
1775         assert_eq!(rx.recv().unwrap(), 1);
1776     }
1777
1778     #[test]
1779     fn smoke_port_gone() {
1780         let (tx, rx) = channel::<i32>();
1781         drop(rx);
1782         assert!(tx.send(1).is_err());
1783     }
1784
1785     #[test]
1786     fn smoke_shared_port_gone() {
1787         let (tx, rx) = channel::<i32>();
1788         drop(rx);
1789         assert!(tx.send(1).is_err())
1790     }
1791
1792     #[test]
1793     fn smoke_shared_port_gone2() {
1794         let (tx, rx) = channel::<i32>();
1795         drop(rx);
1796         let tx2 = tx.clone();
1797         drop(tx);
1798         assert!(tx2.send(1).is_err());
1799     }
1800
1801     #[test]
1802     fn port_gone_concurrent() {
1803         let (tx, rx) = channel::<i32>();
1804         let _t = thread::spawn(move|| {
1805             rx.recv().unwrap();
1806         });
1807         while tx.send(1).is_ok() {}
1808     }
1809
1810     #[test]
1811     fn port_gone_concurrent_shared() {
1812         let (tx, rx) = channel::<i32>();
1813         let tx2 = tx.clone();
1814         let _t = thread::spawn(move|| {
1815             rx.recv().unwrap();
1816         });
1817         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1818     }
1819
1820     #[test]
1821     fn smoke_chan_gone() {
1822         let (tx, rx) = channel::<i32>();
1823         drop(tx);
1824         assert!(rx.recv().is_err());
1825     }
1826
1827     #[test]
1828     fn smoke_chan_gone_shared() {
1829         let (tx, rx) = channel::<()>();
1830         let tx2 = tx.clone();
1831         drop(tx);
1832         drop(tx2);
1833         assert!(rx.recv().is_err());
1834     }
1835
1836     #[test]
1837     fn chan_gone_concurrent() {
1838         let (tx, rx) = channel::<i32>();
1839         let _t = thread::spawn(move|| {
1840             tx.send(1).unwrap();
1841             tx.send(1).unwrap();
1842         });
1843         while rx.recv().is_ok() {}
1844     }
1845
1846     #[test]
1847     fn stress() {
1848         let (tx, rx) = channel::<i32>();
1849         let t = thread::spawn(move|| {
1850             for _ in 0..10000 { tx.send(1).unwrap(); }
1851         });
1852         for _ in 0..10000 {
1853             assert_eq!(rx.recv().unwrap(), 1);
1854         }
1855         t.join().ok().expect("thread panicked");
1856     }
1857
1858     #[test]
1859     fn stress_shared() {
1860         const AMT: u32 = 10000;
1861         const NTHREADS: u32 = 8;
1862         let (tx, rx) = channel::<i32>();
1863
1864         let t = thread::spawn(move|| {
1865             for _ in 0..AMT * NTHREADS {
1866                 assert_eq!(rx.recv().unwrap(), 1);
1867             }
1868             match rx.try_recv() {
1869                 Ok(..) => panic!(),
1870                 _ => {}
1871             }
1872         });
1873
1874         for _ in 0..NTHREADS {
1875             let tx = tx.clone();
1876             thread::spawn(move|| {
1877                 for _ in 0..AMT { tx.send(1).unwrap(); }
1878             });
1879         }
1880         drop(tx);
1881         t.join().ok().expect("thread panicked");
1882     }
1883
1884     #[test]
1885     fn send_from_outside_runtime() {
1886         let (tx1, rx1) = channel::<()>();
1887         let (tx2, rx2) = channel::<i32>();
1888         let t1 = thread::spawn(move|| {
1889             tx1.send(()).unwrap();
1890             for _ in 0..40 {
1891                 assert_eq!(rx2.recv().unwrap(), 1);
1892             }
1893         });
1894         rx1.recv().unwrap();
1895         let t2 = thread::spawn(move|| {
1896             for _ in 0..40 {
1897                 tx2.send(1).unwrap();
1898             }
1899         });
1900         t1.join().ok().expect("thread panicked");
1901         t2.join().ok().expect("thread panicked");
1902     }
1903
1904     #[test]
1905     fn recv_from_outside_runtime() {
1906         let (tx, rx) = channel::<i32>();
1907         let t = thread::spawn(move|| {
1908             for _ in 0..40 {
1909                 assert_eq!(rx.recv().unwrap(), 1);
1910             }
1911         });
1912         for _ in 0..40 {
1913             tx.send(1).unwrap();
1914         }
1915         t.join().ok().expect("thread panicked");
1916     }
1917
1918     #[test]
1919     fn no_runtime() {
1920         let (tx1, rx1) = channel::<i32>();
1921         let (tx2, rx2) = channel::<i32>();
1922         let t1 = thread::spawn(move|| {
1923             assert_eq!(rx1.recv().unwrap(), 1);
1924             tx2.send(2).unwrap();
1925         });
1926         let t2 = thread::spawn(move|| {
1927             tx1.send(1).unwrap();
1928             assert_eq!(rx2.recv().unwrap(), 2);
1929         });
1930         t1.join().ok().expect("thread panicked");
1931         t2.join().ok().expect("thread panicked");
1932     }
1933
1934     #[test]
1935     fn oneshot_single_thread_close_port_first() {
1936         // Simple test of closing without sending
1937         let (_tx, rx) = channel::<i32>();
1938         drop(rx);
1939     }
1940
1941     #[test]
1942     fn oneshot_single_thread_close_chan_first() {
1943         // Simple test of closing without sending
1944         let (tx, _rx) = channel::<i32>();
1945         drop(tx);
1946     }
1947
1948     #[test]
1949     fn oneshot_single_thread_send_port_close() {
1950         // Testing that the sender cleans up the payload if receiver is closed
1951         let (tx, rx) = channel::<Box<i32>>();
1952         drop(rx);
1953         assert!(tx.send(box 0).is_err());
1954     }
1955
1956     #[test]
1957     fn oneshot_single_thread_recv_chan_close() {
1958         // Receiving on a closed chan will panic
1959         let res = thread::spawn(move|| {
1960             let (tx, rx) = channel::<i32>();
1961             drop(tx);
1962             rx.recv().unwrap();
1963         }).join();
1964         // What is our res?
1965         assert!(res.is_err());
1966     }
1967
1968     #[test]
1969     fn oneshot_single_thread_send_then_recv() {
1970         let (tx, rx) = channel::<Box<i32>>();
1971         tx.send(box 10).unwrap();
1972         assert!(*rx.recv().unwrap() == 10);
1973     }
1974
1975     #[test]
1976     fn oneshot_single_thread_try_send_open() {
1977         let (tx, rx) = channel::<i32>();
1978         assert!(tx.send(10).is_ok());
1979         assert!(rx.recv().unwrap() == 10);
1980     }
1981
1982     #[test]
1983     fn oneshot_single_thread_try_send_closed() {
1984         let (tx, rx) = channel::<i32>();
1985         drop(rx);
1986         assert!(tx.send(10).is_err());
1987     }
1988
1989     #[test]
1990     fn oneshot_single_thread_try_recv_open() {
1991         let (tx, rx) = channel::<i32>();
1992         tx.send(10).unwrap();
1993         assert!(rx.recv() == Ok(10));
1994     }
1995
1996     #[test]
1997     fn oneshot_single_thread_try_recv_closed() {
1998         let (tx, rx) = channel::<i32>();
1999         drop(tx);
2000         assert!(rx.recv().is_err());
2001     }
2002
2003     #[test]
2004     fn oneshot_single_thread_peek_data() {
2005         let (tx, rx) = channel::<i32>();
2006         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2007         tx.send(10).unwrap();
2008         assert_eq!(rx.try_recv(), Ok(10));
2009     }
2010
2011     #[test]
2012     fn oneshot_single_thread_peek_close() {
2013         let (tx, rx) = channel::<i32>();
2014         drop(tx);
2015         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2016         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2017     }
2018
2019     #[test]
2020     fn oneshot_single_thread_peek_open() {
2021         let (_tx, rx) = channel::<i32>();
2022         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2023     }
2024
2025     #[test]
2026     fn oneshot_multi_task_recv_then_send() {
2027         let (tx, rx) = channel::<Box<i32>>();
2028         let _t = thread::spawn(move|| {
2029             assert!(*rx.recv().unwrap() == 10);
2030         });
2031
2032         tx.send(box 10).unwrap();
2033     }
2034
2035     #[test]
2036     fn oneshot_multi_task_recv_then_close() {
2037         let (tx, rx) = channel::<Box<i32>>();
2038         let _t = thread::spawn(move|| {
2039             drop(tx);
2040         });
2041         let res = thread::spawn(move|| {
2042             assert!(*rx.recv().unwrap() == 10);
2043         }).join();
2044         assert!(res.is_err());
2045     }
2046
2047     #[test]
2048     fn oneshot_multi_thread_close_stress() {
2049         for _ in 0..stress_factor() {
2050             let (tx, rx) = channel::<i32>();
2051             let _t = thread::spawn(move|| {
2052                 drop(rx);
2053             });
2054             drop(tx);
2055         }
2056     }
2057
2058     #[test]
2059     fn oneshot_multi_thread_send_close_stress() {
2060         for _ in 0..stress_factor() {
2061             let (tx, rx) = channel::<i32>();
2062             let _t = thread::spawn(move|| {
2063                 drop(rx);
2064             });
2065             let _ = thread::spawn(move|| {
2066                 tx.send(1).unwrap();
2067             }).join();
2068         }
2069     }
2070
2071     #[test]
2072     fn oneshot_multi_thread_recv_close_stress() {
2073         for _ in 0..stress_factor() {
2074             let (tx, rx) = channel::<i32>();
2075             thread::spawn(move|| {
2076                 let res = thread::spawn(move|| {
2077                     rx.recv().unwrap();
2078                 }).join();
2079                 assert!(res.is_err());
2080             });
2081             let _t = thread::spawn(move|| {
2082                 thread::spawn(move|| {
2083                     drop(tx);
2084                 });
2085             });
2086         }
2087     }
2088
2089     #[test]
2090     fn oneshot_multi_thread_send_recv_stress() {
2091         for _ in 0..stress_factor() {
2092             let (tx, rx) = channel::<Box<isize>>();
2093             let _t = thread::spawn(move|| {
2094                 tx.send(box 10).unwrap();
2095             });
2096             assert!(*rx.recv().unwrap() == 10);
2097         }
2098     }
2099
2100     #[test]
2101     fn stream_send_recv_stress() {
2102         for _ in 0..stress_factor() {
2103             let (tx, rx) = channel();
2104
2105             send(tx, 0);
2106             recv(rx, 0);
2107
2108             fn send(tx: Sender<Box<i32>>, i: i32) {
2109                 if i == 10 { return }
2110
2111                 thread::spawn(move|| {
2112                     tx.send(box i).unwrap();
2113                     send(tx, i + 1);
2114                 });
2115             }
2116
2117             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2118                 if i == 10 { return }
2119
2120                 thread::spawn(move|| {
2121                     assert!(*rx.recv().unwrap() == i);
2122                     recv(rx, i + 1);
2123                 });
2124             }
2125         }
2126     }
2127
2128     #[test]
2129     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2130     fn oneshot_single_thread_recv_timeout() {
2131         let (tx, rx) = channel();
2132         tx.send(()).unwrap();
2133         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2134         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2135         tx.send(()).unwrap();
2136         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2137     }
2138
2139     #[test]
2140     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2141     fn stress_recv_timeout_two_threads() {
2142         let (tx, rx) = channel();
2143         let stress = stress_factor() + 100;
2144         let timeout = Duration::from_millis(100);
2145
2146         thread::spawn(move || {
2147             for i in 0..stress {
2148                 if i % 2 == 0 {
2149                     thread::sleep(timeout * 2);
2150                 }
2151                 tx.send(1usize).unwrap();
2152             }
2153         });
2154
2155         let mut recv_count = 0;
2156         loop {
2157             match rx.recv_timeout(timeout) {
2158                 Ok(n) => {
2159                     assert_eq!(n, 1usize);
2160                     recv_count += 1;
2161                 }
2162                 Err(RecvTimeoutError::Timeout) => continue,
2163                 Err(RecvTimeoutError::Disconnected) => break,
2164             }
2165         }
2166
2167         assert_eq!(recv_count, stress);
2168     }
2169
2170     #[test]
2171     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2172     fn recv_timeout_upgrade() {
2173         let (tx, rx) = channel::<()>();
2174         let timeout = Duration::from_millis(1);
2175         let _tx_clone = tx.clone();
2176
2177         let start = Instant::now();
2178         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2179         assert!(Instant::now() >= start + timeout);
2180     }
2181
2182     #[test]
2183     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2184     fn stress_recv_timeout_shared() {
2185         let (tx, rx) = channel();
2186         let stress = stress_factor() + 100;
2187
2188         for i in 0..stress {
2189             let tx = tx.clone();
2190             thread::spawn(move || {
2191                 thread::sleep(Duration::from_millis(i as u64 * 10));
2192                 tx.send(1usize).unwrap();
2193             });
2194         }
2195
2196         drop(tx);
2197
2198         let mut recv_count = 0;
2199         loop {
2200             match rx.recv_timeout(Duration::from_millis(10)) {
2201                 Ok(n) => {
2202                     assert_eq!(n, 1usize);
2203                     recv_count += 1;
2204                 }
2205                 Err(RecvTimeoutError::Timeout) => continue,
2206                 Err(RecvTimeoutError::Disconnected) => break,
2207             }
2208         }
2209
2210         assert_eq!(recv_count, stress);
2211     }
2212
2213     #[test]
2214     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2215     fn very_long_recv_timeout_wont_panic() {
2216         let (tx, rx) = channel::<()>();
2217         let join_handle = thread::spawn(move || {
2218             rx.recv_timeout(Duration::from_secs(u64::max_value()))
2219         });
2220         thread::sleep(Duration::from_secs(1));
2221         assert!(tx.send(()).is_ok());
2222         assert_eq!(join_handle.join().unwrap(), Ok(()));
2223     }
2224
2225     #[test]
2226     fn recv_a_lot() {
2227         // Regression test that we don't run out of stack in scheduler context
2228         let (tx, rx) = channel();
2229         for _ in 0..10000 { tx.send(()).unwrap(); }
2230         for _ in 0..10000 { rx.recv().unwrap(); }
2231     }
2232
2233     #[test]
2234     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2235     fn shared_recv_timeout() {
2236         let (tx, rx) = channel();
2237         let total = 5;
2238         for _ in 0..total {
2239             let tx = tx.clone();
2240             thread::spawn(move|| {
2241                 tx.send(()).unwrap();
2242             });
2243         }
2244
2245         for _ in 0..total { rx.recv().unwrap(); }
2246
2247         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2248         tx.send(()).unwrap();
2249         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2250     }
2251
2252     #[test]
2253     fn shared_chan_stress() {
2254         let (tx, rx) = channel();
2255         let total = stress_factor() + 100;
2256         for _ in 0..total {
2257             let tx = tx.clone();
2258             thread::spawn(move|| {
2259                 tx.send(()).unwrap();
2260             });
2261         }
2262
2263         for _ in 0..total {
2264             rx.recv().unwrap();
2265         }
2266     }
2267
2268     #[test]
2269     fn test_nested_recv_iter() {
2270         let (tx, rx) = channel::<i32>();
2271         let (total_tx, total_rx) = channel::<i32>();
2272
2273         let _t = thread::spawn(move|| {
2274             let mut acc = 0;
2275             for x in rx.iter() {
2276                 acc += x;
2277             }
2278             total_tx.send(acc).unwrap();
2279         });
2280
2281         tx.send(3).unwrap();
2282         tx.send(1).unwrap();
2283         tx.send(2).unwrap();
2284         drop(tx);
2285         assert_eq!(total_rx.recv().unwrap(), 6);
2286     }
2287
2288     #[test]
2289     fn test_recv_iter_break() {
2290         let (tx, rx) = channel::<i32>();
2291         let (count_tx, count_rx) = channel();
2292
2293         let _t = thread::spawn(move|| {
2294             let mut count = 0;
2295             for x in rx.iter() {
2296                 if count >= 3 {
2297                     break;
2298                 } else {
2299                     count += x;
2300                 }
2301             }
2302             count_tx.send(count).unwrap();
2303         });
2304
2305         tx.send(2).unwrap();
2306         tx.send(2).unwrap();
2307         tx.send(2).unwrap();
2308         let _ = tx.send(2);
2309         drop(tx);
2310         assert_eq!(count_rx.recv().unwrap(), 4);
2311     }
2312
2313     #[test]
2314     fn test_recv_try_iter() {
2315         let (request_tx, request_rx) = channel();
2316         let (response_tx, response_rx) = channel();
2317
2318         // Request `x`s until we have `6`.
2319         let t = thread::spawn(move|| {
2320             let mut count = 0;
2321             loop {
2322                 for x in response_rx.try_iter() {
2323                     count += x;
2324                     if count == 6 {
2325                         return count;
2326                     }
2327                 }
2328                 request_tx.send(()).unwrap();
2329             }
2330         });
2331
2332         for _ in request_rx.iter() {
2333             if response_tx.send(2).is_err() {
2334                 break;
2335             }
2336         }
2337
2338         assert_eq!(t.join().unwrap(), 6);
2339     }
2340
2341     #[test]
2342     fn test_recv_into_iter_owned() {
2343         let mut iter = {
2344           let (tx, rx) = channel::<i32>();
2345           tx.send(1).unwrap();
2346           tx.send(2).unwrap();
2347
2348           rx.into_iter()
2349         };
2350         assert_eq!(iter.next().unwrap(), 1);
2351         assert_eq!(iter.next().unwrap(), 2);
2352         assert_eq!(iter.next().is_none(), true);
2353     }
2354
2355     #[test]
2356     fn test_recv_into_iter_borrowed() {
2357         let (tx, rx) = channel::<i32>();
2358         tx.send(1).unwrap();
2359         tx.send(2).unwrap();
2360         drop(tx);
2361         let mut iter = (&rx).into_iter();
2362         assert_eq!(iter.next().unwrap(), 1);
2363         assert_eq!(iter.next().unwrap(), 2);
2364         assert_eq!(iter.next().is_none(), true);
2365     }
2366
2367     #[test]
2368     fn try_recv_states() {
2369         let (tx1, rx1) = channel::<i32>();
2370         let (tx2, rx2) = channel::<()>();
2371         let (tx3, rx3) = channel::<()>();
2372         let _t = thread::spawn(move|| {
2373             rx2.recv().unwrap();
2374             tx1.send(1).unwrap();
2375             tx3.send(()).unwrap();
2376             rx2.recv().unwrap();
2377             drop(tx1);
2378             tx3.send(()).unwrap();
2379         });
2380
2381         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2382         tx2.send(()).unwrap();
2383         rx3.recv().unwrap();
2384         assert_eq!(rx1.try_recv(), Ok(1));
2385         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2386         tx2.send(()).unwrap();
2387         rx3.recv().unwrap();
2388         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2389     }
2390
2391     // This bug used to end up in a livelock inside of the Receiver destructor
2392     // because the internal state of the Shared packet was corrupted
2393     #[test]
2394     fn destroy_upgraded_shared_port_when_sender_still_active() {
2395         let (tx, rx) = channel();
2396         let (tx2, rx2) = channel();
2397         let _t = thread::spawn(move|| {
2398             rx.recv().unwrap(); // wait on a oneshot
2399             drop(rx);  // destroy a shared
2400             tx2.send(()).unwrap();
2401         });
2402         // make sure the other thread has gone to sleep
2403         for _ in 0..5000 { thread::yield_now(); }
2404
2405         // upgrade to a shared chan and send a message
2406         let t = tx.clone();
2407         drop(tx);
2408         t.send(()).unwrap();
2409
2410         // wait for the child thread to exit before we exit
2411         rx2.recv().unwrap();
2412     }
2413
2414     #[test]
2415     fn issue_32114() {
2416         let (tx, _) = channel();
2417         let _ = tx.send(123);
2418         assert_eq!(tx.send(123), Err(SendError(123)));
2419     }
2420 }
2421
2422 #[cfg(all(test, not(target_os = "emscripten")))]
2423 mod sync_tests {
2424     use super::*;
2425     use crate::env;
2426     use crate::thread;
2427     use crate::time::Duration;
2428
2429     pub fn stress_factor() -> usize {
2430         match env::var("RUST_TEST_STRESS") {
2431             Ok(val) => val.parse().unwrap(),
2432             Err(..) => 1,
2433         }
2434     }
2435
2436     #[test]
2437     fn smoke() {
2438         let (tx, rx) = sync_channel::<i32>(1);
2439         tx.send(1).unwrap();
2440         assert_eq!(rx.recv().unwrap(), 1);
2441     }
2442
2443     #[test]
2444     fn drop_full() {
2445         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2446         tx.send(box 1).unwrap();
2447     }
2448
2449     #[test]
2450     fn smoke_shared() {
2451         let (tx, rx) = sync_channel::<i32>(1);
2452         tx.send(1).unwrap();
2453         assert_eq!(rx.recv().unwrap(), 1);
2454         let tx = tx.clone();
2455         tx.send(1).unwrap();
2456         assert_eq!(rx.recv().unwrap(), 1);
2457     }
2458
2459     #[test]
2460     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2461     fn recv_timeout() {
2462         let (tx, rx) = sync_channel::<i32>(1);
2463         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2464         tx.send(1).unwrap();
2465         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2466     }
2467
2468     #[test]
2469     fn smoke_threads() {
2470         let (tx, rx) = sync_channel::<i32>(0);
2471         let _t = thread::spawn(move|| {
2472             tx.send(1).unwrap();
2473         });
2474         assert_eq!(rx.recv().unwrap(), 1);
2475     }
2476
2477     #[test]
2478     fn smoke_port_gone() {
2479         let (tx, rx) = sync_channel::<i32>(0);
2480         drop(rx);
2481         assert!(tx.send(1).is_err());
2482     }
2483
2484     #[test]
2485     fn smoke_shared_port_gone2() {
2486         let (tx, rx) = sync_channel::<i32>(0);
2487         drop(rx);
2488         let tx2 = tx.clone();
2489         drop(tx);
2490         assert!(tx2.send(1).is_err());
2491     }
2492
2493     #[test]
2494     fn port_gone_concurrent() {
2495         let (tx, rx) = sync_channel::<i32>(0);
2496         let _t = thread::spawn(move|| {
2497             rx.recv().unwrap();
2498         });
2499         while tx.send(1).is_ok() {}
2500     }
2501
2502     #[test]
2503     fn port_gone_concurrent_shared() {
2504         let (tx, rx) = sync_channel::<i32>(0);
2505         let tx2 = tx.clone();
2506         let _t = thread::spawn(move|| {
2507             rx.recv().unwrap();
2508         });
2509         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2510     }
2511
2512     #[test]
2513     fn smoke_chan_gone() {
2514         let (tx, rx) = sync_channel::<i32>(0);
2515         drop(tx);
2516         assert!(rx.recv().is_err());
2517     }
2518
2519     #[test]
2520     fn smoke_chan_gone_shared() {
2521         let (tx, rx) = sync_channel::<()>(0);
2522         let tx2 = tx.clone();
2523         drop(tx);
2524         drop(tx2);
2525         assert!(rx.recv().is_err());
2526     }
2527
2528     #[test]
2529     fn chan_gone_concurrent() {
2530         let (tx, rx) = sync_channel::<i32>(0);
2531         thread::spawn(move|| {
2532             tx.send(1).unwrap();
2533             tx.send(1).unwrap();
2534         });
2535         while rx.recv().is_ok() {}
2536     }
2537
2538     #[test]
2539     fn stress() {
2540         let (tx, rx) = sync_channel::<i32>(0);
2541         thread::spawn(move|| {
2542             for _ in 0..10000 { tx.send(1).unwrap(); }
2543         });
2544         for _ in 0..10000 {
2545             assert_eq!(rx.recv().unwrap(), 1);
2546         }
2547     }
2548
2549     #[test]
2550     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2551     fn stress_recv_timeout_two_threads() {
2552         let (tx, rx) = sync_channel::<i32>(0);
2553
2554         thread::spawn(move|| {
2555             for _ in 0..10000 { tx.send(1).unwrap(); }
2556         });
2557
2558         let mut recv_count = 0;
2559         loop {
2560             match rx.recv_timeout(Duration::from_millis(1)) {
2561                 Ok(v) => {
2562                     assert_eq!(v, 1);
2563                     recv_count += 1;
2564                 },
2565                 Err(RecvTimeoutError::Timeout) => continue,
2566                 Err(RecvTimeoutError::Disconnected) => break,
2567             }
2568         }
2569
2570         assert_eq!(recv_count, 10000);
2571     }
2572
2573     #[test]
2574     #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31
2575     fn stress_recv_timeout_shared() {
2576         const AMT: u32 = 1000;
2577         const NTHREADS: u32 = 8;
2578         let (tx, rx) = sync_channel::<i32>(0);
2579         let (dtx, drx) = sync_channel::<()>(0);
2580
2581         thread::spawn(move|| {
2582             let mut recv_count = 0;
2583             loop {
2584                 match rx.recv_timeout(Duration::from_millis(10)) {
2585                     Ok(v) => {
2586                         assert_eq!(v, 1);
2587                         recv_count += 1;
2588                     },
2589                     Err(RecvTimeoutError::Timeout) => continue,
2590                     Err(RecvTimeoutError::Disconnected) => break,
2591                 }
2592             }
2593
2594             assert_eq!(recv_count, AMT * NTHREADS);
2595             assert!(rx.try_recv().is_err());
2596
2597             dtx.send(()).unwrap();
2598         });
2599
2600         for _ in 0..NTHREADS {
2601             let tx = tx.clone();
2602             thread::spawn(move|| {
2603                 for _ in 0..AMT { tx.send(1).unwrap(); }
2604             });
2605         }
2606
2607         drop(tx);
2608
2609         drx.recv().unwrap();
2610     }
2611
2612     #[test]
2613     fn stress_shared() {
2614         const AMT: u32 = 1000;
2615         const NTHREADS: u32 = 8;
2616         let (tx, rx) = sync_channel::<i32>(0);
2617         let (dtx, drx) = sync_channel::<()>(0);
2618
2619         thread::spawn(move|| {
2620             for _ in 0..AMT * NTHREADS {
2621                 assert_eq!(rx.recv().unwrap(), 1);
2622             }
2623             match rx.try_recv() {
2624                 Ok(..) => panic!(),
2625                 _ => {}
2626             }
2627             dtx.send(()).unwrap();
2628         });
2629
2630         for _ in 0..NTHREADS {
2631             let tx = tx.clone();
2632             thread::spawn(move|| {
2633                 for _ in 0..AMT { tx.send(1).unwrap(); }
2634             });
2635         }
2636         drop(tx);
2637         drx.recv().unwrap();
2638     }
2639
2640     #[test]
2641     fn oneshot_single_thread_close_port_first() {
2642         // Simple test of closing without sending
2643         let (_tx, rx) = sync_channel::<i32>(0);
2644         drop(rx);
2645     }
2646
2647     #[test]
2648     fn oneshot_single_thread_close_chan_first() {
2649         // Simple test of closing without sending
2650         let (tx, _rx) = sync_channel::<i32>(0);
2651         drop(tx);
2652     }
2653
2654     #[test]
2655     fn oneshot_single_thread_send_port_close() {
2656         // Testing that the sender cleans up the payload if receiver is closed
2657         let (tx, rx) = sync_channel::<Box<i32>>(0);
2658         drop(rx);
2659         assert!(tx.send(box 0).is_err());
2660     }
2661
2662     #[test]
2663     fn oneshot_single_thread_recv_chan_close() {
2664         // Receiving on a closed chan will panic
2665         let res = thread::spawn(move|| {
2666             let (tx, rx) = sync_channel::<i32>(0);
2667             drop(tx);
2668             rx.recv().unwrap();
2669         }).join();
2670         // What is our res?
2671         assert!(res.is_err());
2672     }
2673
2674     #[test]
2675     fn oneshot_single_thread_send_then_recv() {
2676         let (tx, rx) = sync_channel::<Box<i32>>(1);
2677         tx.send(box 10).unwrap();
2678         assert!(*rx.recv().unwrap() == 10);
2679     }
2680
2681     #[test]
2682     fn oneshot_single_thread_try_send_open() {
2683         let (tx, rx) = sync_channel::<i32>(1);
2684         assert_eq!(tx.try_send(10), Ok(()));
2685         assert!(rx.recv().unwrap() == 10);
2686     }
2687
2688     #[test]
2689     fn oneshot_single_thread_try_send_closed() {
2690         let (tx, rx) = sync_channel::<i32>(0);
2691         drop(rx);
2692         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2693     }
2694
2695     #[test]
2696     fn oneshot_single_thread_try_send_closed2() {
2697         let (tx, _rx) = sync_channel::<i32>(0);
2698         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2699     }
2700
2701     #[test]
2702     fn oneshot_single_thread_try_recv_open() {
2703         let (tx, rx) = sync_channel::<i32>(1);
2704         tx.send(10).unwrap();
2705         assert!(rx.recv() == Ok(10));
2706     }
2707
2708     #[test]
2709     fn oneshot_single_thread_try_recv_closed() {
2710         let (tx, rx) = sync_channel::<i32>(0);
2711         drop(tx);
2712         assert!(rx.recv().is_err());
2713     }
2714
2715     #[test]
2716     fn oneshot_single_thread_try_recv_closed_with_data() {
2717         let (tx, rx) = sync_channel::<i32>(1);
2718         tx.send(10).unwrap();
2719         drop(tx);
2720         assert_eq!(rx.try_recv(), Ok(10));
2721         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2722     }
2723
2724     #[test]
2725     fn oneshot_single_thread_peek_data() {
2726         let (tx, rx) = sync_channel::<i32>(1);
2727         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2728         tx.send(10).unwrap();
2729         assert_eq!(rx.try_recv(), Ok(10));
2730     }
2731
2732     #[test]
2733     fn oneshot_single_thread_peek_close() {
2734         let (tx, rx) = sync_channel::<i32>(0);
2735         drop(tx);
2736         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2737         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2738     }
2739
2740     #[test]
2741     fn oneshot_single_thread_peek_open() {
2742         let (_tx, rx) = sync_channel::<i32>(0);
2743         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2744     }
2745
2746     #[test]
2747     fn oneshot_multi_task_recv_then_send() {
2748         let (tx, rx) = sync_channel::<Box<i32>>(0);
2749         let _t = thread::spawn(move|| {
2750             assert!(*rx.recv().unwrap() == 10);
2751         });
2752
2753         tx.send(box 10).unwrap();
2754     }
2755
2756     #[test]
2757     fn oneshot_multi_task_recv_then_close() {
2758         let (tx, rx) = sync_channel::<Box<i32>>(0);
2759         let _t = thread::spawn(move|| {
2760             drop(tx);
2761         });
2762         let res = thread::spawn(move|| {
2763             assert!(*rx.recv().unwrap() == 10);
2764         }).join();
2765         assert!(res.is_err());
2766     }
2767
2768     #[test]
2769     fn oneshot_multi_thread_close_stress() {
2770         for _ in 0..stress_factor() {
2771             let (tx, rx) = sync_channel::<i32>(0);
2772             let _t = thread::spawn(move|| {
2773                 drop(rx);
2774             });
2775             drop(tx);
2776         }
2777     }
2778
2779     #[test]
2780     fn oneshot_multi_thread_send_close_stress() {
2781         for _ in 0..stress_factor() {
2782             let (tx, rx) = sync_channel::<i32>(0);
2783             let _t = thread::spawn(move|| {
2784                 drop(rx);
2785             });
2786             let _ = thread::spawn(move || {
2787                 tx.send(1).unwrap();
2788             }).join();
2789         }
2790     }
2791
2792     #[test]
2793     fn oneshot_multi_thread_recv_close_stress() {
2794         for _ in 0..stress_factor() {
2795             let (tx, rx) = sync_channel::<i32>(0);
2796             let _t = thread::spawn(move|| {
2797                 let res = thread::spawn(move|| {
2798                     rx.recv().unwrap();
2799                 }).join();
2800                 assert!(res.is_err());
2801             });
2802             let _t = thread::spawn(move|| {
2803                 thread::spawn(move|| {
2804                     drop(tx);
2805                 });
2806             });
2807         }
2808     }
2809
2810     #[test]
2811     fn oneshot_multi_thread_send_recv_stress() {
2812         for _ in 0..stress_factor() {
2813             let (tx, rx) = sync_channel::<Box<i32>>(0);
2814             let _t = thread::spawn(move|| {
2815                 tx.send(box 10).unwrap();
2816             });
2817             assert!(*rx.recv().unwrap() == 10);
2818         }
2819     }
2820
2821     #[test]
2822     fn stream_send_recv_stress() {
2823         for _ in 0..stress_factor() {
2824             let (tx, rx) = sync_channel::<Box<i32>>(0);
2825
2826             send(tx, 0);
2827             recv(rx, 0);
2828
2829             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2830                 if i == 10 { return }
2831
2832                 thread::spawn(move|| {
2833                     tx.send(box i).unwrap();
2834                     send(tx, i + 1);
2835                 });
2836             }
2837
2838             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2839                 if i == 10 { return }
2840
2841                 thread::spawn(move|| {
2842                     assert!(*rx.recv().unwrap() == i);
2843                     recv(rx, i + 1);
2844                 });
2845             }
2846         }
2847     }
2848
2849     #[test]
2850     fn recv_a_lot() {
2851         // Regression test that we don't run out of stack in scheduler context
2852         let (tx, rx) = sync_channel(10000);
2853         for _ in 0..10000 { tx.send(()).unwrap(); }
2854         for _ in 0..10000 { rx.recv().unwrap(); }
2855     }
2856
2857     #[test]
2858     fn shared_chan_stress() {
2859         let (tx, rx) = sync_channel(0);
2860         let total = stress_factor() + 100;
2861         for _ in 0..total {
2862             let tx = tx.clone();
2863             thread::spawn(move|| {
2864                 tx.send(()).unwrap();
2865             });
2866         }
2867
2868         for _ in 0..total {
2869             rx.recv().unwrap();
2870         }
2871     }
2872
2873     #[test]
2874     fn test_nested_recv_iter() {
2875         let (tx, rx) = sync_channel::<i32>(0);
2876         let (total_tx, total_rx) = sync_channel::<i32>(0);
2877
2878         let _t = thread::spawn(move|| {
2879             let mut acc = 0;
2880             for x in rx.iter() {
2881                 acc += x;
2882             }
2883             total_tx.send(acc).unwrap();
2884         });
2885
2886         tx.send(3).unwrap();
2887         tx.send(1).unwrap();
2888         tx.send(2).unwrap();
2889         drop(tx);
2890         assert_eq!(total_rx.recv().unwrap(), 6);
2891     }
2892
2893     #[test]
2894     fn test_recv_iter_break() {
2895         let (tx, rx) = sync_channel::<i32>(0);
2896         let (count_tx, count_rx) = sync_channel(0);
2897
2898         let _t = thread::spawn(move|| {
2899             let mut count = 0;
2900             for x in rx.iter() {
2901                 if count >= 3 {
2902                     break;
2903                 } else {
2904                     count += x;
2905                 }
2906             }
2907             count_tx.send(count).unwrap();
2908         });
2909
2910         tx.send(2).unwrap();
2911         tx.send(2).unwrap();
2912         tx.send(2).unwrap();
2913         let _ = tx.try_send(2);
2914         drop(tx);
2915         assert_eq!(count_rx.recv().unwrap(), 4);
2916     }
2917
2918     #[test]
2919     fn try_recv_states() {
2920         let (tx1, rx1) = sync_channel::<i32>(1);
2921         let (tx2, rx2) = sync_channel::<()>(1);
2922         let (tx3, rx3) = sync_channel::<()>(1);
2923         let _t = thread::spawn(move|| {
2924             rx2.recv().unwrap();
2925             tx1.send(1).unwrap();
2926             tx3.send(()).unwrap();
2927             rx2.recv().unwrap();
2928             drop(tx1);
2929             tx3.send(()).unwrap();
2930         });
2931
2932         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2933         tx2.send(()).unwrap();
2934         rx3.recv().unwrap();
2935         assert_eq!(rx1.try_recv(), Ok(1));
2936         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2937         tx2.send(()).unwrap();
2938         rx3.recv().unwrap();
2939         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2940     }
2941
2942     // This bug used to end up in a livelock inside of the Receiver destructor
2943     // because the internal state of the Shared packet was corrupted
2944     #[test]
2945     fn destroy_upgraded_shared_port_when_sender_still_active() {
2946         let (tx, rx) = sync_channel::<()>(0);
2947         let (tx2, rx2) = sync_channel::<()>(0);
2948         let _t = thread::spawn(move|| {
2949             rx.recv().unwrap(); // wait on a oneshot
2950             drop(rx);  // destroy a shared
2951             tx2.send(()).unwrap();
2952         });
2953         // make sure the other thread has gone to sleep
2954         for _ in 0..5000 { thread::yield_now(); }
2955
2956         // upgrade to a shared chan and send a message
2957         let t = tx.clone();
2958         drop(tx);
2959         t.send(()).unwrap();
2960
2961         // wait for the child thread to exit before we exit
2962         rx2.recv().unwrap();
2963     }
2964
2965     #[test]
2966     fn send1() {
2967         let (tx, rx) = sync_channel::<i32>(0);
2968         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2969         assert_eq!(tx.send(1), Ok(()));
2970     }
2971
2972     #[test]
2973     fn send2() {
2974         let (tx, rx) = sync_channel::<i32>(0);
2975         let _t = thread::spawn(move|| { drop(rx); });
2976         assert!(tx.send(1).is_err());
2977     }
2978
2979     #[test]
2980     fn send3() {
2981         let (tx, rx) = sync_channel::<i32>(1);
2982         assert_eq!(tx.send(1), Ok(()));
2983         let _t =thread::spawn(move|| { drop(rx); });
2984         assert!(tx.send(1).is_err());
2985     }
2986
2987     #[test]
2988     fn send4() {
2989         let (tx, rx) = sync_channel::<i32>(0);
2990         let tx2 = tx.clone();
2991         let (done, donerx) = channel();
2992         let done2 = done.clone();
2993         let _t = thread::spawn(move|| {
2994             assert!(tx.send(1).is_err());
2995             done.send(()).unwrap();
2996         });
2997         let _t = thread::spawn(move|| {
2998             assert!(tx2.send(2).is_err());
2999             done2.send(()).unwrap();
3000         });
3001         drop(rx);
3002         donerx.recv().unwrap();
3003         donerx.recv().unwrap();
3004     }
3005
3006     #[test]
3007     fn try_send1() {
3008         let (tx, _rx) = sync_channel::<i32>(0);
3009         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3010     }
3011
3012     #[test]
3013     fn try_send2() {
3014         let (tx, _rx) = sync_channel::<i32>(1);
3015         assert_eq!(tx.try_send(1), Ok(()));
3016         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3017     }
3018
3019     #[test]
3020     fn try_send3() {
3021         let (tx, rx) = sync_channel::<i32>(1);
3022         assert_eq!(tx.try_send(1), Ok(()));
3023         drop(rx);
3024         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3025     }
3026
3027     #[test]
3028     fn issue_15761() {
3029         fn repro() {
3030             let (tx1, rx1) = sync_channel::<()>(3);
3031             let (tx2, rx2) = sync_channel::<()>(3);
3032
3033             let _t = thread::spawn(move|| {
3034                 rx1.recv().unwrap();
3035                 tx2.try_send(()).unwrap();
3036             });
3037
3038             tx1.try_send(()).unwrap();
3039             rx2.recv().unwrap();
3040         }
3041
3042         for _ in 0..100 {
3043             repro()
3044         }
3045     }
3046 }