]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/mod.rs
Rollup merge of #83571 - a1phyr:feature_const_slice_first_last, r=dtolnay
[rust.git] / library / std / src / sync / mpsc / mod.rs
1 //! Multi-producer, single-consumer FIFO queue communication primitives.
2 //!
3 //! This module provides message-based communication over channels, concretely
4 //! defined among three types:
5 //!
6 //! * [`Sender`]
7 //! * [`SyncSender`]
8 //! * [`Receiver`]
9 //!
10 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11 //! senders are clone-able (multi-producer) such that many threads can send
12 //! simultaneously to one receiver (single-consumer).
13 //!
14 //! These channels come in two flavors:
15 //!
16 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17 //!    will return a `(Sender, Receiver)` tuple where all sends will be
18 //!    **asynchronous** (they never block). The channel conceptually has an
19 //!    infinite buffer.
20 //!
21 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
24 //!    **synchronous** by blocking until there is buffer space available. Note
25 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26 //!    channel where each sender atomically hands off a message to a receiver.
27 //!
28 //! [`send`]: Sender::send
29 //!
30 //! ## Disconnection
31 //!
32 //! The send and receive operations on channels will all return a [`Result`]
33 //! indicating whether the operation succeeded or not. An unsuccessful operation
34 //! is normally indicative of the other half of a channel having "hung up" by
35 //! being dropped in its corresponding thread.
36 //!
37 //! Once half of a channel has been deallocated, most operations can no longer
38 //! continue to make progress, so [`Err`] will be returned. Many applications
39 //! will continue to [`unwrap`] the results returned from this module,
40 //! instigating a propagation of failure among threads if one unexpectedly dies.
41 //!
42 //! [`unwrap`]: Result::unwrap
43 //!
44 //! # Examples
45 //!
46 //! Simple usage:
47 //!
48 //! ```
49 //! use std::thread;
50 //! use std::sync::mpsc::channel;
51 //!
52 //! // Create a simple streaming channel
53 //! let (tx, rx) = channel();
54 //! thread::spawn(move|| {
55 //!     tx.send(10).unwrap();
56 //! });
57 //! assert_eq!(rx.recv().unwrap(), 10);
58 //! ```
59 //!
60 //! Shared usage:
61 //!
62 //! ```
63 //! use std::thread;
64 //! use std::sync::mpsc::channel;
65 //!
66 //! // Create a shared channel that can be sent along from many threads
67 //! // where tx is the sending half (tx for transmission), and rx is the receiving
68 //! // half (rx for receiving).
69 //! let (tx, rx) = channel();
70 //! for i in 0..10 {
71 //!     let tx = tx.clone();
72 //!     thread::spawn(move|| {
73 //!         tx.send(i).unwrap();
74 //!     });
75 //! }
76 //!
77 //! for _ in 0..10 {
78 //!     let j = rx.recv().unwrap();
79 //!     assert!(0 <= j && j < 10);
80 //! }
81 //! ```
82 //!
83 //! Propagating panics:
84 //!
85 //! ```
86 //! use std::sync::mpsc::channel;
87 //!
88 //! // The call to recv() will return an error because the channel has already
89 //! // hung up (or been deallocated)
90 //! let (tx, rx) = channel::<i32>();
91 //! drop(tx);
92 //! assert!(rx.recv().is_err());
93 //! ```
94 //!
95 //! Synchronous channels:
96 //!
97 //! ```
98 //! use std::thread;
99 //! use std::sync::mpsc::sync_channel;
100 //!
101 //! let (tx, rx) = sync_channel::<i32>(0);
102 //! thread::spawn(move|| {
103 //!     // This will wait for the parent thread to start receiving
104 //!     tx.send(53).unwrap();
105 //! });
106 //! rx.recv().unwrap();
107 //! ```
108
109 #![stable(feature = "rust1", since = "1.0.0")]
110
111 #[cfg(all(test, not(target_os = "emscripten")))]
112 mod tests;
113
114 #[cfg(all(test, not(target_os = "emscripten")))]
115 mod sync_tests;
116
117 // A description of how Rust's channel implementation works
118 //
119 // Channels are supposed to be the basic building block for all other
120 // concurrent primitives that are used in Rust. As a result, the channel type
121 // needs to be highly optimized, flexible, and broad enough for use everywhere.
122 //
123 // The choice of implementation of all channels is to be built on lock-free data
124 // structures. The channels themselves are then consequently also lock-free data
125 // structures. As always with lock-free code, this is a very "here be dragons"
126 // territory, especially because I'm unaware of any academic papers that have
127 // gone into great length about channels of these flavors.
128 //
129 // ## Flavors of channels
130 //
131 // From the perspective of a consumer of this library, there is only one flavor
132 // of channel. This channel can be used as a stream and cloned to allow multiple
133 // senders. Under the hood, however, there are actually three flavors of
134 // channels in play.
135 //
136 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
137 //                      case. They contain as few atomics as possible and
138 //                      involve one and exactly one allocation.
139 // * Streams - these channels are optimized for the non-shared use case. They
140 //             use a different concurrent queue that is more tailored for this
141 //             use case. The initial allocation of this flavor of channel is not
142 //             optimized.
143 // * Shared - this is the most general form of channel that this module offers,
144 //            a channel with multiple senders. This type is as optimized as it
145 //            can be, but the previous two types mentioned are much faster for
146 //            their use-cases.
147 //
148 // ## Concurrent queues
149 //
150 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
151 // but recv() obviously blocks. This means that under the hood there must be
152 // some shared and concurrent queue holding all of the actual data.
153 //
154 // With two flavors of channels, two flavors of queues are also used. We have
155 // chosen to use queues from a well-known author that are abbreviated as SPSC
156 // and MPSC (single producer, single consumer and multiple producer, single
157 // consumer). SPSC queues are used for streams while MPSC queues are used for
158 // shared channels.
159 //
160 // ### SPSC optimizations
161 //
162 // The SPSC queue found online is essentially a linked list of nodes where one
163 // half of the nodes are the "queue of data" and the other half of nodes are a
164 // cache of unused nodes. The unused nodes are used such that an allocation is
165 // not required on every push() and a free doesn't need to happen on every
166 // pop().
167 //
168 // As found online, however, the cache of nodes is of an infinite size. This
169 // means that if a channel at one point in its life had 50k items in the queue,
170 // then the queue will always have the capacity for 50k items. I believed that
171 // this was an unnecessary limitation of the implementation, so I have altered
172 // the queue to optionally have a bound on the cache size.
173 //
174 // By default, streams will have an unbounded SPSC queue with a small-ish cache
175 // size. The hope is that the cache is still large enough to have very fast
176 // send() operations while not too large such that millions of channels can
177 // coexist at once.
178 //
179 // ### MPSC optimizations
180 //
181 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
182 // a linked list under the hood to earn its unboundedness, but I have not put
183 // forth much effort into having a cache of nodes similar to the SPSC queue.
184 //
185 // For now, I believe that this is "ok" because shared channels are not the most
186 // common type, but soon we may wish to revisit this queue choice and determine
187 // another candidate for backend storage of shared channels.
188 //
189 // ## Overview of the Implementation
190 //
191 // Now that there's a little background on the concurrent queues used, it's
192 // worth going into much more detail about the channels themselves. The basic
193 // pseudocode for a send/recv are:
194 //
195 //
196 //      send(t)                             recv()
197 //        queue.push(t)                       return if queue.pop()
198 //        if increment() == -1                deschedule {
199 //          wakeup()                            if decrement() > 0
200 //                                                cancel_deschedule()
201 //                                            }
202 //                                            queue.pop()
203 //
204 // As mentioned before, there are no locks in this implementation, only atomic
205 // instructions are used.
206 //
207 // ### The internal atomic counter
208 //
209 // Every channel has a shared counter with each half to keep track of the size
210 // of the queue. This counter is used to abort descheduling by the receiver and
211 // to know when to wake up on the sending side.
212 //
213 // As seen in the pseudocode, senders will increment this count and receivers
214 // will decrement the count. The theory behind this is that if a sender sees a
215 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
216 // then it doesn't need to block.
217 //
218 // The recv() method has a beginning call to pop(), and if successful, it needs
219 // to decrement the count. It is a crucial implementation detail that this
220 // decrement does *not* happen to the shared counter. If this were the case,
221 // then it would be possible for the counter to be very negative when there were
222 // no receivers waiting, in which case the senders would have to determine when
223 // it was actually appropriate to wake up a receiver.
224 //
225 // Instead, the "steal count" is kept track of separately (not atomically
226 // because it's only used by receivers), and then the decrement() call when
227 // descheduling will lump in all of the recent steals into one large decrement.
228 //
229 // The implication of this is that if a sender sees a -1 count, then there's
230 // guaranteed to be a waiter waiting!
231 //
232 // ## Native Implementation
233 //
234 // A major goal of these channels is to work seamlessly on and off the runtime.
235 // All of the previous race conditions have been worded in terms of
236 // scheduler-isms (which is obviously not available without the runtime).
237 //
238 // For now, native usage of channels (off the runtime) will fall back onto
239 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
240 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
241 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
242 // condition variable.
243 //
244 // ## Select
245 //
246 // Being able to support selection over channels has greatly influenced this
247 // design, and not only does selection need to work inside the runtime, but also
248 // outside the runtime.
249 //
250 // The implementation is fairly straightforward. The goal of select() is not to
251 // return some data, but only to return which channel can receive data without
252 // blocking. The implementation is essentially the entire blocking procedure
253 // followed by an increment as soon as its woken up. The cancellation procedure
254 // involves an increment and swapping out of to_wake to acquire ownership of the
255 // thread to unblock.
256 //
257 // Sadly this current implementation requires multiple allocations, so I have
258 // seen the throughput of select() be much worse than it should be. I do not
259 // believe that there is anything fundamental that needs to change about these
260 // channels, however, in order to support a more efficient select().
261 //
262 // FIXME: Select is now removed, so these factors are ready to be cleaned up!
263 //
264 // # Conclusion
265 //
266 // And now that you've seen all the races that I found and attempted to fix,
267 // here's the code for you to find some more!
268
269 use crate::cell::UnsafeCell;
270 use crate::error;
271 use crate::fmt;
272 use crate::mem;
273 use crate::sync::Arc;
274 use crate::time::{Duration, Instant};
275
276 mod blocking;
277 mod mpsc_queue;
278 mod oneshot;
279 mod shared;
280 mod spsc_queue;
281 mod stream;
282 mod sync;
283
284 mod cache_aligned;
285
286 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
287 /// This half can only be owned by one thread.
288 ///
289 /// Messages sent to the channel can be retrieved using [`recv`].
290 ///
291 /// [`recv`]: Receiver::recv
292 ///
293 /// # Examples
294 ///
295 /// ```rust
296 /// use std::sync::mpsc::channel;
297 /// use std::thread;
298 /// use std::time::Duration;
299 ///
300 /// let (send, recv) = channel();
301 ///
302 /// thread::spawn(move || {
303 ///     send.send("Hello world!").unwrap();
304 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
305 ///     send.send("Delayed for 2 seconds").unwrap();
306 /// });
307 ///
308 /// println!("{}", recv.recv().unwrap()); // Received immediately
309 /// println!("Waiting...");
310 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
311 /// ```
312 #[stable(feature = "rust1", since = "1.0.0")]
313 #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
314 pub struct Receiver<T> {
315     inner: UnsafeCell<Flavor<T>>,
316 }
317
318 // The receiver port can be sent from place to place, so long as it
319 // is not used to receive non-sendable things.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 unsafe impl<T: Send> Send for Receiver<T> {}
322
323 #[stable(feature = "rust1", since = "1.0.0")]
324 impl<T> !Sync for Receiver<T> {}
325
326 /// An iterator over messages on a [`Receiver`], created by [`iter`].
327 ///
328 /// This iterator will block whenever [`next`] is called,
329 /// waiting for a new message, and [`None`] will be returned
330 /// when the corresponding channel has hung up.
331 ///
332 /// [`iter`]: Receiver::iter
333 /// [`next`]: Iterator::next
334 ///
335 /// # Examples
336 ///
337 /// ```rust
338 /// use std::sync::mpsc::channel;
339 /// use std::thread;
340 ///
341 /// let (send, recv) = channel();
342 ///
343 /// thread::spawn(move || {
344 ///     send.send(1u8).unwrap();
345 ///     send.send(2u8).unwrap();
346 ///     send.send(3u8).unwrap();
347 /// });
348 ///
349 /// for x in recv.iter() {
350 ///     println!("Got: {}", x);
351 /// }
352 /// ```
353 #[stable(feature = "rust1", since = "1.0.0")]
354 #[derive(Debug)]
355 pub struct Iter<'a, T: 'a> {
356     rx: &'a Receiver<T>,
357 }
358
359 /// An iterator that attempts to yield all pending values for a [`Receiver`],
360 /// created by [`try_iter`].
361 ///
362 /// [`None`] will be returned when there are no pending values remaining or
363 /// if the corresponding channel has hung up.
364 ///
365 /// This iterator will never block the caller in order to wait for data to
366 /// become available. Instead, it will return [`None`].
367 ///
368 /// [`try_iter`]: Receiver::try_iter
369 ///
370 /// # Examples
371 ///
372 /// ```rust
373 /// use std::sync::mpsc::channel;
374 /// use std::thread;
375 /// use std::time::Duration;
376 ///
377 /// let (sender, receiver) = channel();
378 ///
379 /// // Nothing is in the buffer yet
380 /// assert!(receiver.try_iter().next().is_none());
381 /// println!("Nothing in the buffer...");
382 ///
383 /// thread::spawn(move || {
384 ///     sender.send(1).unwrap();
385 ///     sender.send(2).unwrap();
386 ///     sender.send(3).unwrap();
387 /// });
388 ///
389 /// println!("Going to sleep...");
390 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
391 ///
392 /// for x in receiver.try_iter() {
393 ///     println!("Got: {}", x);
394 /// }
395 /// ```
396 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
397 #[derive(Debug)]
398 pub struct TryIter<'a, T: 'a> {
399     rx: &'a Receiver<T>,
400 }
401
402 /// An owning iterator over messages on a [`Receiver`],
403 /// created by **Receiver::into_iter**.
404 ///
405 /// This iterator will block whenever [`next`]
406 /// is called, waiting for a new message, and [`None`] will be
407 /// returned if the corresponding channel has hung up.
408 ///
409 /// [`next`]: Iterator::next
410 ///
411 /// # Examples
412 ///
413 /// ```rust
414 /// use std::sync::mpsc::channel;
415 /// use std::thread;
416 ///
417 /// let (send, recv) = channel();
418 ///
419 /// thread::spawn(move || {
420 ///     send.send(1u8).unwrap();
421 ///     send.send(2u8).unwrap();
422 ///     send.send(3u8).unwrap();
423 /// });
424 ///
425 /// for x in recv.into_iter() {
426 ///     println!("Got: {}", x);
427 /// }
428 /// ```
429 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
430 #[derive(Debug)]
431 pub struct IntoIter<T> {
432     rx: Receiver<T>,
433 }
434
435 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
436 /// owned by one thread, but it can be cloned to send to other threads.
437 ///
438 /// Messages can be sent through this channel with [`send`].
439 ///
440 /// [`send`]: Sender::send
441 ///
442 /// # Examples
443 ///
444 /// ```rust
445 /// use std::sync::mpsc::channel;
446 /// use std::thread;
447 ///
448 /// let (sender, receiver) = channel();
449 /// let sender2 = sender.clone();
450 ///
451 /// // First thread owns sender
452 /// thread::spawn(move || {
453 ///     sender.send(1).unwrap();
454 /// });
455 ///
456 /// // Second thread owns sender2
457 /// thread::spawn(move || {
458 ///     sender2.send(2).unwrap();
459 /// });
460 ///
461 /// let msg = receiver.recv().unwrap();
462 /// let msg2 = receiver.recv().unwrap();
463 ///
464 /// assert_eq!(3, msg + msg2);
465 /// ```
466 #[stable(feature = "rust1", since = "1.0.0")]
467 pub struct Sender<T> {
468     inner: UnsafeCell<Flavor<T>>,
469 }
470
471 // The send port can be sent from place to place, so long as it
472 // is not used to send non-sendable things.
473 #[stable(feature = "rust1", since = "1.0.0")]
474 unsafe impl<T: Send> Send for Sender<T> {}
475
476 #[stable(feature = "rust1", since = "1.0.0")]
477 impl<T> !Sync for Sender<T> {}
478
479 /// The sending-half of Rust's synchronous [`sync_channel`] type.
480 ///
481 /// Messages can be sent through this channel with [`send`] or [`try_send`].
482 ///
483 /// [`send`] will block if there is no space in the internal buffer.
484 ///
485 /// [`send`]: SyncSender::send
486 /// [`try_send`]: SyncSender::try_send
487 ///
488 /// # Examples
489 ///
490 /// ```rust
491 /// use std::sync::mpsc::sync_channel;
492 /// use std::thread;
493 ///
494 /// // Create a sync_channel with buffer size 2
495 /// let (sync_sender, receiver) = sync_channel(2);
496 /// let sync_sender2 = sync_sender.clone();
497 ///
498 /// // First thread owns sync_sender
499 /// thread::spawn(move || {
500 ///     sync_sender.send(1).unwrap();
501 ///     sync_sender.send(2).unwrap();
502 /// });
503 ///
504 /// // Second thread owns sync_sender2
505 /// thread::spawn(move || {
506 ///     sync_sender2.send(3).unwrap();
507 ///     // thread will now block since the buffer is full
508 ///     println!("Thread unblocked!");
509 /// });
510 ///
511 /// let mut msg;
512 ///
513 /// msg = receiver.recv().unwrap();
514 /// println!("message {} received", msg);
515 ///
516 /// // "Thread unblocked!" will be printed now
517 ///
518 /// msg = receiver.recv().unwrap();
519 /// println!("message {} received", msg);
520 ///
521 /// msg = receiver.recv().unwrap();
522 ///
523 /// println!("message {} received", msg);
524 /// ```
525 #[stable(feature = "rust1", since = "1.0.0")]
526 pub struct SyncSender<T> {
527     inner: Arc<sync::Packet<T>>,
528 }
529
530 #[stable(feature = "rust1", since = "1.0.0")]
531 unsafe impl<T: Send> Send for SyncSender<T> {}
532
533 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
534 /// function on **channel**s.
535 ///
536 /// A **send** operation can only fail if the receiving end of a channel is
537 /// disconnected, implying that the data could never be received. The error
538 /// contains the data being sent as a payload so it can be recovered.
539 #[stable(feature = "rust1", since = "1.0.0")]
540 #[derive(PartialEq, Eq, Clone, Copy)]
541 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
542
543 /// An error returned from the [`recv`] function on a [`Receiver`].
544 ///
545 /// The [`recv`] operation can only fail if the sending half of a
546 /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
547 /// messages will ever be received.
548 ///
549 /// [`recv`]: Receiver::recv
550 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
551 #[stable(feature = "rust1", since = "1.0.0")]
552 pub struct RecvError;
553
554 /// This enumeration is the list of the possible reasons that [`try_recv`] could
555 /// not return data when called. This can occur with both a [`channel`] and
556 /// a [`sync_channel`].
557 ///
558 /// [`try_recv`]: Receiver::try_recv
559 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
560 #[stable(feature = "rust1", since = "1.0.0")]
561 pub enum TryRecvError {
562     /// This **channel** is currently empty, but the **Sender**(s) have not yet
563     /// disconnected, so data may yet become available.
564     #[stable(feature = "rust1", since = "1.0.0")]
565     Empty,
566
567     /// The **channel**'s sending half has become disconnected, and there will
568     /// never be any more data received on it.
569     #[stable(feature = "rust1", since = "1.0.0")]
570     Disconnected,
571 }
572
573 /// This enumeration is the list of possible errors that made [`recv_timeout`]
574 /// unable to return data when called. This can occur with both a [`channel`] and
575 /// a [`sync_channel`].
576 ///
577 /// [`recv_timeout`]: Receiver::recv_timeout
578 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
579 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
580 pub enum RecvTimeoutError {
581     /// This **channel** is currently empty, but the **Sender**(s) have not yet
582     /// disconnected, so data may yet become available.
583     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
584     Timeout,
585     /// The **channel**'s sending half has become disconnected, and there will
586     /// never be any more data received on it.
587     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
588     Disconnected,
589 }
590
591 /// This enumeration is the list of the possible error outcomes for the
592 /// [`try_send`] method.
593 ///
594 /// [`try_send`]: SyncSender::try_send
595 #[stable(feature = "rust1", since = "1.0.0")]
596 #[derive(PartialEq, Eq, Clone, Copy)]
597 pub enum TrySendError<T> {
598     /// The data could not be sent on the [`sync_channel`] because it would require that
599     /// the callee block to send the data.
600     ///
601     /// If this is a buffered channel, then the buffer is full at this time. If
602     /// this is not a buffered channel, then there is no [`Receiver`] available to
603     /// acquire the data.
604     #[stable(feature = "rust1", since = "1.0.0")]
605     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
606
607     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
608     /// sent. The data is returned back to the callee in this case.
609     #[stable(feature = "rust1", since = "1.0.0")]
610     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
611 }
612
613 enum Flavor<T> {
614     Oneshot(Arc<oneshot::Packet<T>>),
615     Stream(Arc<stream::Packet<T>>),
616     Shared(Arc<shared::Packet<T>>),
617     Sync(Arc<sync::Packet<T>>),
618 }
619
620 #[doc(hidden)]
621 trait UnsafeFlavor<T> {
622     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
623     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
624         &mut *self.inner_unsafe().get()
625     }
626     unsafe fn inner(&self) -> &Flavor<T> {
627         &*self.inner_unsafe().get()
628     }
629 }
630 impl<T> UnsafeFlavor<T> for Sender<T> {
631     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
632         &self.inner
633     }
634 }
635 impl<T> UnsafeFlavor<T> for Receiver<T> {
636     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
637         &self.inner
638     }
639 }
640
641 /// Creates a new asynchronous channel, returning the sender/receiver halves.
642 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
643 /// the same order as it was sent, and no [`send`] will block the calling thread
644 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
645 /// block after its buffer limit is reached). [`recv`] will block until a message
646 /// is available.
647 ///
648 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
649 /// only one [`Receiver`] is supported.
650 ///
651 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
652 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
653 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
654 /// return a [`RecvError`].
655 ///
656 /// [`send`]: Sender::send
657 /// [`recv`]: Receiver::recv
658 ///
659 /// # Examples
660 ///
661 /// ```
662 /// use std::sync::mpsc::channel;
663 /// use std::thread;
664 ///
665 /// let (sender, receiver) = channel();
666 ///
667 /// // Spawn off an expensive computation
668 /// thread::spawn(move|| {
669 /// #   fn expensive_computation() {}
670 ///     sender.send(expensive_computation()).unwrap();
671 /// });
672 ///
673 /// // Do some useful work for awhile
674 ///
675 /// // Let's see what that answer was
676 /// println!("{:?}", receiver.recv().unwrap());
677 /// ```
678 #[stable(feature = "rust1", since = "1.0.0")]
679 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
680     let a = Arc::new(oneshot::Packet::new());
681     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
682 }
683
684 /// Creates a new synchronous, bounded channel.
685 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
686 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
687 /// [`Receiver`] will block until a message becomes available. `sync_channel`
688 /// differs greatly in the semantics of the sender, however.
689 ///
690 /// This channel has an internal buffer on which messages will be queued.
691 /// `bound` specifies the buffer size. When the internal buffer becomes full,
692 /// future sends will *block* waiting for the buffer to open up. Note that a
693 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
694 /// where each [`send`] will not return until a [`recv`] is paired with it.
695 ///
696 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
697 /// times, but only one [`Receiver`] is supported.
698 ///
699 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
700 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
701 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
702 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
703 ///
704 /// [`send`]: SyncSender::send
705 /// [`recv`]: Receiver::recv
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// use std::sync::mpsc::sync_channel;
711 /// use std::thread;
712 ///
713 /// let (sender, receiver) = sync_channel(1);
714 ///
715 /// // this returns immediately
716 /// sender.send(1).unwrap();
717 ///
718 /// thread::spawn(move|| {
719 ///     // this will block until the previous message has been received
720 ///     sender.send(2).unwrap();
721 /// });
722 ///
723 /// assert_eq!(receiver.recv().unwrap(), 1);
724 /// assert_eq!(receiver.recv().unwrap(), 2);
725 /// ```
726 #[stable(feature = "rust1", since = "1.0.0")]
727 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
728     let a = Arc::new(sync::Packet::new(bound));
729     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
730 }
731
732 ////////////////////////////////////////////////////////////////////////////////
733 // Sender
734 ////////////////////////////////////////////////////////////////////////////////
735
736 impl<T> Sender<T> {
737     fn new(inner: Flavor<T>) -> Sender<T> {
738         Sender { inner: UnsafeCell::new(inner) }
739     }
740
741     /// Attempts to send a value on this channel, returning it back if it could
742     /// not be sent.
743     ///
744     /// A successful send occurs when it is determined that the other end of
745     /// the channel has not hung up already. An unsuccessful send would be one
746     /// where the corresponding receiver has already been deallocated. Note
747     /// that a return value of [`Err`] means that the data will never be
748     /// received, but a return value of [`Ok`] does *not* mean that the data
749     /// will be received. It is possible for the corresponding receiver to
750     /// hang up immediately after this function returns [`Ok`].
751     ///
752     /// This method will never block the current thread.
753     ///
754     /// # Examples
755     ///
756     /// ```
757     /// use std::sync::mpsc::channel;
758     ///
759     /// let (tx, rx) = channel();
760     ///
761     /// // This send is always successful
762     /// tx.send(1).unwrap();
763     ///
764     /// // This send will fail because the receiver is gone
765     /// drop(rx);
766     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
767     /// ```
768     #[stable(feature = "rust1", since = "1.0.0")]
769     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
770         let (new_inner, ret) = match *unsafe { self.inner() } {
771             Flavor::Oneshot(ref p) => {
772                 if !p.sent() {
773                     return p.send(t).map_err(SendError);
774                 } else {
775                     let a = Arc::new(stream::Packet::new());
776                     let rx = Receiver::new(Flavor::Stream(a.clone()));
777                     match p.upgrade(rx) {
778                         oneshot::UpSuccess => {
779                             let ret = a.send(t);
780                             (a, ret)
781                         }
782                         oneshot::UpDisconnected => (a, Err(t)),
783                         oneshot::UpWoke(token) => {
784                             // This send cannot panic because the thread is
785                             // asleep (we're looking at it), so the receiver
786                             // can't go away.
787                             a.send(t).ok().unwrap();
788                             token.signal();
789                             (a, Ok(()))
790                         }
791                     }
792                 }
793             }
794             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
795             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
796             Flavor::Sync(..) => unreachable!(),
797         };
798
799         unsafe {
800             let tmp = Sender::new(Flavor::Stream(new_inner));
801             mem::swap(self.inner_mut(), tmp.inner_mut());
802         }
803         ret.map_err(SendError)
804     }
805 }
806
807 #[stable(feature = "rust1", since = "1.0.0")]
808 impl<T> Clone for Sender<T> {
809     fn clone(&self) -> Sender<T> {
810         let packet = match *unsafe { self.inner() } {
811             Flavor::Oneshot(ref p) => {
812                 let a = Arc::new(shared::Packet::new());
813                 {
814                     let guard = a.postinit_lock();
815                     let rx = Receiver::new(Flavor::Shared(a.clone()));
816                     let sleeper = match p.upgrade(rx) {
817                         oneshot::UpSuccess | oneshot::UpDisconnected => None,
818                         oneshot::UpWoke(task) => Some(task),
819                     };
820                     a.inherit_blocker(sleeper, guard);
821                 }
822                 a
823             }
824             Flavor::Stream(ref p) => {
825                 let a = Arc::new(shared::Packet::new());
826                 {
827                     let guard = a.postinit_lock();
828                     let rx = Receiver::new(Flavor::Shared(a.clone()));
829                     let sleeper = match p.upgrade(rx) {
830                         stream::UpSuccess | stream::UpDisconnected => None,
831                         stream::UpWoke(task) => Some(task),
832                     };
833                     a.inherit_blocker(sleeper, guard);
834                 }
835                 a
836             }
837             Flavor::Shared(ref p) => {
838                 p.clone_chan();
839                 return Sender::new(Flavor::Shared(p.clone()));
840             }
841             Flavor::Sync(..) => unreachable!(),
842         };
843
844         unsafe {
845             let tmp = Sender::new(Flavor::Shared(packet.clone()));
846             mem::swap(self.inner_mut(), tmp.inner_mut());
847         }
848         Sender::new(Flavor::Shared(packet))
849     }
850 }
851
852 #[stable(feature = "rust1", since = "1.0.0")]
853 impl<T> Drop for Sender<T> {
854     fn drop(&mut self) {
855         match *unsafe { self.inner() } {
856             Flavor::Oneshot(ref p) => p.drop_chan(),
857             Flavor::Stream(ref p) => p.drop_chan(),
858             Flavor::Shared(ref p) => p.drop_chan(),
859             Flavor::Sync(..) => unreachable!(),
860         }
861     }
862 }
863
864 #[stable(feature = "mpsc_debug", since = "1.8.0")]
865 impl<T> fmt::Debug for Sender<T> {
866     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867         f.debug_struct("Sender").finish_non_exhaustive()
868     }
869 }
870
871 ////////////////////////////////////////////////////////////////////////////////
872 // SyncSender
873 ////////////////////////////////////////////////////////////////////////////////
874
875 impl<T> SyncSender<T> {
876     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
877         SyncSender { inner }
878     }
879
880     /// Sends a value on this synchronous channel.
881     ///
882     /// This function will *block* until space in the internal buffer becomes
883     /// available or a receiver is available to hand off the message to.
884     ///
885     /// Note that a successful send does *not* guarantee that the receiver will
886     /// ever see the data if there is a buffer on this channel. Items may be
887     /// enqueued in the internal buffer for the receiver to receive at a later
888     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
889     /// channel and it guarantees that the receiver has indeed received
890     /// the data if this function returns success.
891     ///
892     /// This function will never panic, but it may return [`Err`] if the
893     /// [`Receiver`] has disconnected and is no longer able to receive
894     /// information.
895     ///
896     /// # Examples
897     ///
898     /// ```rust
899     /// use std::sync::mpsc::sync_channel;
900     /// use std::thread;
901     ///
902     /// // Create a rendezvous sync_channel with buffer size 0
903     /// let (sync_sender, receiver) = sync_channel(0);
904     ///
905     /// thread::spawn(move || {
906     ///    println!("sending message...");
907     ///    sync_sender.send(1).unwrap();
908     ///    // Thread is now blocked until the message is received
909     ///
910     ///    println!("...message received!");
911     /// });
912     ///
913     /// let msg = receiver.recv().unwrap();
914     /// assert_eq!(1, msg);
915     /// ```
916     #[stable(feature = "rust1", since = "1.0.0")]
917     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
918         self.inner.send(t).map_err(SendError)
919     }
920
921     /// Attempts to send a value on this channel without blocking.
922     ///
923     /// This method differs from [`send`] by returning immediately if the
924     /// channel's buffer is full or no receiver is waiting to acquire some
925     /// data. Compared with [`send`], this function has two failure cases
926     /// instead of one (one for disconnection, one for a full buffer).
927     ///
928     /// See [`send`] for notes about guarantees of whether the
929     /// receiver has received the data or not if this function is successful.
930     ///
931     /// [`send`]: Self::send
932     ///
933     /// # Examples
934     ///
935     /// ```rust
936     /// use std::sync::mpsc::sync_channel;
937     /// use std::thread;
938     ///
939     /// // Create a sync_channel with buffer size 1
940     /// let (sync_sender, receiver) = sync_channel(1);
941     /// let sync_sender2 = sync_sender.clone();
942     ///
943     /// // First thread owns sync_sender
944     /// thread::spawn(move || {
945     ///     sync_sender.send(1).unwrap();
946     ///     sync_sender.send(2).unwrap();
947     ///     // Thread blocked
948     /// });
949     ///
950     /// // Second thread owns sync_sender2
951     /// thread::spawn(move || {
952     ///     // This will return an error and send
953     ///     // no message if the buffer is full
954     ///     let _ = sync_sender2.try_send(3);
955     /// });
956     ///
957     /// let mut msg;
958     /// msg = receiver.recv().unwrap();
959     /// println!("message {} received", msg);
960     ///
961     /// msg = receiver.recv().unwrap();
962     /// println!("message {} received", msg);
963     ///
964     /// // Third message may have never been sent
965     /// match receiver.try_recv() {
966     ///     Ok(msg) => println!("message {} received", msg),
967     ///     Err(_) => println!("the third message was never sent"),
968     /// }
969     /// ```
970     #[stable(feature = "rust1", since = "1.0.0")]
971     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
972         self.inner.try_send(t)
973     }
974 }
975
976 #[stable(feature = "rust1", since = "1.0.0")]
977 impl<T> Clone for SyncSender<T> {
978     fn clone(&self) -> SyncSender<T> {
979         self.inner.clone_chan();
980         SyncSender::new(self.inner.clone())
981     }
982 }
983
984 #[stable(feature = "rust1", since = "1.0.0")]
985 impl<T> Drop for SyncSender<T> {
986     fn drop(&mut self) {
987         self.inner.drop_chan();
988     }
989 }
990
991 #[stable(feature = "mpsc_debug", since = "1.8.0")]
992 impl<T> fmt::Debug for SyncSender<T> {
993     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
994         f.debug_struct("SyncSender").finish_non_exhaustive()
995     }
996 }
997
998 ////////////////////////////////////////////////////////////////////////////////
999 // Receiver
1000 ////////////////////////////////////////////////////////////////////////////////
1001
1002 impl<T> Receiver<T> {
1003     fn new(inner: Flavor<T>) -> Receiver<T> {
1004         Receiver { inner: UnsafeCell::new(inner) }
1005     }
1006
1007     /// Attempts to return a pending value on this receiver without blocking.
1008     ///
1009     /// This method will never block the caller in order to wait for data to
1010     /// become available. Instead, this will always return immediately with a
1011     /// possible option of pending data on the channel.
1012     ///
1013     /// This is useful for a flavor of "optimistic check" before deciding to
1014     /// block on a receiver.
1015     ///
1016     /// Compared with [`recv`], this function has two failure cases instead of one
1017     /// (one for disconnection, one for an empty buffer).
1018     ///
1019     /// [`recv`]: Self::recv
1020     ///
1021     /// # Examples
1022     ///
1023     /// ```rust
1024     /// use std::sync::mpsc::{Receiver, channel};
1025     ///
1026     /// let (_, receiver): (_, Receiver<i32>) = channel();
1027     ///
1028     /// assert!(receiver.try_recv().is_err());
1029     /// ```
1030     #[stable(feature = "rust1", since = "1.0.0")]
1031     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1032         loop {
1033             let new_port = match *unsafe { self.inner() } {
1034                 Flavor::Oneshot(ref p) => match p.try_recv() {
1035                     Ok(t) => return Ok(t),
1036                     Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1037                     Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1038                     Err(oneshot::Upgraded(rx)) => rx,
1039                 },
1040                 Flavor::Stream(ref p) => match p.try_recv() {
1041                     Ok(t) => return Ok(t),
1042                     Err(stream::Empty) => return Err(TryRecvError::Empty),
1043                     Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1044                     Err(stream::Upgraded(rx)) => rx,
1045                 },
1046                 Flavor::Shared(ref p) => match p.try_recv() {
1047                     Ok(t) => return Ok(t),
1048                     Err(shared::Empty) => return Err(TryRecvError::Empty),
1049                     Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1050                 },
1051                 Flavor::Sync(ref p) => match p.try_recv() {
1052                     Ok(t) => return Ok(t),
1053                     Err(sync::Empty) => return Err(TryRecvError::Empty),
1054                     Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1055                 },
1056             };
1057             unsafe {
1058                 mem::swap(self.inner_mut(), new_port.inner_mut());
1059             }
1060         }
1061     }
1062
1063     /// Attempts to wait for a value on this receiver, returning an error if the
1064     /// corresponding channel has hung up.
1065     ///
1066     /// This function will always block the current thread if there is no data
1067     /// available and it's possible for more data to be sent. Once a message is
1068     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1069     /// receiver will wake up and return that message.
1070     ///
1071     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1072     /// this call is blocking, this call will wake up and return [`Err`] to
1073     /// indicate that no more messages can ever be received on this channel.
1074     /// However, since channels are buffered, messages sent before the disconnect
1075     /// will still be properly received.
1076     ///
1077     /// # Examples
1078     ///
1079     /// ```
1080     /// use std::sync::mpsc;
1081     /// use std::thread;
1082     ///
1083     /// let (send, recv) = mpsc::channel();
1084     /// let handle = thread::spawn(move || {
1085     ///     send.send(1u8).unwrap();
1086     /// });
1087     ///
1088     /// handle.join().unwrap();
1089     ///
1090     /// assert_eq!(Ok(1), recv.recv());
1091     /// ```
1092     ///
1093     /// Buffering behavior:
1094     ///
1095     /// ```
1096     /// use std::sync::mpsc;
1097     /// use std::thread;
1098     /// use std::sync::mpsc::RecvError;
1099     ///
1100     /// let (send, recv) = mpsc::channel();
1101     /// let handle = thread::spawn(move || {
1102     ///     send.send(1u8).unwrap();
1103     ///     send.send(2).unwrap();
1104     ///     send.send(3).unwrap();
1105     ///     drop(send);
1106     /// });
1107     ///
1108     /// // wait for the thread to join so we ensure the sender is dropped
1109     /// handle.join().unwrap();
1110     ///
1111     /// assert_eq!(Ok(1), recv.recv());
1112     /// assert_eq!(Ok(2), recv.recv());
1113     /// assert_eq!(Ok(3), recv.recv());
1114     /// assert_eq!(Err(RecvError), recv.recv());
1115     /// ```
1116     #[stable(feature = "rust1", since = "1.0.0")]
1117     pub fn recv(&self) -> Result<T, RecvError> {
1118         loop {
1119             let new_port = match *unsafe { self.inner() } {
1120                 Flavor::Oneshot(ref p) => match p.recv(None) {
1121                     Ok(t) => return Ok(t),
1122                     Err(oneshot::Disconnected) => return Err(RecvError),
1123                     Err(oneshot::Upgraded(rx)) => rx,
1124                     Err(oneshot::Empty) => unreachable!(),
1125                 },
1126                 Flavor::Stream(ref p) => match p.recv(None) {
1127                     Ok(t) => return Ok(t),
1128                     Err(stream::Disconnected) => return Err(RecvError),
1129                     Err(stream::Upgraded(rx)) => rx,
1130                     Err(stream::Empty) => unreachable!(),
1131                 },
1132                 Flavor::Shared(ref p) => match p.recv(None) {
1133                     Ok(t) => return Ok(t),
1134                     Err(shared::Disconnected) => return Err(RecvError),
1135                     Err(shared::Empty) => unreachable!(),
1136                 },
1137                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1138             };
1139             unsafe {
1140                 mem::swap(self.inner_mut(), new_port.inner_mut());
1141             }
1142         }
1143     }
1144
1145     /// Attempts to wait for a value on this receiver, returning an error if the
1146     /// corresponding channel has hung up, or if it waits more than `timeout`.
1147     ///
1148     /// This function will always block the current thread if there is no data
1149     /// available and it's possible for more data to be sent. Once a message is
1150     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1151     /// receiver will wake up and return that message.
1152     ///
1153     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1154     /// this call is blocking, this call will wake up and return [`Err`] to
1155     /// indicate that no more messages can ever be received on this channel.
1156     /// However, since channels are buffered, messages sent before the disconnect
1157     /// will still be properly received.
1158     ///
1159     /// # Known Issues
1160     ///
1161     /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1162     /// to panic unexpectedly with the following example:
1163     ///
1164     /// ```no_run
1165     /// use std::sync::mpsc::channel;
1166     /// use std::thread;
1167     /// use std::time::Duration;
1168     ///
1169     /// let (tx, rx) = channel::<String>();
1170     ///
1171     /// thread::spawn(move || {
1172     ///     let d = Duration::from_millis(10);
1173     ///     loop {
1174     ///         println!("recv");
1175     ///         let _r = rx.recv_timeout(d);
1176     ///     }
1177     /// });
1178     ///
1179     /// thread::sleep(Duration::from_millis(100));
1180     /// let _c1 = tx.clone();
1181     ///
1182     /// thread::sleep(Duration::from_secs(1));
1183     /// ```
1184     ///
1185     /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1186     ///
1187     /// # Examples
1188     ///
1189     /// Successfully receiving value before encountering timeout:
1190     ///
1191     /// ```no_run
1192     /// use std::thread;
1193     /// use std::time::Duration;
1194     /// use std::sync::mpsc;
1195     ///
1196     /// let (send, recv) = mpsc::channel();
1197     ///
1198     /// thread::spawn(move || {
1199     ///     send.send('a').unwrap();
1200     /// });
1201     ///
1202     /// assert_eq!(
1203     ///     recv.recv_timeout(Duration::from_millis(400)),
1204     ///     Ok('a')
1205     /// );
1206     /// ```
1207     ///
1208     /// Receiving an error upon reaching timeout:
1209     ///
1210     /// ```no_run
1211     /// use std::thread;
1212     /// use std::time::Duration;
1213     /// use std::sync::mpsc;
1214     ///
1215     /// let (send, recv) = mpsc::channel();
1216     ///
1217     /// thread::spawn(move || {
1218     ///     thread::sleep(Duration::from_millis(800));
1219     ///     send.send('a').unwrap();
1220     /// });
1221     ///
1222     /// assert_eq!(
1223     ///     recv.recv_timeout(Duration::from_millis(400)),
1224     ///     Err(mpsc::RecvTimeoutError::Timeout)
1225     /// );
1226     /// ```
1227     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1228     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1229         // Do an optimistic try_recv to avoid the performance impact of
1230         // Instant::now() in the full-channel case.
1231         match self.try_recv() {
1232             Ok(result) => Ok(result),
1233             Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1234             Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1235                 Some(deadline) => self.recv_deadline(deadline),
1236                 // So far in the future that it's practically the same as waiting indefinitely.
1237                 None => self.recv().map_err(RecvTimeoutError::from),
1238             },
1239         }
1240     }
1241
1242     /// Attempts to wait for a value on this receiver, returning an error if the
1243     /// corresponding channel has hung up, or if `deadline` is reached.
1244     ///
1245     /// This function will always block the current thread if there is no data
1246     /// available and it's possible for more data to be sent. Once a message is
1247     /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
1248     /// receiver will wake up and return that message.
1249     ///
1250     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1251     /// this call is blocking, this call will wake up and return [`Err`] to
1252     /// indicate that no more messages can ever be received on this channel.
1253     /// However, since channels are buffered, messages sent before the disconnect
1254     /// will still be properly received.
1255     ///
1256     /// # Examples
1257     ///
1258     /// Successfully receiving value before reaching deadline:
1259     ///
1260     /// ```no_run
1261     /// #![feature(deadline_api)]
1262     /// use std::thread;
1263     /// use std::time::{Duration, Instant};
1264     /// use std::sync::mpsc;
1265     ///
1266     /// let (send, recv) = mpsc::channel();
1267     ///
1268     /// thread::spawn(move || {
1269     ///     send.send('a').unwrap();
1270     /// });
1271     ///
1272     /// assert_eq!(
1273     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1274     ///     Ok('a')
1275     /// );
1276     /// ```
1277     ///
1278     /// Receiving an error upon reaching deadline:
1279     ///
1280     /// ```no_run
1281     /// #![feature(deadline_api)]
1282     /// use std::thread;
1283     /// use std::time::{Duration, Instant};
1284     /// use std::sync::mpsc;
1285     ///
1286     /// let (send, recv) = mpsc::channel();
1287     ///
1288     /// thread::spawn(move || {
1289     ///     thread::sleep(Duration::from_millis(800));
1290     ///     send.send('a').unwrap();
1291     /// });
1292     ///
1293     /// assert_eq!(
1294     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1295     ///     Err(mpsc::RecvTimeoutError::Timeout)
1296     /// );
1297     /// ```
1298     #[unstable(feature = "deadline_api", issue = "46316")]
1299     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1300         use self::RecvTimeoutError::*;
1301
1302         loop {
1303             let port_or_empty = match *unsafe { self.inner() } {
1304                 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1305                     Ok(t) => return Ok(t),
1306                     Err(oneshot::Disconnected) => return Err(Disconnected),
1307                     Err(oneshot::Upgraded(rx)) => Some(rx),
1308                     Err(oneshot::Empty) => None,
1309                 },
1310                 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1311                     Ok(t) => return Ok(t),
1312                     Err(stream::Disconnected) => return Err(Disconnected),
1313                     Err(stream::Upgraded(rx)) => Some(rx),
1314                     Err(stream::Empty) => None,
1315                 },
1316                 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1317                     Ok(t) => return Ok(t),
1318                     Err(shared::Disconnected) => return Err(Disconnected),
1319                     Err(shared::Empty) => None,
1320                 },
1321                 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1322                     Ok(t) => return Ok(t),
1323                     Err(sync::Disconnected) => return Err(Disconnected),
1324                     Err(sync::Empty) => None,
1325                 },
1326             };
1327
1328             if let Some(new_port) = port_or_empty {
1329                 unsafe {
1330                     mem::swap(self.inner_mut(), new_port.inner_mut());
1331                 }
1332             }
1333
1334             // If we're already passed the deadline, and we're here without
1335             // data, return a timeout, else try again.
1336             if Instant::now() >= deadline {
1337                 return Err(Timeout);
1338             }
1339         }
1340     }
1341
1342     /// Returns an iterator that will block waiting for messages, but never
1343     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1344     ///
1345     /// # Examples
1346     ///
1347     /// ```rust
1348     /// use std::sync::mpsc::channel;
1349     /// use std::thread;
1350     ///
1351     /// let (send, recv) = channel();
1352     ///
1353     /// thread::spawn(move || {
1354     ///     send.send(1).unwrap();
1355     ///     send.send(2).unwrap();
1356     ///     send.send(3).unwrap();
1357     /// });
1358     ///
1359     /// let mut iter = recv.iter();
1360     /// assert_eq!(iter.next(), Some(1));
1361     /// assert_eq!(iter.next(), Some(2));
1362     /// assert_eq!(iter.next(), Some(3));
1363     /// assert_eq!(iter.next(), None);
1364     /// ```
1365     #[stable(feature = "rust1", since = "1.0.0")]
1366     pub fn iter(&self) -> Iter<'_, T> {
1367         Iter { rx: self }
1368     }
1369
1370     /// Returns an iterator that will attempt to yield all pending values.
1371     /// It will return `None` if there are no more pending values or if the
1372     /// channel has hung up. The iterator will never [`panic!`] or block the
1373     /// user by waiting for values.
1374     ///
1375     /// # Examples
1376     ///
1377     /// ```no_run
1378     /// use std::sync::mpsc::channel;
1379     /// use std::thread;
1380     /// use std::time::Duration;
1381     ///
1382     /// let (sender, receiver) = channel();
1383     ///
1384     /// // nothing is in the buffer yet
1385     /// assert!(receiver.try_iter().next().is_none());
1386     ///
1387     /// thread::spawn(move || {
1388     ///     thread::sleep(Duration::from_secs(1));
1389     ///     sender.send(1).unwrap();
1390     ///     sender.send(2).unwrap();
1391     ///     sender.send(3).unwrap();
1392     /// });
1393     ///
1394     /// // nothing is in the buffer yet
1395     /// assert!(receiver.try_iter().next().is_none());
1396     ///
1397     /// // block for two seconds
1398     /// thread::sleep(Duration::from_secs(2));
1399     ///
1400     /// let mut iter = receiver.try_iter();
1401     /// assert_eq!(iter.next(), Some(1));
1402     /// assert_eq!(iter.next(), Some(2));
1403     /// assert_eq!(iter.next(), Some(3));
1404     /// assert_eq!(iter.next(), None);
1405     /// ```
1406     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1407     pub fn try_iter(&self) -> TryIter<'_, T> {
1408         TryIter { rx: self }
1409     }
1410 }
1411
1412 #[stable(feature = "rust1", since = "1.0.0")]
1413 impl<'a, T> Iterator for Iter<'a, T> {
1414     type Item = T;
1415
1416     fn next(&mut self) -> Option<T> {
1417         self.rx.recv().ok()
1418     }
1419 }
1420
1421 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1422 impl<'a, T> Iterator for TryIter<'a, T> {
1423     type Item = T;
1424
1425     fn next(&mut self) -> Option<T> {
1426         self.rx.try_recv().ok()
1427     }
1428 }
1429
1430 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1431 impl<'a, T> IntoIterator for &'a Receiver<T> {
1432     type Item = T;
1433     type IntoIter = Iter<'a, T>;
1434
1435     fn into_iter(self) -> Iter<'a, T> {
1436         self.iter()
1437     }
1438 }
1439
1440 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1441 impl<T> Iterator for IntoIter<T> {
1442     type Item = T;
1443     fn next(&mut self) -> Option<T> {
1444         self.rx.recv().ok()
1445     }
1446 }
1447
1448 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1449 impl<T> IntoIterator for Receiver<T> {
1450     type Item = T;
1451     type IntoIter = IntoIter<T>;
1452
1453     fn into_iter(self) -> IntoIter<T> {
1454         IntoIter { rx: self }
1455     }
1456 }
1457
1458 #[stable(feature = "rust1", since = "1.0.0")]
1459 impl<T> Drop for Receiver<T> {
1460     fn drop(&mut self) {
1461         match *unsafe { self.inner() } {
1462             Flavor::Oneshot(ref p) => p.drop_port(),
1463             Flavor::Stream(ref p) => p.drop_port(),
1464             Flavor::Shared(ref p) => p.drop_port(),
1465             Flavor::Sync(ref p) => p.drop_port(),
1466         }
1467     }
1468 }
1469
1470 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1471 impl<T> fmt::Debug for Receiver<T> {
1472     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473         f.debug_struct("Receiver").finish_non_exhaustive()
1474     }
1475 }
1476
1477 #[stable(feature = "rust1", since = "1.0.0")]
1478 impl<T> fmt::Debug for SendError<T> {
1479     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1480         "SendError(..)".fmt(f)
1481     }
1482 }
1483
1484 #[stable(feature = "rust1", since = "1.0.0")]
1485 impl<T> fmt::Display for SendError<T> {
1486     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1487         "sending on a closed channel".fmt(f)
1488     }
1489 }
1490
1491 #[stable(feature = "rust1", since = "1.0.0")]
1492 impl<T: Send> error::Error for SendError<T> {
1493     #[allow(deprecated)]
1494     fn description(&self) -> &str {
1495         "sending on a closed channel"
1496     }
1497 }
1498
1499 #[stable(feature = "rust1", since = "1.0.0")]
1500 impl<T> fmt::Debug for TrySendError<T> {
1501     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1502         match *self {
1503             TrySendError::Full(..) => "Full(..)".fmt(f),
1504             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1505         }
1506     }
1507 }
1508
1509 #[stable(feature = "rust1", since = "1.0.0")]
1510 impl<T> fmt::Display for TrySendError<T> {
1511     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1512         match *self {
1513             TrySendError::Full(..) => "sending on a full channel".fmt(f),
1514             TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1515         }
1516     }
1517 }
1518
1519 #[stable(feature = "rust1", since = "1.0.0")]
1520 impl<T: Send> error::Error for TrySendError<T> {
1521     #[allow(deprecated)]
1522     fn description(&self) -> &str {
1523         match *self {
1524             TrySendError::Full(..) => "sending on a full channel",
1525             TrySendError::Disconnected(..) => "sending on a closed channel",
1526         }
1527     }
1528 }
1529
1530 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1531 impl<T> From<SendError<T>> for TrySendError<T> {
1532     /// Converts a `SendError<T>` into a `TrySendError<T>`.
1533     ///
1534     /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1535     ///
1536     /// No data is allocated on the heap.
1537     fn from(err: SendError<T>) -> TrySendError<T> {
1538         match err {
1539             SendError(t) => TrySendError::Disconnected(t),
1540         }
1541     }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl fmt::Display for RecvError {
1546     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1547         "receiving on a closed channel".fmt(f)
1548     }
1549 }
1550
1551 #[stable(feature = "rust1", since = "1.0.0")]
1552 impl error::Error for RecvError {
1553     #[allow(deprecated)]
1554     fn description(&self) -> &str {
1555         "receiving on a closed channel"
1556     }
1557 }
1558
1559 #[stable(feature = "rust1", since = "1.0.0")]
1560 impl fmt::Display for TryRecvError {
1561     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562         match *self {
1563             TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1564             TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1565         }
1566     }
1567 }
1568
1569 #[stable(feature = "rust1", since = "1.0.0")]
1570 impl error::Error for TryRecvError {
1571     #[allow(deprecated)]
1572     fn description(&self) -> &str {
1573         match *self {
1574             TryRecvError::Empty => "receiving on an empty channel",
1575             TryRecvError::Disconnected => "receiving on a closed channel",
1576         }
1577     }
1578 }
1579
1580 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1581 impl From<RecvError> for TryRecvError {
1582     /// Converts a `RecvError` into a `TryRecvError`.
1583     ///
1584     /// This conversion always returns `TryRecvError::Disconnected`.
1585     ///
1586     /// No data is allocated on the heap.
1587     fn from(err: RecvError) -> TryRecvError {
1588         match err {
1589             RecvError => TryRecvError::Disconnected,
1590         }
1591     }
1592 }
1593
1594 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1595 impl fmt::Display for RecvTimeoutError {
1596     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1597         match *self {
1598             RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1599             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1600         }
1601     }
1602 }
1603
1604 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1605 impl error::Error for RecvTimeoutError {
1606     #[allow(deprecated)]
1607     fn description(&self) -> &str {
1608         match *self {
1609             RecvTimeoutError::Timeout => "timed out waiting on channel",
1610             RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1611         }
1612     }
1613 }
1614
1615 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1616 impl From<RecvError> for RecvTimeoutError {
1617     /// Converts a `RecvError` into a `RecvTimeoutError`.
1618     ///
1619     /// This conversion always returns `RecvTimeoutError::Disconnected`.
1620     ///
1621     /// No data is allocated on the heap.
1622     fn from(err: RecvError) -> RecvTimeoutError {
1623         match err {
1624             RecvError => RecvTimeoutError::Disconnected,
1625         }
1626     }
1627 }