]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Auto merge of #42394 - ollie27:rustdoc_deref_box, r=QuietMisdreavus
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * [`Sender`]
17 //! * [`SyncSender`]
18 //! * [`Receiver`]
19 //!
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //!    channel where each sender atomically hands off a message to a receiver.
37 //!
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
44 //!
45 //! ## Disconnection
46 //!
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
51 //!
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
56 //!
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
60 //!
61 //! # Examples
62 //!
63 //! Simple usage:
64 //!
65 //! ```
66 //! use std::thread;
67 //! use std::sync::mpsc::channel;
68 //!
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //!     tx.send(10).unwrap();
73 //! });
74 //! assert_eq!(rx.recv().unwrap(), 10);
75 //! ```
76 //!
77 //! Shared usage:
78 //!
79 //! ```
80 //! use std::thread;
81 //! use std::sync::mpsc::channel;
82 //!
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
87 //! for i in 0..10 {
88 //!     let tx = tx.clone();
89 //!     thread::spawn(move|| {
90 //!         tx.send(i).unwrap();
91 //!     });
92 //! }
93 //!
94 //! for _ in 0..10 {
95 //!     let j = rx.recv().unwrap();
96 //!     assert!(0 <= j && j < 10);
97 //! }
98 //! ```
99 //!
100 //! Propagating panics:
101 //!
102 //! ```
103 //! use std::sync::mpsc::channel;
104 //!
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
108 //! drop(tx);
109 //! assert!(rx.recv().is_err());
110 //! ```
111 //!
112 //! Synchronous channels:
113 //!
114 //! ```
115 //! use std::thread;
116 //! use std::sync::mpsc::sync_channel;
117 //!
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //!     // This will wait for the parent thread to start receiving
121 //!     tx.send(53).unwrap();
122 //! });
123 //! rx.recv().unwrap();
124 //! ```
125
126 #![stable(feature = "rust1", since = "1.0.0")]
127
128 // A description of how Rust's channel implementation works
129 //
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
133 //
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
139 //
140 // ## Flavors of channels
141 //
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
145 // channels in play.
146 //
147 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
148 //                      case. They contain as few atomics as possible and
149 //                      involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 //             use a different concurrent queue that is more tailored for this
152 //             use case. The initial allocation of this flavor of channel is not
153 //             optimized.
154 // * Shared - this is the most general form of channel that this module offers,
155 //            a channel with multiple senders. This type is as optimized as it
156 //            can be, but the previous two types mentioned are much faster for
157 //            their use-cases.
158 //
159 // ## Concurrent queues
160 //
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
164 //
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
169 // shared channels.
170 //
171 // ### SPSC optimizations
172 //
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
177 // pop().
178 //
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
184 //
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
188 // coexist at once.
189 //
190 // ### MPSC optimizations
191 //
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
195 //
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
199 //
200 // ## Overview of the Implementation
201 //
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
205 //
206 //
207 //      send(t)                             recv()
208 //        queue.push(t)                       return if queue.pop()
209 //        if increment() == -1                deschedule {
210 //          wakeup()                            if decrement() > 0
211 //                                                cancel_deschedule()
212 //                                            }
213 //                                            queue.pop()
214 //
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
217 //
218 // ### The internal atomic counter
219 //
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
223 //
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
228 //
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
235 //
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
239 //
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
242 //
243 // ## Native Implementation
244 //
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
248 //
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
254 //
255 // ## Select
256 //
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
260 //
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as its woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
267 //
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
272 //
273 // # Conclusion
274 //
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
277
278 use sync::Arc;
279 use error;
280 use fmt;
281 use mem;
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
284
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
290
291 mod blocking;
292 mod oneshot;
293 mod select;
294 mod shared;
295 mod stream;
296 mod sync;
297 mod mpsc_queue;
298 mod spsc_queue;
299
300 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
301 /// This half can only be owned by one thread.
302 ///
303 /// Messages sent to the channel can be retrieved using [`recv`].
304 ///
305 /// [`channel`]: fn.channel.html
306 /// [`sync_channel`]: fn.sync_channel.html
307 /// [`recv`]: struct.Receiver.html#method.recv
308 ///
309 /// # Examples
310 ///
311 /// ```rust
312 /// use std::sync::mpsc::channel;
313 /// use std::thread;
314 /// use std::time::Duration;
315 ///
316 /// let (send, recv) = channel();
317 ///
318 /// thread::spawn(move || {
319 ///     send.send("Hello world!").unwrap();
320 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
321 ///     send.send("Delayed for 2 seconds").unwrap();
322 /// });
323 ///
324 /// println!("{}", recv.recv().unwrap()); // Received immediately
325 /// println!("Waiting...");
326 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
327 /// ```
328 #[stable(feature = "rust1", since = "1.0.0")]
329 pub struct Receiver<T> {
330     inner: UnsafeCell<Flavor<T>>,
331 }
332
333 // The receiver port can be sent from place to place, so long as it
334 // is not used to receive non-sendable things.
335 #[stable(feature = "rust1", since = "1.0.0")]
336 unsafe impl<T: Send> Send for Receiver<T> { }
337
338 #[stable(feature = "rust1", since = "1.0.0")]
339 impl<T> !Sync for Receiver<T> { }
340
341 /// An iterator over messages on a [`Receiver`], created by [`iter`].
342 ///
343 /// This iterator will block whenever [`next`] is called,
344 /// waiting for a new message, and [`None`] will be returned
345 /// when the corresponding channel has hung up.
346 ///
347 /// [`iter`]: struct.Receiver.html#method.iter
348 /// [`Receiver`]: struct.Receiver.html
349 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
350 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
351 ///
352 /// # Examples
353 ///
354 /// ```rust
355 /// use std::sync::mpsc::channel;
356 /// use std::thread;
357 ///
358 /// let (send, recv) = channel();
359 ///
360 /// thread::spawn(move || {
361 ///     send.send(1u8).unwrap();
362 ///     send.send(2u8).unwrap();
363 ///     send.send(3u8).unwrap();
364 /// });
365 ///
366 /// for x in recv.iter() {
367 ///     println!("Got: {}", x);
368 /// }
369 /// ```
370 #[stable(feature = "rust1", since = "1.0.0")]
371 #[derive(Debug)]
372 pub struct Iter<'a, T: 'a> {
373     rx: &'a Receiver<T>
374 }
375
376 /// An iterator that attempts to yield all pending values for a [`Receiver`],
377 /// created by [`try_iter`].
378 ///
379 /// [`None`] will be returned when there are no pending values remaining or
380 /// if the corresponding channel has hung up.
381 ///
382 /// This iterator will never block the caller in order to wait for data to
383 /// become available. Instead, it will return [`None`].
384 ///
385 /// [`Receiver`]: struct.Receiver.html
386 /// [`try_iter`]: struct.Receiver.html#method.try_iter
387 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
388 ///
389 /// # Examples
390 ///
391 /// ```rust
392 /// use std::sync::mpsc::channel;
393 /// use std::thread;
394 /// use std::time::Duration;
395 ///
396 /// let (sender, receiver) = channel();
397 ///
398 /// // Nothing is in the buffer yet
399 /// assert!(receiver.try_iter().next().is_none());
400 /// println!("Nothing in the buffer...");
401 ///
402 /// thread::spawn(move || {
403 ///     sender.send(1).unwrap();
404 ///     sender.send(2).unwrap();
405 ///     sender.send(3).unwrap();
406 /// });
407 ///
408 /// println!("Going to sleep...");
409 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
410 ///
411 /// for x in receiver.try_iter() {
412 ///     println!("Got: {}", x);
413 /// }
414 /// ```
415 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
416 #[derive(Debug)]
417 pub struct TryIter<'a, T: 'a> {
418     rx: &'a Receiver<T>
419 }
420
421 /// An owning iterator over messages on a [`Receiver`],
422 /// created by **Receiver::into_iter**.
423 ///
424 /// This iterator will block whenever [`next`]
425 /// is called, waiting for a new message, and [`None`] will be
426 /// returned if the corresponding channel has hung up.
427 ///
428 /// [`Receiver`]: struct.Receiver.html
429 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
430 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
431 ///
432 /// # Examples
433 ///
434 /// ```rust
435 /// use std::sync::mpsc::channel;
436 /// use std::thread;
437 ///
438 /// let (send, recv) = channel();
439 ///
440 /// thread::spawn(move || {
441 ///     send.send(1u8).unwrap();
442 ///     send.send(2u8).unwrap();
443 ///     send.send(3u8).unwrap();
444 /// });
445 ///
446 /// for x in recv.into_iter() {
447 ///     println!("Got: {}", x);
448 /// }
449 /// ```
450 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
451 #[derive(Debug)]
452 pub struct IntoIter<T> {
453     rx: Receiver<T>
454 }
455
456 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
457 /// owned by one thread, but it can be cloned to send to other threads.
458 ///
459 /// Messages can be sent through this channel with [`send`].
460 ///
461 /// [`channel`]: fn.channel.html
462 /// [`send`]: struct.Sender.html#method.send
463 ///
464 /// # Examples
465 ///
466 /// ```rust
467 /// use std::sync::mpsc::channel;
468 /// use std::thread;
469 ///
470 /// let (sender, receiver) = channel();
471 /// let sender2 = sender.clone();
472 ///
473 /// // First thread owns sender
474 /// thread::spawn(move || {
475 ///     sender.send(1).unwrap();
476 /// });
477 ///
478 /// // Second thread owns sender2
479 /// thread::spawn(move || {
480 ///     sender2.send(2).unwrap();
481 /// });
482 ///
483 /// let msg = receiver.recv().unwrap();
484 /// let msg2 = receiver.recv().unwrap();
485 ///
486 /// assert_eq!(3, msg + msg2);
487 /// ```
488 #[stable(feature = "rust1", since = "1.0.0")]
489 pub struct Sender<T> {
490     inner: UnsafeCell<Flavor<T>>,
491 }
492
493 // The send port can be sent from place to place, so long as it
494 // is not used to send non-sendable things.
495 #[stable(feature = "rust1", since = "1.0.0")]
496 unsafe impl<T: Send> Send for Sender<T> { }
497
498 #[stable(feature = "rust1", since = "1.0.0")]
499 impl<T> !Sync for Sender<T> { }
500
501 /// The sending-half of Rust's synchronous [`sync_channel`] type.
502 /// This half can only be owned by one thread, but it can be cloned
503 /// to send to other threads.
504 ///
505 /// Messages can be sent through this channel with [`send`] or [`try_send`].
506 ///
507 /// [`send`] will block if there is no space in the internal buffer.
508 ///
509 /// [`sync_channel`]: fn.sync_channel.html
510 /// [`send`]: struct.SyncSender.html#method.send
511 /// [`try_send`]: struct.SyncSender.html#method.try_send
512 ///
513 /// # Examples
514 ///
515 /// ```rust
516 /// use std::sync::mpsc::sync_channel;
517 /// use std::thread;
518 ///
519 /// // Create a sync_channel with buffer size 2
520 /// let (sync_sender, receiver) = sync_channel(2);
521 /// let sync_sender2 = sync_sender.clone();
522 ///
523 /// // First thread owns sync_sender
524 /// thread::spawn(move || {
525 ///     sync_sender.send(1).unwrap();
526 ///     sync_sender.send(2).unwrap();
527 /// });
528 ///
529 /// // Second thread owns sync_sender2
530 /// thread::spawn(move || {
531 ///     sync_sender2.send(3).unwrap();
532 ///     // thread will now block since the buffer is full
533 ///     println!("Thread unblocked!");
534 /// });
535 ///
536 /// let mut msg;
537 ///
538 /// msg = receiver.recv().unwrap();
539 /// println!("message {} received", msg);
540 ///
541 /// // "Thread unblocked!" will be printed now
542 ///
543 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
545 ///
546 /// msg = receiver.recv().unwrap();
547 ///
548 /// println!("message {} received", msg);
549 /// ```
550 #[stable(feature = "rust1", since = "1.0.0")]
551 pub struct SyncSender<T> {
552     inner: Arc<sync::Packet<T>>,
553 }
554
555 #[stable(feature = "rust1", since = "1.0.0")]
556 unsafe impl<T: Send> Send for SyncSender<T> {}
557
558 #[stable(feature = "rust1", since = "1.0.0")]
559 impl<T> !Sync for SyncSender<T> {}
560
561 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
562 /// function on **channel**s.
563 ///
564 /// A **send** operation can only fail if the receiving end of a channel is
565 /// disconnected, implying that the data could never be received. The error
566 /// contains the data being sent as a payload so it can be recovered.
567 ///
568 /// [`Sender::send`]: struct.Sender.html#method.send
569 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
570 #[stable(feature = "rust1", since = "1.0.0")]
571 #[derive(PartialEq, Eq, Clone, Copy)]
572 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
573
574 /// An error returned from the [`recv`] function on a [`Receiver`].
575 ///
576 /// The [`recv`] operation can only fail if the sending half of a
577 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
578 /// messages will ever be received.
579 ///
580 /// [`recv`]: struct.Receiver.html#method.recv
581 /// [`Receiver`]: struct.Receiver.html
582 /// [`channel`]: fn.channel.html
583 /// [`sync_channel`]: fn.sync_channel.html
584 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
585 #[stable(feature = "rust1", since = "1.0.0")]
586 pub struct RecvError;
587
588 /// This enumeration is the list of the possible reasons that [`try_recv`] could
589 /// not return data when called. This can occur with both a [`channel`] and
590 /// a [`sync_channel`].
591 ///
592 /// [`try_recv`]: struct.Receiver.html#method.try_recv
593 /// [`channel`]: fn.channel.html
594 /// [`sync_channel`]: fn.sync_channel.html
595 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
596 #[stable(feature = "rust1", since = "1.0.0")]
597 pub enum TryRecvError {
598     /// This **channel** is currently empty, but the **Sender**(s) have not yet
599     /// disconnected, so data may yet become available.
600     #[stable(feature = "rust1", since = "1.0.0")]
601     Empty,
602
603     /// The **channel**'s sending half has become disconnected, and there will
604     /// never be any more data received on it.
605     #[stable(feature = "rust1", since = "1.0.0")]
606     Disconnected,
607 }
608
609 /// This enumeration is the list of possible errors that made [`recv_timeout`]
610 /// unable to return data when called. This can occur with both a [`channel`] and
611 /// a [`sync_channel`].
612 ///
613 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
614 /// [`channel`]: fn.channel.html
615 /// [`sync_channel`]: fn.sync_channel.html
616 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
617 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
618 pub enum RecvTimeoutError {
619     /// This **channel** is currently empty, but the **Sender**(s) have not yet
620     /// disconnected, so data may yet become available.
621     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
622     Timeout,
623     /// The **channel**'s sending half has become disconnected, and there will
624     /// never be any more data received on it.
625     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
626     Disconnected,
627 }
628
629 /// This enumeration is the list of the possible error outcomes for the
630 /// [`try_send`] method.
631 ///
632 /// [`try_send`]: struct.SyncSender.html#method.try_send
633 #[stable(feature = "rust1", since = "1.0.0")]
634 #[derive(PartialEq, Eq, Clone, Copy)]
635 pub enum TrySendError<T> {
636     /// The data could not be sent on the [`sync_channel`] because it would require that
637     /// the callee block to send the data.
638     ///
639     /// If this is a buffered channel, then the buffer is full at this time. If
640     /// this is not a buffered channel, then there is no [`Receiver`] available to
641     /// acquire the data.
642     ///
643     /// [`sync_channel`]: fn.sync_channel.html
644     /// [`Receiver`]: struct.Receiver.html
645     #[stable(feature = "rust1", since = "1.0.0")]
646     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
647
648     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
649     /// sent. The data is returned back to the callee in this case.
650     ///
651     /// [`sync_channel`]: fn.sync_channel.html
652     #[stable(feature = "rust1", since = "1.0.0")]
653     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
654 }
655
656 enum Flavor<T> {
657     Oneshot(Arc<oneshot::Packet<T>>),
658     Stream(Arc<stream::Packet<T>>),
659     Shared(Arc<shared::Packet<T>>),
660     Sync(Arc<sync::Packet<T>>),
661 }
662
663 #[doc(hidden)]
664 trait UnsafeFlavor<T> {
665     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
666     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
667         &mut *self.inner_unsafe().get()
668     }
669     unsafe fn inner(&self) -> &Flavor<T> {
670         &*self.inner_unsafe().get()
671     }
672 }
673 impl<T> UnsafeFlavor<T> for Sender<T> {
674     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
675         &self.inner
676     }
677 }
678 impl<T> UnsafeFlavor<T> for Receiver<T> {
679     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
680         &self.inner
681     }
682 }
683
684 /// Creates a new asynchronous channel, returning the sender/receiver halves.
685 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
686 /// the same order as it was sent, and no [`send`] will block the calling thread
687 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
688 /// block after its buffer limit is reached). [`recv`] will block until a message
689 /// is available.
690 ///
691 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
692 /// only one [`Receiver`] is supported.
693 ///
694 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
695 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
696 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
697 /// return a [`RecvError`].
698 ///
699 /// [`send`]: struct.Sender.html#method.send
700 /// [`recv`]: struct.Receiver.html#method.recv
701 /// [`Sender`]: struct.Sender.html
702 /// [`Receiver`]: struct.Receiver.html
703 /// [`sync_channel`]: fn.sync_channel.html
704 /// [`SendError`]: struct.SendError.html
705 /// [`RecvError`]: struct.RecvError.html
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// use std::sync::mpsc::channel;
711 /// use std::thread;
712 ///
713 /// let (sender, receiver) = channel();
714 ///
715 /// // Spawn off an expensive computation
716 /// thread::spawn(move|| {
717 /// #   fn expensive_computation() {}
718 ///     sender.send(expensive_computation()).unwrap();
719 /// });
720 ///
721 /// // Do some useful work for awhile
722 ///
723 /// // Let's see what that answer was
724 /// println!("{:?}", receiver.recv().unwrap());
725 /// ```
726 #[stable(feature = "rust1", since = "1.0.0")]
727 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
728     let a = Arc::new(oneshot::Packet::new());
729     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
730 }
731
732 /// Creates a new synchronous, bounded channel.
733 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
734 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
735 /// [`Receiver`] will block until a message becomes available. `sync_channel`
736 /// differs greatly in the semantics of the sender, however.
737 ///
738 /// This channel has an internal buffer on which messages will be queued.
739 /// `bound` specifies the buffer size. When the internal buffer becomes full,
740 /// future sends will *block* waiting for the buffer to open up. Note that a
741 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
742 /// where each [`send`] will not return until a [`recv`] is paired with it.
743 ///
744 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
745 /// times, but only one [`Receiver`] is supported.
746 ///
747 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
748 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
749 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
750 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
751 ///
752 /// [`channel`]: fn.channel.html
753 /// [`send`]: struct.SyncSender.html#method.send
754 /// [`recv`]: struct.Receiver.html#method.recv
755 /// [`SyncSender`]: struct.SyncSender.html
756 /// [`Receiver`]: struct.Receiver.html
757 /// [`SendError`]: struct.SendError.html
758 /// [`RecvError`]: struct.RecvError.html
759 ///
760 /// # Examples
761 ///
762 /// ```
763 /// use std::sync::mpsc::sync_channel;
764 /// use std::thread;
765 ///
766 /// let (sender, receiver) = sync_channel(1);
767 ///
768 /// // this returns immediately
769 /// sender.send(1).unwrap();
770 ///
771 /// thread::spawn(move|| {
772 ///     // this will block until the previous message has been received
773 ///     sender.send(2).unwrap();
774 /// });
775 ///
776 /// assert_eq!(receiver.recv().unwrap(), 1);
777 /// assert_eq!(receiver.recv().unwrap(), 2);
778 /// ```
779 #[stable(feature = "rust1", since = "1.0.0")]
780 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
781     let a = Arc::new(sync::Packet::new(bound));
782     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
783 }
784
785 ////////////////////////////////////////////////////////////////////////////////
786 // Sender
787 ////////////////////////////////////////////////////////////////////////////////
788
789 impl<T> Sender<T> {
790     fn new(inner: Flavor<T>) -> Sender<T> {
791         Sender {
792             inner: UnsafeCell::new(inner),
793         }
794     }
795
796     /// Attempts to send a value on this channel, returning it back if it could
797     /// not be sent.
798     ///
799     /// A successful send occurs when it is determined that the other end of
800     /// the channel has not hung up already. An unsuccessful send would be one
801     /// where the corresponding receiver has already been deallocated. Note
802     /// that a return value of [`Err`] means that the data will never be
803     /// received, but a return value of [`Ok`] does *not* mean that the data
804     /// will be received.  It is possible for the corresponding receiver to
805     /// hang up immediately after this function returns [`Ok`].
806     ///
807     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
808     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
809     ///
810     /// This method will never block the current thread.
811     ///
812     /// # Examples
813     ///
814     /// ```
815     /// use std::sync::mpsc::channel;
816     ///
817     /// let (tx, rx) = channel();
818     ///
819     /// // This send is always successful
820     /// tx.send(1).unwrap();
821     ///
822     /// // This send will fail because the receiver is gone
823     /// drop(rx);
824     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
825     /// ```
826     #[stable(feature = "rust1", since = "1.0.0")]
827     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
828         let (new_inner, ret) = match *unsafe { self.inner() } {
829             Flavor::Oneshot(ref p) => {
830                 if !p.sent() {
831                     return p.send(t).map_err(SendError);
832                 } else {
833                     let a = Arc::new(stream::Packet::new());
834                     let rx = Receiver::new(Flavor::Stream(a.clone()));
835                     match p.upgrade(rx) {
836                         oneshot::UpSuccess => {
837                             let ret = a.send(t);
838                             (a, ret)
839                         }
840                         oneshot::UpDisconnected => (a, Err(t)),
841                         oneshot::UpWoke(token) => {
842                             // This send cannot panic because the thread is
843                             // asleep (we're looking at it), so the receiver
844                             // can't go away.
845                             a.send(t).ok().unwrap();
846                             token.signal();
847                             (a, Ok(()))
848                         }
849                     }
850                 }
851             }
852             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
853             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
854             Flavor::Sync(..) => unreachable!(),
855         };
856
857         unsafe {
858             let tmp = Sender::new(Flavor::Stream(new_inner));
859             mem::swap(self.inner_mut(), tmp.inner_mut());
860         }
861         ret.map_err(SendError)
862     }
863 }
864
865 #[stable(feature = "rust1", since = "1.0.0")]
866 impl<T> Clone for Sender<T> {
867     fn clone(&self) -> Sender<T> {
868         let packet = match *unsafe { self.inner() } {
869             Flavor::Oneshot(ref p) => {
870                 let a = Arc::new(shared::Packet::new());
871                 {
872                     let guard = a.postinit_lock();
873                     let rx = Receiver::new(Flavor::Shared(a.clone()));
874                     let sleeper = match p.upgrade(rx) {
875                         oneshot::UpSuccess |
876                         oneshot::UpDisconnected => None,
877                         oneshot::UpWoke(task) => Some(task),
878                     };
879                     a.inherit_blocker(sleeper, guard);
880                 }
881                 a
882             }
883             Flavor::Stream(ref p) => {
884                 let a = Arc::new(shared::Packet::new());
885                 {
886                     let guard = a.postinit_lock();
887                     let rx = Receiver::new(Flavor::Shared(a.clone()));
888                     let sleeper = match p.upgrade(rx) {
889                         stream::UpSuccess |
890                         stream::UpDisconnected => None,
891                         stream::UpWoke(task) => Some(task),
892                     };
893                     a.inherit_blocker(sleeper, guard);
894                 }
895                 a
896             }
897             Flavor::Shared(ref p) => {
898                 p.clone_chan();
899                 return Sender::new(Flavor::Shared(p.clone()));
900             }
901             Flavor::Sync(..) => unreachable!(),
902         };
903
904         unsafe {
905             let tmp = Sender::new(Flavor::Shared(packet.clone()));
906             mem::swap(self.inner_mut(), tmp.inner_mut());
907         }
908         Sender::new(Flavor::Shared(packet))
909     }
910 }
911
912 #[stable(feature = "rust1", since = "1.0.0")]
913 impl<T> Drop for Sender<T> {
914     fn drop(&mut self) {
915         match *unsafe { self.inner() } {
916             Flavor::Oneshot(ref p) => p.drop_chan(),
917             Flavor::Stream(ref p) => p.drop_chan(),
918             Flavor::Shared(ref p) => p.drop_chan(),
919             Flavor::Sync(..) => unreachable!(),
920         }
921     }
922 }
923
924 #[stable(feature = "mpsc_debug", since = "1.8.0")]
925 impl<T> fmt::Debug for Sender<T> {
926     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
927         write!(f, "Sender {{ .. }}")
928     }
929 }
930
931 ////////////////////////////////////////////////////////////////////////////////
932 // SyncSender
933 ////////////////////////////////////////////////////////////////////////////////
934
935 impl<T> SyncSender<T> {
936     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
937         SyncSender { inner: inner }
938     }
939
940     /// Sends a value on this synchronous channel.
941     ///
942     /// This function will *block* until space in the internal buffer becomes
943     /// available or a receiver is available to hand off the message to.
944     ///
945     /// Note that a successful send does *not* guarantee that the receiver will
946     /// ever see the data if there is a buffer on this channel. Items may be
947     /// enqueued in the internal buffer for the receiver to receive at a later
948     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
949     /// channel and it guarantees that the receiver has indeed received
950     /// the data if this function returns success.
951     ///
952     /// This function will never panic, but it may return [`Err`] if the
953     /// [`Receiver`] has disconnected and is no longer able to receive
954     /// information.
955     ///
956     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
957     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
958     ///
959     /// # Examples
960     ///
961     /// ```rust
962     /// use std::sync::mpsc::sync_channel;
963     /// use std::thread;
964     ///
965     /// // Create a rendezvous sync_channel with buffer size 0
966     /// let (sync_sender, receiver) = sync_channel(0);
967     ///
968     /// thread::spawn(move || {
969     ///    println!("sending message...");
970     ///    sync_sender.send(1).unwrap();
971     ///    // Thread is now blocked until the message is received
972     ///
973     ///    println!("...message received!");
974     /// });
975     ///
976     /// let msg = receiver.recv().unwrap();
977     /// assert_eq!(1, msg);
978     /// ```
979     #[stable(feature = "rust1", since = "1.0.0")]
980     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
981         self.inner.send(t).map_err(SendError)
982     }
983
984     /// Attempts to send a value on this channel without blocking.
985     ///
986     /// This method differs from [`send`] by returning immediately if the
987     /// channel's buffer is full or no receiver is waiting to acquire some
988     /// data. Compared with [`send`], this function has two failure cases
989     /// instead of one (one for disconnection, one for a full buffer).
990     ///
991     /// See [`send`] for notes about guarantees of whether the
992     /// receiver has received the data or not if this function is successful.
993     ///
994     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
995     ///
996     /// # Examples
997     ///
998     /// ```rust
999     /// use std::sync::mpsc::sync_channel;
1000     /// use std::thread;
1001     ///
1002     /// // Create a sync_channel with buffer size 1
1003     /// let (sync_sender, receiver) = sync_channel(1);
1004     /// let sync_sender2 = sync_sender.clone();
1005     ///
1006     /// // First thread owns sync_sender
1007     /// thread::spawn(move || {
1008     ///     sync_sender.send(1).unwrap();
1009     ///     sync_sender.send(2).unwrap();
1010     ///     // Thread blocked
1011     /// });
1012     ///
1013     /// // Second thread owns sync_sender2
1014     /// thread::spawn(move || {
1015     ///     // This will return an error and send
1016     ///     // no message if the buffer is full
1017     ///     sync_sender2.try_send(3).is_err();
1018     /// });
1019     ///
1020     /// let mut msg;
1021     /// msg = receiver.recv().unwrap();
1022     /// println!("message {} received", msg);
1023     ///
1024     /// msg = receiver.recv().unwrap();
1025     /// println!("message {} received", msg);
1026     ///
1027     /// // Third message may have never been sent
1028     /// match receiver.try_recv() {
1029     ///     Ok(msg) => println!("message {} received", msg),
1030     ///     Err(_) => println!("the third message was never sent"),
1031     /// }
1032     /// ```
1033     #[stable(feature = "rust1", since = "1.0.0")]
1034     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1035         self.inner.try_send(t)
1036     }
1037 }
1038
1039 #[stable(feature = "rust1", since = "1.0.0")]
1040 impl<T> Clone for SyncSender<T> {
1041     fn clone(&self) -> SyncSender<T> {
1042         self.inner.clone_chan();
1043         SyncSender::new(self.inner.clone())
1044     }
1045 }
1046
1047 #[stable(feature = "rust1", since = "1.0.0")]
1048 impl<T> Drop for SyncSender<T> {
1049     fn drop(&mut self) {
1050         self.inner.drop_chan();
1051     }
1052 }
1053
1054 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1055 impl<T> fmt::Debug for SyncSender<T> {
1056     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1057         write!(f, "SyncSender {{ .. }}")
1058     }
1059 }
1060
1061 ////////////////////////////////////////////////////////////////////////////////
1062 // Receiver
1063 ////////////////////////////////////////////////////////////////////////////////
1064
1065 impl<T> Receiver<T> {
1066     fn new(inner: Flavor<T>) -> Receiver<T> {
1067         Receiver { inner: UnsafeCell::new(inner) }
1068     }
1069
1070     /// Attempts to return a pending value on this receiver without blocking.
1071     ///
1072     /// This method will never block the caller in order to wait for data to
1073     /// become available. Instead, this will always return immediately with a
1074     /// possible option of pending data on the channel.
1075     ///
1076     /// This is useful for a flavor of "optimistic check" before deciding to
1077     /// block on a receiver.
1078     ///
1079     /// Compared with [`recv`], this function has two failure cases instead of one
1080     /// (one for disconnection, one for an empty buffer).
1081     ///
1082     /// [`recv`]: struct.Receiver.html#method.recv
1083     ///
1084     /// # Examples
1085     ///
1086     /// ```rust
1087     /// use std::sync::mpsc::{Receiver, channel};
1088     ///
1089     /// let (_, receiver): (_, Receiver<i32>) = channel();
1090     ///
1091     /// assert!(receiver.try_recv().is_err());
1092     /// ```
1093     #[stable(feature = "rust1", since = "1.0.0")]
1094     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1095         loop {
1096             let new_port = match *unsafe { self.inner() } {
1097                 Flavor::Oneshot(ref p) => {
1098                     match p.try_recv() {
1099                         Ok(t) => return Ok(t),
1100                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1101                         Err(oneshot::Disconnected) => {
1102                             return Err(TryRecvError::Disconnected)
1103                         }
1104                         Err(oneshot::Upgraded(rx)) => rx,
1105                     }
1106                 }
1107                 Flavor::Stream(ref p) => {
1108                     match p.try_recv() {
1109                         Ok(t) => return Ok(t),
1110                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1111                         Err(stream::Disconnected) => {
1112                             return Err(TryRecvError::Disconnected)
1113                         }
1114                         Err(stream::Upgraded(rx)) => rx,
1115                     }
1116                 }
1117                 Flavor::Shared(ref p) => {
1118                     match p.try_recv() {
1119                         Ok(t) => return Ok(t),
1120                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1121                         Err(shared::Disconnected) => {
1122                             return Err(TryRecvError::Disconnected)
1123                         }
1124                     }
1125                 }
1126                 Flavor::Sync(ref p) => {
1127                     match p.try_recv() {
1128                         Ok(t) => return Ok(t),
1129                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1130                         Err(sync::Disconnected) => {
1131                             return Err(TryRecvError::Disconnected)
1132                         }
1133                     }
1134                 }
1135             };
1136             unsafe {
1137                 mem::swap(self.inner_mut(),
1138                           new_port.inner_mut());
1139             }
1140         }
1141     }
1142
1143     /// Attempts to wait for a value on this receiver, returning an error if the
1144     /// corresponding channel has hung up.
1145     ///
1146     /// This function will always block the current thread if there is no data
1147     /// available and it's possible for more data to be sent. Once a message is
1148     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1149     /// receiver will wake up and return that message.
1150     ///
1151     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1152     /// this call is blocking, this call will wake up and return [`Err`] to
1153     /// indicate that no more messages can ever be received on this channel.
1154     /// However, since channels are buffered, messages sent before the disconnect
1155     /// will still be properly received.
1156     ///
1157     /// [`Sender`]: struct.Sender.html
1158     /// [`SyncSender`]: struct.SyncSender.html
1159     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1160     ///
1161     /// # Examples
1162     ///
1163     /// ```
1164     /// use std::sync::mpsc;
1165     /// use std::thread;
1166     ///
1167     /// let (send, recv) = mpsc::channel();
1168     /// let handle = thread::spawn(move || {
1169     ///     send.send(1u8).unwrap();
1170     /// });
1171     ///
1172     /// handle.join().unwrap();
1173     ///
1174     /// assert_eq!(Ok(1), recv.recv());
1175     /// ```
1176     ///
1177     /// Buffering behavior:
1178     ///
1179     /// ```
1180     /// use std::sync::mpsc;
1181     /// use std::thread;
1182     /// use std::sync::mpsc::RecvError;
1183     ///
1184     /// let (send, recv) = mpsc::channel();
1185     /// let handle = thread::spawn(move || {
1186     ///     send.send(1u8).unwrap();
1187     ///     send.send(2).unwrap();
1188     ///     send.send(3).unwrap();
1189     ///     drop(send);
1190     /// });
1191     ///
1192     /// // wait for the thread to join so we ensure the sender is dropped
1193     /// handle.join().unwrap();
1194     ///
1195     /// assert_eq!(Ok(1), recv.recv());
1196     /// assert_eq!(Ok(2), recv.recv());
1197     /// assert_eq!(Ok(3), recv.recv());
1198     /// assert_eq!(Err(RecvError), recv.recv());
1199     /// ```
1200     #[stable(feature = "rust1", since = "1.0.0")]
1201     pub fn recv(&self) -> Result<T, RecvError> {
1202         loop {
1203             let new_port = match *unsafe { self.inner() } {
1204                 Flavor::Oneshot(ref p) => {
1205                     match p.recv(None) {
1206                         Ok(t) => return Ok(t),
1207                         Err(oneshot::Disconnected) => return Err(RecvError),
1208                         Err(oneshot::Upgraded(rx)) => rx,
1209                         Err(oneshot::Empty) => unreachable!(),
1210                     }
1211                 }
1212                 Flavor::Stream(ref p) => {
1213                     match p.recv(None) {
1214                         Ok(t) => return Ok(t),
1215                         Err(stream::Disconnected) => return Err(RecvError),
1216                         Err(stream::Upgraded(rx)) => rx,
1217                         Err(stream::Empty) => unreachable!(),
1218                     }
1219                 }
1220                 Flavor::Shared(ref p) => {
1221                     match p.recv(None) {
1222                         Ok(t) => return Ok(t),
1223                         Err(shared::Disconnected) => return Err(RecvError),
1224                         Err(shared::Empty) => unreachable!(),
1225                     }
1226                 }
1227                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1228             };
1229             unsafe {
1230                 mem::swap(self.inner_mut(), new_port.inner_mut());
1231             }
1232         }
1233     }
1234
1235     /// Attempts to wait for a value on this receiver, returning an error if the
1236     /// corresponding channel has hung up, or if it waits more than `timeout`.
1237     ///
1238     /// This function will always block the current thread if there is no data
1239     /// available and it's possible for more data to be sent. Once a message is
1240     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1241     /// receiver will wake up and return that message.
1242     ///
1243     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1244     /// this call is blocking, this call will wake up and return [`Err`] to
1245     /// indicate that no more messages can ever be received on this channel.
1246     /// However, since channels are buffered, messages sent before the disconnect
1247     /// will still be properly received.
1248     ///
1249     /// [`Sender`]: struct.Sender.html
1250     /// [`SyncSender`]: struct.SyncSender.html
1251     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1252     ///
1253     /// # Examples
1254     ///
1255     /// Successfully receiving value before encountering timeout:
1256     ///
1257     /// ```no_run
1258     /// use std::thread;
1259     /// use std::time::Duration;
1260     /// use std::sync::mpsc;
1261     ///
1262     /// let (send, recv) = mpsc::channel();
1263     ///
1264     /// thread::spawn(move || {
1265     ///     send.send('a').unwrap();
1266     /// });
1267     ///
1268     /// assert_eq!(
1269     ///     recv.recv_timeout(Duration::from_millis(400)),
1270     ///     Ok('a')
1271     /// );
1272     /// ```
1273     ///
1274     /// Receiving an error upon reaching timeout:
1275     ///
1276     /// ```no_run
1277     /// use std::thread;
1278     /// use std::time::Duration;
1279     /// use std::sync::mpsc;
1280     ///
1281     /// let (send, recv) = mpsc::channel();
1282     ///
1283     /// thread::spawn(move || {
1284     ///     thread::sleep(Duration::from_millis(800));
1285     ///     send.send('a').unwrap();
1286     /// });
1287     ///
1288     /// assert_eq!(
1289     ///     recv.recv_timeout(Duration::from_millis(400)),
1290     ///     Err(mpsc::RecvTimeoutError::Timeout)
1291     /// );
1292     /// ```
1293     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1294     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1295         // Do an optimistic try_recv to avoid the performance impact of
1296         // Instant::now() in the full-channel case.
1297         match self.try_recv() {
1298             Ok(result)
1299                 => Ok(result),
1300             Err(TryRecvError::Disconnected)
1301                 => Err(RecvTimeoutError::Disconnected),
1302             Err(TryRecvError::Empty)
1303                 => self.recv_max_until(Instant::now() + timeout)
1304         }
1305     }
1306
1307     fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1308         use self::RecvTimeoutError::*;
1309
1310         loop {
1311             let port_or_empty = match *unsafe { self.inner() } {
1312                 Flavor::Oneshot(ref p) => {
1313                     match p.recv(Some(deadline)) {
1314                         Ok(t) => return Ok(t),
1315                         Err(oneshot::Disconnected) => return Err(Disconnected),
1316                         Err(oneshot::Upgraded(rx)) => Some(rx),
1317                         Err(oneshot::Empty) => None,
1318                     }
1319                 }
1320                 Flavor::Stream(ref p) => {
1321                     match p.recv(Some(deadline)) {
1322                         Ok(t) => return Ok(t),
1323                         Err(stream::Disconnected) => return Err(Disconnected),
1324                         Err(stream::Upgraded(rx)) => Some(rx),
1325                         Err(stream::Empty) => None,
1326                     }
1327                 }
1328                 Flavor::Shared(ref p) => {
1329                     match p.recv(Some(deadline)) {
1330                         Ok(t) => return Ok(t),
1331                         Err(shared::Disconnected) => return Err(Disconnected),
1332                         Err(shared::Empty) => None,
1333                     }
1334                 }
1335                 Flavor::Sync(ref p) => {
1336                     match p.recv(Some(deadline)) {
1337                         Ok(t) => return Ok(t),
1338                         Err(sync::Disconnected) => return Err(Disconnected),
1339                         Err(sync::Empty) => None,
1340                     }
1341                 }
1342             };
1343
1344             if let Some(new_port) = port_or_empty {
1345                 unsafe {
1346                     mem::swap(self.inner_mut(), new_port.inner_mut());
1347                 }
1348             }
1349
1350             // If we're already passed the deadline, and we're here without
1351             // data, return a timeout, else try again.
1352             if Instant::now() >= deadline {
1353                 return Err(Timeout);
1354             }
1355         }
1356     }
1357
1358     /// Returns an iterator that will block waiting for messages, but never
1359     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1360     ///
1361     /// [`panic!`]: ../../../std/macro.panic.html
1362     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1363     ///
1364     /// # Examples
1365     ///
1366     /// ```rust
1367     /// use std::sync::mpsc::channel;
1368     /// use std::thread;
1369     ///
1370     /// let (send, recv) = channel();
1371     ///
1372     /// thread::spawn(move || {
1373     ///     send.send(1).unwrap();
1374     ///     send.send(2).unwrap();
1375     ///     send.send(3).unwrap();
1376     /// });
1377     ///
1378     /// let mut iter = recv.iter();
1379     /// assert_eq!(iter.next(), Some(1));
1380     /// assert_eq!(iter.next(), Some(2));
1381     /// assert_eq!(iter.next(), Some(3));
1382     /// assert_eq!(iter.next(), None);
1383     /// ```
1384     #[stable(feature = "rust1", since = "1.0.0")]
1385     pub fn iter(&self) -> Iter<T> {
1386         Iter { rx: self }
1387     }
1388
1389     /// Returns an iterator that will attempt to yield all pending values.
1390     /// It will return `None` if there are no more pending values or if the
1391     /// channel has hung up. The iterator will never [`panic!`] or block the
1392     /// user by waiting for values.
1393     ///
1394     /// [`panic!`]: ../../../std/macro.panic.html
1395     ///
1396     /// # Examples
1397     ///
1398     /// ```no_run
1399     /// use std::sync::mpsc::channel;
1400     /// use std::thread;
1401     /// use std::time::Duration;
1402     ///
1403     /// let (sender, receiver) = channel();
1404     ///
1405     /// // nothing is in the buffer yet
1406     /// assert!(receiver.try_iter().next().is_none());
1407     ///
1408     /// thread::spawn(move || {
1409     ///     thread::sleep(Duration::from_secs(1));
1410     ///     sender.send(1).unwrap();
1411     ///     sender.send(2).unwrap();
1412     ///     sender.send(3).unwrap();
1413     /// });
1414     ///
1415     /// // nothing is in the buffer yet
1416     /// assert!(receiver.try_iter().next().is_none());
1417     ///
1418     /// // block for two seconds
1419     /// thread::sleep(Duration::from_secs(2));
1420     ///
1421     /// let mut iter = receiver.try_iter();
1422     /// assert_eq!(iter.next(), Some(1));
1423     /// assert_eq!(iter.next(), Some(2));
1424     /// assert_eq!(iter.next(), Some(3));
1425     /// assert_eq!(iter.next(), None);
1426     /// ```
1427     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1428     pub fn try_iter(&self) -> TryIter<T> {
1429         TryIter { rx: self }
1430     }
1431
1432 }
1433
1434 impl<T> select::Packet for Receiver<T> {
1435     fn can_recv(&self) -> bool {
1436         loop {
1437             let new_port = match *unsafe { self.inner() } {
1438                 Flavor::Oneshot(ref p) => {
1439                     match p.can_recv() {
1440                         Ok(ret) => return ret,
1441                         Err(upgrade) => upgrade,
1442                     }
1443                 }
1444                 Flavor::Stream(ref p) => {
1445                     match p.can_recv() {
1446                         Ok(ret) => return ret,
1447                         Err(upgrade) => upgrade,
1448                     }
1449                 }
1450                 Flavor::Shared(ref p) => return p.can_recv(),
1451                 Flavor::Sync(ref p) => return p.can_recv(),
1452             };
1453             unsafe {
1454                 mem::swap(self.inner_mut(),
1455                           new_port.inner_mut());
1456             }
1457         }
1458     }
1459
1460     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1461         loop {
1462             let (t, new_port) = match *unsafe { self.inner() } {
1463                 Flavor::Oneshot(ref p) => {
1464                     match p.start_selection(token) {
1465                         oneshot::SelSuccess => return Installed,
1466                         oneshot::SelCanceled => return Abort,
1467                         oneshot::SelUpgraded(t, rx) => (t, rx),
1468                     }
1469                 }
1470                 Flavor::Stream(ref p) => {
1471                     match p.start_selection(token) {
1472                         stream::SelSuccess => return Installed,
1473                         stream::SelCanceled => return Abort,
1474                         stream::SelUpgraded(t, rx) => (t, rx),
1475                     }
1476                 }
1477                 Flavor::Shared(ref p) => return p.start_selection(token),
1478                 Flavor::Sync(ref p) => return p.start_selection(token),
1479             };
1480             token = t;
1481             unsafe {
1482                 mem::swap(self.inner_mut(), new_port.inner_mut());
1483             }
1484         }
1485     }
1486
1487     fn abort_selection(&self) -> bool {
1488         let mut was_upgrade = false;
1489         loop {
1490             let result = match *unsafe { self.inner() } {
1491                 Flavor::Oneshot(ref p) => p.abort_selection(),
1492                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1493                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1494                 Flavor::Sync(ref p) => return p.abort_selection(),
1495             };
1496             let new_port = match result { Ok(b) => return b, Err(p) => p };
1497             was_upgrade = true;
1498             unsafe {
1499                 mem::swap(self.inner_mut(),
1500                           new_port.inner_mut());
1501             }
1502         }
1503     }
1504 }
1505
1506 #[stable(feature = "rust1", since = "1.0.0")]
1507 impl<'a, T> Iterator for Iter<'a, T> {
1508     type Item = T;
1509
1510     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1511 }
1512
1513 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1514 impl<'a, T> Iterator for TryIter<'a, T> {
1515     type Item = T;
1516
1517     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1518 }
1519
1520 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1521 impl<'a, T> IntoIterator for &'a Receiver<T> {
1522     type Item = T;
1523     type IntoIter = Iter<'a, T>;
1524
1525     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1526 }
1527
1528 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1529 impl<T> Iterator for IntoIter<T> {
1530     type Item = T;
1531     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1532 }
1533
1534 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1535 impl <T> IntoIterator for Receiver<T> {
1536     type Item = T;
1537     type IntoIter = IntoIter<T>;
1538
1539     fn into_iter(self) -> IntoIter<T> {
1540         IntoIter { rx: self }
1541     }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> Drop for Receiver<T> {
1546     fn drop(&mut self) {
1547         match *unsafe { self.inner() } {
1548             Flavor::Oneshot(ref p) => p.drop_port(),
1549             Flavor::Stream(ref p) => p.drop_port(),
1550             Flavor::Shared(ref p) => p.drop_port(),
1551             Flavor::Sync(ref p) => p.drop_port(),
1552         }
1553     }
1554 }
1555
1556 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1557 impl<T> fmt::Debug for Receiver<T> {
1558     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1559         write!(f, "Receiver {{ .. }}")
1560     }
1561 }
1562
1563 #[stable(feature = "rust1", since = "1.0.0")]
1564 impl<T> fmt::Debug for SendError<T> {
1565     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1566         "SendError(..)".fmt(f)
1567     }
1568 }
1569
1570 #[stable(feature = "rust1", since = "1.0.0")]
1571 impl<T> fmt::Display for SendError<T> {
1572     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1573         "sending on a closed channel".fmt(f)
1574     }
1575 }
1576
1577 #[stable(feature = "rust1", since = "1.0.0")]
1578 impl<T: Send> error::Error for SendError<T> {
1579     fn description(&self) -> &str {
1580         "sending on a closed channel"
1581     }
1582
1583     fn cause(&self) -> Option<&error::Error> {
1584         None
1585     }
1586 }
1587
1588 #[stable(feature = "rust1", since = "1.0.0")]
1589 impl<T> fmt::Debug for TrySendError<T> {
1590     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1591         match *self {
1592             TrySendError::Full(..) => "Full(..)".fmt(f),
1593             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1594         }
1595     }
1596 }
1597
1598 #[stable(feature = "rust1", since = "1.0.0")]
1599 impl<T> fmt::Display for TrySendError<T> {
1600     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1601         match *self {
1602             TrySendError::Full(..) => {
1603                 "sending on a full channel".fmt(f)
1604             }
1605             TrySendError::Disconnected(..) => {
1606                 "sending on a closed channel".fmt(f)
1607             }
1608         }
1609     }
1610 }
1611
1612 #[stable(feature = "rust1", since = "1.0.0")]
1613 impl<T: Send> error::Error for TrySendError<T> {
1614
1615     fn description(&self) -> &str {
1616         match *self {
1617             TrySendError::Full(..) => {
1618                 "sending on a full channel"
1619             }
1620             TrySendError::Disconnected(..) => {
1621                 "sending on a closed channel"
1622             }
1623         }
1624     }
1625
1626     fn cause(&self) -> Option<&error::Error> {
1627         None
1628     }
1629 }
1630
1631 #[stable(feature = "rust1", since = "1.0.0")]
1632 impl fmt::Display for RecvError {
1633     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1634         "receiving on a closed channel".fmt(f)
1635     }
1636 }
1637
1638 #[stable(feature = "rust1", since = "1.0.0")]
1639 impl error::Error for RecvError {
1640
1641     fn description(&self) -> &str {
1642         "receiving on a closed channel"
1643     }
1644
1645     fn cause(&self) -> Option<&error::Error> {
1646         None
1647     }
1648 }
1649
1650 #[stable(feature = "rust1", since = "1.0.0")]
1651 impl fmt::Display for TryRecvError {
1652     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1653         match *self {
1654             TryRecvError::Empty => {
1655                 "receiving on an empty channel".fmt(f)
1656             }
1657             TryRecvError::Disconnected => {
1658                 "receiving on a closed channel".fmt(f)
1659             }
1660         }
1661     }
1662 }
1663
1664 #[stable(feature = "rust1", since = "1.0.0")]
1665 impl error::Error for TryRecvError {
1666
1667     fn description(&self) -> &str {
1668         match *self {
1669             TryRecvError::Empty => {
1670                 "receiving on an empty channel"
1671             }
1672             TryRecvError::Disconnected => {
1673                 "receiving on a closed channel"
1674             }
1675         }
1676     }
1677
1678     fn cause(&self) -> Option<&error::Error> {
1679         None
1680     }
1681 }
1682
1683 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1684 impl fmt::Display for RecvTimeoutError {
1685     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1686         match *self {
1687             RecvTimeoutError::Timeout => {
1688                 "timed out waiting on channel".fmt(f)
1689             }
1690             RecvTimeoutError::Disconnected => {
1691                 "channel is empty and sending half is closed".fmt(f)
1692             }
1693         }
1694     }
1695 }
1696
1697 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1698 impl error::Error for RecvTimeoutError {
1699     fn description(&self) -> &str {
1700         match *self {
1701             RecvTimeoutError::Timeout => {
1702                 "timed out waiting on channel"
1703             }
1704             RecvTimeoutError::Disconnected => {
1705                 "channel is empty and sending half is closed"
1706             }
1707         }
1708     }
1709
1710     fn cause(&self) -> Option<&error::Error> {
1711         None
1712     }
1713 }
1714
1715 #[cfg(all(test, not(target_os = "emscripten")))]
1716 mod tests {
1717     use env;
1718     use super::*;
1719     use thread;
1720     use time::{Duration, Instant};
1721
1722     pub fn stress_factor() -> usize {
1723         match env::var("RUST_TEST_STRESS") {
1724             Ok(val) => val.parse().unwrap(),
1725             Err(..) => 1,
1726         }
1727     }
1728
1729     #[test]
1730     fn smoke() {
1731         let (tx, rx) = channel::<i32>();
1732         tx.send(1).unwrap();
1733         assert_eq!(rx.recv().unwrap(), 1);
1734     }
1735
1736     #[test]
1737     fn drop_full() {
1738         let (tx, _rx) = channel::<Box<isize>>();
1739         tx.send(box 1).unwrap();
1740     }
1741
1742     #[test]
1743     fn drop_full_shared() {
1744         let (tx, _rx) = channel::<Box<isize>>();
1745         drop(tx.clone());
1746         drop(tx.clone());
1747         tx.send(box 1).unwrap();
1748     }
1749
1750     #[test]
1751     fn smoke_shared() {
1752         let (tx, rx) = channel::<i32>();
1753         tx.send(1).unwrap();
1754         assert_eq!(rx.recv().unwrap(), 1);
1755         let tx = tx.clone();
1756         tx.send(1).unwrap();
1757         assert_eq!(rx.recv().unwrap(), 1);
1758     }
1759
1760     #[test]
1761     fn smoke_threads() {
1762         let (tx, rx) = channel::<i32>();
1763         let _t = thread::spawn(move|| {
1764             tx.send(1).unwrap();
1765         });
1766         assert_eq!(rx.recv().unwrap(), 1);
1767     }
1768
1769     #[test]
1770     fn smoke_port_gone() {
1771         let (tx, rx) = channel::<i32>();
1772         drop(rx);
1773         assert!(tx.send(1).is_err());
1774     }
1775
1776     #[test]
1777     fn smoke_shared_port_gone() {
1778         let (tx, rx) = channel::<i32>();
1779         drop(rx);
1780         assert!(tx.send(1).is_err())
1781     }
1782
1783     #[test]
1784     fn smoke_shared_port_gone2() {
1785         let (tx, rx) = channel::<i32>();
1786         drop(rx);
1787         let tx2 = tx.clone();
1788         drop(tx);
1789         assert!(tx2.send(1).is_err());
1790     }
1791
1792     #[test]
1793     fn port_gone_concurrent() {
1794         let (tx, rx) = channel::<i32>();
1795         let _t = thread::spawn(move|| {
1796             rx.recv().unwrap();
1797         });
1798         while tx.send(1).is_ok() {}
1799     }
1800
1801     #[test]
1802     fn port_gone_concurrent_shared() {
1803         let (tx, rx) = channel::<i32>();
1804         let tx2 = tx.clone();
1805         let _t = thread::spawn(move|| {
1806             rx.recv().unwrap();
1807         });
1808         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1809     }
1810
1811     #[test]
1812     fn smoke_chan_gone() {
1813         let (tx, rx) = channel::<i32>();
1814         drop(tx);
1815         assert!(rx.recv().is_err());
1816     }
1817
1818     #[test]
1819     fn smoke_chan_gone_shared() {
1820         let (tx, rx) = channel::<()>();
1821         let tx2 = tx.clone();
1822         drop(tx);
1823         drop(tx2);
1824         assert!(rx.recv().is_err());
1825     }
1826
1827     #[test]
1828     fn chan_gone_concurrent() {
1829         let (tx, rx) = channel::<i32>();
1830         let _t = thread::spawn(move|| {
1831             tx.send(1).unwrap();
1832             tx.send(1).unwrap();
1833         });
1834         while rx.recv().is_ok() {}
1835     }
1836
1837     #[test]
1838     fn stress() {
1839         let (tx, rx) = channel::<i32>();
1840         let t = thread::spawn(move|| {
1841             for _ in 0..10000 { tx.send(1).unwrap(); }
1842         });
1843         for _ in 0..10000 {
1844             assert_eq!(rx.recv().unwrap(), 1);
1845         }
1846         t.join().ok().unwrap();
1847     }
1848
1849     #[test]
1850     fn stress_shared() {
1851         const AMT: u32 = 10000;
1852         const NTHREADS: u32 = 8;
1853         let (tx, rx) = channel::<i32>();
1854
1855         let t = thread::spawn(move|| {
1856             for _ in 0..AMT * NTHREADS {
1857                 assert_eq!(rx.recv().unwrap(), 1);
1858             }
1859             match rx.try_recv() {
1860                 Ok(..) => panic!(),
1861                 _ => {}
1862             }
1863         });
1864
1865         for _ in 0..NTHREADS {
1866             let tx = tx.clone();
1867             thread::spawn(move|| {
1868                 for _ in 0..AMT { tx.send(1).unwrap(); }
1869             });
1870         }
1871         drop(tx);
1872         t.join().ok().unwrap();
1873     }
1874
1875     #[test]
1876     fn send_from_outside_runtime() {
1877         let (tx1, rx1) = channel::<()>();
1878         let (tx2, rx2) = channel::<i32>();
1879         let t1 = thread::spawn(move|| {
1880             tx1.send(()).unwrap();
1881             for _ in 0..40 {
1882                 assert_eq!(rx2.recv().unwrap(), 1);
1883             }
1884         });
1885         rx1.recv().unwrap();
1886         let t2 = thread::spawn(move|| {
1887             for _ in 0..40 {
1888                 tx2.send(1).unwrap();
1889             }
1890         });
1891         t1.join().ok().unwrap();
1892         t2.join().ok().unwrap();
1893     }
1894
1895     #[test]
1896     fn recv_from_outside_runtime() {
1897         let (tx, rx) = channel::<i32>();
1898         let t = thread::spawn(move|| {
1899             for _ in 0..40 {
1900                 assert_eq!(rx.recv().unwrap(), 1);
1901             }
1902         });
1903         for _ in 0..40 {
1904             tx.send(1).unwrap();
1905         }
1906         t.join().ok().unwrap();
1907     }
1908
1909     #[test]
1910     fn no_runtime() {
1911         let (tx1, rx1) = channel::<i32>();
1912         let (tx2, rx2) = channel::<i32>();
1913         let t1 = thread::spawn(move|| {
1914             assert_eq!(rx1.recv().unwrap(), 1);
1915             tx2.send(2).unwrap();
1916         });
1917         let t2 = thread::spawn(move|| {
1918             tx1.send(1).unwrap();
1919             assert_eq!(rx2.recv().unwrap(), 2);
1920         });
1921         t1.join().ok().unwrap();
1922         t2.join().ok().unwrap();
1923     }
1924
1925     #[test]
1926     fn oneshot_single_thread_close_port_first() {
1927         // Simple test of closing without sending
1928         let (_tx, rx) = channel::<i32>();
1929         drop(rx);
1930     }
1931
1932     #[test]
1933     fn oneshot_single_thread_close_chan_first() {
1934         // Simple test of closing without sending
1935         let (tx, _rx) = channel::<i32>();
1936         drop(tx);
1937     }
1938
1939     #[test]
1940     fn oneshot_single_thread_send_port_close() {
1941         // Testing that the sender cleans up the payload if receiver is closed
1942         let (tx, rx) = channel::<Box<i32>>();
1943         drop(rx);
1944         assert!(tx.send(box 0).is_err());
1945     }
1946
1947     #[test]
1948     fn oneshot_single_thread_recv_chan_close() {
1949         // Receiving on a closed chan will panic
1950         let res = thread::spawn(move|| {
1951             let (tx, rx) = channel::<i32>();
1952             drop(tx);
1953             rx.recv().unwrap();
1954         }).join();
1955         // What is our res?
1956         assert!(res.is_err());
1957     }
1958
1959     #[test]
1960     fn oneshot_single_thread_send_then_recv() {
1961         let (tx, rx) = channel::<Box<i32>>();
1962         tx.send(box 10).unwrap();
1963         assert!(*rx.recv().unwrap() == 10);
1964     }
1965
1966     #[test]
1967     fn oneshot_single_thread_try_send_open() {
1968         let (tx, rx) = channel::<i32>();
1969         assert!(tx.send(10).is_ok());
1970         assert!(rx.recv().unwrap() == 10);
1971     }
1972
1973     #[test]
1974     fn oneshot_single_thread_try_send_closed() {
1975         let (tx, rx) = channel::<i32>();
1976         drop(rx);
1977         assert!(tx.send(10).is_err());
1978     }
1979
1980     #[test]
1981     fn oneshot_single_thread_try_recv_open() {
1982         let (tx, rx) = channel::<i32>();
1983         tx.send(10).unwrap();
1984         assert!(rx.recv() == Ok(10));
1985     }
1986
1987     #[test]
1988     fn oneshot_single_thread_try_recv_closed() {
1989         let (tx, rx) = channel::<i32>();
1990         drop(tx);
1991         assert!(rx.recv().is_err());
1992     }
1993
1994     #[test]
1995     fn oneshot_single_thread_peek_data() {
1996         let (tx, rx) = channel::<i32>();
1997         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1998         tx.send(10).unwrap();
1999         assert_eq!(rx.try_recv(), Ok(10));
2000     }
2001
2002     #[test]
2003     fn oneshot_single_thread_peek_close() {
2004         let (tx, rx) = channel::<i32>();
2005         drop(tx);
2006         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2007         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2008     }
2009
2010     #[test]
2011     fn oneshot_single_thread_peek_open() {
2012         let (_tx, rx) = channel::<i32>();
2013         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2014     }
2015
2016     #[test]
2017     fn oneshot_multi_task_recv_then_send() {
2018         let (tx, rx) = channel::<Box<i32>>();
2019         let _t = thread::spawn(move|| {
2020             assert!(*rx.recv().unwrap() == 10);
2021         });
2022
2023         tx.send(box 10).unwrap();
2024     }
2025
2026     #[test]
2027     fn oneshot_multi_task_recv_then_close() {
2028         let (tx, rx) = channel::<Box<i32>>();
2029         let _t = thread::spawn(move|| {
2030             drop(tx);
2031         });
2032         let res = thread::spawn(move|| {
2033             assert!(*rx.recv().unwrap() == 10);
2034         }).join();
2035         assert!(res.is_err());
2036     }
2037
2038     #[test]
2039     fn oneshot_multi_thread_close_stress() {
2040         for _ in 0..stress_factor() {
2041             let (tx, rx) = channel::<i32>();
2042             let _t = thread::spawn(move|| {
2043                 drop(rx);
2044             });
2045             drop(tx);
2046         }
2047     }
2048
2049     #[test]
2050     fn oneshot_multi_thread_send_close_stress() {
2051         for _ in 0..stress_factor() {
2052             let (tx, rx) = channel::<i32>();
2053             let _t = thread::spawn(move|| {
2054                 drop(rx);
2055             });
2056             let _ = thread::spawn(move|| {
2057                 tx.send(1).unwrap();
2058             }).join();
2059         }
2060     }
2061
2062     #[test]
2063     fn oneshot_multi_thread_recv_close_stress() {
2064         for _ in 0..stress_factor() {
2065             let (tx, rx) = channel::<i32>();
2066             thread::spawn(move|| {
2067                 let res = thread::spawn(move|| {
2068                     rx.recv().unwrap();
2069                 }).join();
2070                 assert!(res.is_err());
2071             });
2072             let _t = thread::spawn(move|| {
2073                 thread::spawn(move|| {
2074                     drop(tx);
2075                 });
2076             });
2077         }
2078     }
2079
2080     #[test]
2081     fn oneshot_multi_thread_send_recv_stress() {
2082         for _ in 0..stress_factor() {
2083             let (tx, rx) = channel::<Box<isize>>();
2084             let _t = thread::spawn(move|| {
2085                 tx.send(box 10).unwrap();
2086             });
2087             assert!(*rx.recv().unwrap() == 10);
2088         }
2089     }
2090
2091     #[test]
2092     fn stream_send_recv_stress() {
2093         for _ in 0..stress_factor() {
2094             let (tx, rx) = channel();
2095
2096             send(tx, 0);
2097             recv(rx, 0);
2098
2099             fn send(tx: Sender<Box<i32>>, i: i32) {
2100                 if i == 10 { return }
2101
2102                 thread::spawn(move|| {
2103                     tx.send(box i).unwrap();
2104                     send(tx, i + 1);
2105                 });
2106             }
2107
2108             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2109                 if i == 10 { return }
2110
2111                 thread::spawn(move|| {
2112                     assert!(*rx.recv().unwrap() == i);
2113                     recv(rx, i + 1);
2114                 });
2115             }
2116         }
2117     }
2118
2119     #[test]
2120     fn oneshot_single_thread_recv_timeout() {
2121         let (tx, rx) = channel();
2122         tx.send(()).unwrap();
2123         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2124         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2125         tx.send(()).unwrap();
2126         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2127     }
2128
2129     #[test]
2130     fn stress_recv_timeout_two_threads() {
2131         let (tx, rx) = channel();
2132         let stress = stress_factor() + 100;
2133         let timeout = Duration::from_millis(100);
2134
2135         thread::spawn(move || {
2136             for i in 0..stress {
2137                 if i % 2 == 0 {
2138                     thread::sleep(timeout * 2);
2139                 }
2140                 tx.send(1usize).unwrap();
2141             }
2142         });
2143
2144         let mut recv_count = 0;
2145         loop {
2146             match rx.recv_timeout(timeout) {
2147                 Ok(n) => {
2148                     assert_eq!(n, 1usize);
2149                     recv_count += 1;
2150                 }
2151                 Err(RecvTimeoutError::Timeout) => continue,
2152                 Err(RecvTimeoutError::Disconnected) => break,
2153             }
2154         }
2155
2156         assert_eq!(recv_count, stress);
2157     }
2158
2159     #[test]
2160     fn recv_timeout_upgrade() {
2161         let (tx, rx) = channel::<()>();
2162         let timeout = Duration::from_millis(1);
2163         let _tx_clone = tx.clone();
2164
2165         let start = Instant::now();
2166         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2167         assert!(Instant::now() >= start + timeout);
2168     }
2169
2170     #[test]
2171     fn stress_recv_timeout_shared() {
2172         let (tx, rx) = channel();
2173         let stress = stress_factor() + 100;
2174
2175         for i in 0..stress {
2176             let tx = tx.clone();
2177             thread::spawn(move || {
2178                 thread::sleep(Duration::from_millis(i as u64 * 10));
2179                 tx.send(1usize).unwrap();
2180             });
2181         }
2182
2183         drop(tx);
2184
2185         let mut recv_count = 0;
2186         loop {
2187             match rx.recv_timeout(Duration::from_millis(10)) {
2188                 Ok(n) => {
2189                     assert_eq!(n, 1usize);
2190                     recv_count += 1;
2191                 }
2192                 Err(RecvTimeoutError::Timeout) => continue,
2193                 Err(RecvTimeoutError::Disconnected) => break,
2194             }
2195         }
2196
2197         assert_eq!(recv_count, stress);
2198     }
2199
2200     #[test]
2201     fn recv_a_lot() {
2202         // Regression test that we don't run out of stack in scheduler context
2203         let (tx, rx) = channel();
2204         for _ in 0..10000 { tx.send(()).unwrap(); }
2205         for _ in 0..10000 { rx.recv().unwrap(); }
2206     }
2207
2208     #[test]
2209     fn shared_recv_timeout() {
2210         let (tx, rx) = channel();
2211         let total = 5;
2212         for _ in 0..total {
2213             let tx = tx.clone();
2214             thread::spawn(move|| {
2215                 tx.send(()).unwrap();
2216             });
2217         }
2218
2219         for _ in 0..total { rx.recv().unwrap(); }
2220
2221         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2222         tx.send(()).unwrap();
2223         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2224     }
2225
2226     #[test]
2227     fn shared_chan_stress() {
2228         let (tx, rx) = channel();
2229         let total = stress_factor() + 100;
2230         for _ in 0..total {
2231             let tx = tx.clone();
2232             thread::spawn(move|| {
2233                 tx.send(()).unwrap();
2234             });
2235         }
2236
2237         for _ in 0..total {
2238             rx.recv().unwrap();
2239         }
2240     }
2241
2242     #[test]
2243     fn test_nested_recv_iter() {
2244         let (tx, rx) = channel::<i32>();
2245         let (total_tx, total_rx) = channel::<i32>();
2246
2247         let _t = thread::spawn(move|| {
2248             let mut acc = 0;
2249             for x in rx.iter() {
2250                 acc += x;
2251             }
2252             total_tx.send(acc).unwrap();
2253         });
2254
2255         tx.send(3).unwrap();
2256         tx.send(1).unwrap();
2257         tx.send(2).unwrap();
2258         drop(tx);
2259         assert_eq!(total_rx.recv().unwrap(), 6);
2260     }
2261
2262     #[test]
2263     fn test_recv_iter_break() {
2264         let (tx, rx) = channel::<i32>();
2265         let (count_tx, count_rx) = channel();
2266
2267         let _t = thread::spawn(move|| {
2268             let mut count = 0;
2269             for x in rx.iter() {
2270                 if count >= 3 {
2271                     break;
2272                 } else {
2273                     count += x;
2274                 }
2275             }
2276             count_tx.send(count).unwrap();
2277         });
2278
2279         tx.send(2).unwrap();
2280         tx.send(2).unwrap();
2281         tx.send(2).unwrap();
2282         let _ = tx.send(2);
2283         drop(tx);
2284         assert_eq!(count_rx.recv().unwrap(), 4);
2285     }
2286
2287     #[test]
2288     fn test_recv_try_iter() {
2289         let (request_tx, request_rx) = channel();
2290         let (response_tx, response_rx) = channel();
2291
2292         // Request `x`s until we have `6`.
2293         let t = thread::spawn(move|| {
2294             let mut count = 0;
2295             loop {
2296                 for x in response_rx.try_iter() {
2297                     count += x;
2298                     if count == 6 {
2299                         return count;
2300                     }
2301                 }
2302                 request_tx.send(()).unwrap();
2303             }
2304         });
2305
2306         for _ in request_rx.iter() {
2307             if response_tx.send(2).is_err() {
2308                 break;
2309             }
2310         }
2311
2312         assert_eq!(t.join().unwrap(), 6);
2313     }
2314
2315     #[test]
2316     fn test_recv_into_iter_owned() {
2317         let mut iter = {
2318           let (tx, rx) = channel::<i32>();
2319           tx.send(1).unwrap();
2320           tx.send(2).unwrap();
2321
2322           rx.into_iter()
2323         };
2324         assert_eq!(iter.next().unwrap(), 1);
2325         assert_eq!(iter.next().unwrap(), 2);
2326         assert_eq!(iter.next().is_none(), true);
2327     }
2328
2329     #[test]
2330     fn test_recv_into_iter_borrowed() {
2331         let (tx, rx) = channel::<i32>();
2332         tx.send(1).unwrap();
2333         tx.send(2).unwrap();
2334         drop(tx);
2335         let mut iter = (&rx).into_iter();
2336         assert_eq!(iter.next().unwrap(), 1);
2337         assert_eq!(iter.next().unwrap(), 2);
2338         assert_eq!(iter.next().is_none(), true);
2339     }
2340
2341     #[test]
2342     fn try_recv_states() {
2343         let (tx1, rx1) = channel::<i32>();
2344         let (tx2, rx2) = channel::<()>();
2345         let (tx3, rx3) = channel::<()>();
2346         let _t = thread::spawn(move|| {
2347             rx2.recv().unwrap();
2348             tx1.send(1).unwrap();
2349             tx3.send(()).unwrap();
2350             rx2.recv().unwrap();
2351             drop(tx1);
2352             tx3.send(()).unwrap();
2353         });
2354
2355         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2356         tx2.send(()).unwrap();
2357         rx3.recv().unwrap();
2358         assert_eq!(rx1.try_recv(), Ok(1));
2359         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2360         tx2.send(()).unwrap();
2361         rx3.recv().unwrap();
2362         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2363     }
2364
2365     // This bug used to end up in a livelock inside of the Receiver destructor
2366     // because the internal state of the Shared packet was corrupted
2367     #[test]
2368     fn destroy_upgraded_shared_port_when_sender_still_active() {
2369         let (tx, rx) = channel();
2370         let (tx2, rx2) = channel();
2371         let _t = thread::spawn(move|| {
2372             rx.recv().unwrap(); // wait on a oneshot
2373             drop(rx);  // destroy a shared
2374             tx2.send(()).unwrap();
2375         });
2376         // make sure the other thread has gone to sleep
2377         for _ in 0..5000 { thread::yield_now(); }
2378
2379         // upgrade to a shared chan and send a message
2380         let t = tx.clone();
2381         drop(tx);
2382         t.send(()).unwrap();
2383
2384         // wait for the child thread to exit before we exit
2385         rx2.recv().unwrap();
2386     }
2387
2388     #[test]
2389     fn issue_32114() {
2390         let (tx, _) = channel();
2391         let _ = tx.send(123);
2392         assert_eq!(tx.send(123), Err(SendError(123)));
2393     }
2394 }
2395
2396 #[cfg(all(test, not(target_os = "emscripten")))]
2397 mod sync_tests {
2398     use env;
2399     use thread;
2400     use super::*;
2401     use time::Duration;
2402
2403     pub fn stress_factor() -> usize {
2404         match env::var("RUST_TEST_STRESS") {
2405             Ok(val) => val.parse().unwrap(),
2406             Err(..) => 1,
2407         }
2408     }
2409
2410     #[test]
2411     fn smoke() {
2412         let (tx, rx) = sync_channel::<i32>(1);
2413         tx.send(1).unwrap();
2414         assert_eq!(rx.recv().unwrap(), 1);
2415     }
2416
2417     #[test]
2418     fn drop_full() {
2419         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2420         tx.send(box 1).unwrap();
2421     }
2422
2423     #[test]
2424     fn smoke_shared() {
2425         let (tx, rx) = sync_channel::<i32>(1);
2426         tx.send(1).unwrap();
2427         assert_eq!(rx.recv().unwrap(), 1);
2428         let tx = tx.clone();
2429         tx.send(1).unwrap();
2430         assert_eq!(rx.recv().unwrap(), 1);
2431     }
2432
2433     #[test]
2434     fn recv_timeout() {
2435         let (tx, rx) = sync_channel::<i32>(1);
2436         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2437         tx.send(1).unwrap();
2438         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2439     }
2440
2441     #[test]
2442     fn smoke_threads() {
2443         let (tx, rx) = sync_channel::<i32>(0);
2444         let _t = thread::spawn(move|| {
2445             tx.send(1).unwrap();
2446         });
2447         assert_eq!(rx.recv().unwrap(), 1);
2448     }
2449
2450     #[test]
2451     fn smoke_port_gone() {
2452         let (tx, rx) = sync_channel::<i32>(0);
2453         drop(rx);
2454         assert!(tx.send(1).is_err());
2455     }
2456
2457     #[test]
2458     fn smoke_shared_port_gone2() {
2459         let (tx, rx) = sync_channel::<i32>(0);
2460         drop(rx);
2461         let tx2 = tx.clone();
2462         drop(tx);
2463         assert!(tx2.send(1).is_err());
2464     }
2465
2466     #[test]
2467     fn port_gone_concurrent() {
2468         let (tx, rx) = sync_channel::<i32>(0);
2469         let _t = thread::spawn(move|| {
2470             rx.recv().unwrap();
2471         });
2472         while tx.send(1).is_ok() {}
2473     }
2474
2475     #[test]
2476     fn port_gone_concurrent_shared() {
2477         let (tx, rx) = sync_channel::<i32>(0);
2478         let tx2 = tx.clone();
2479         let _t = thread::spawn(move|| {
2480             rx.recv().unwrap();
2481         });
2482         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2483     }
2484
2485     #[test]
2486     fn smoke_chan_gone() {
2487         let (tx, rx) = sync_channel::<i32>(0);
2488         drop(tx);
2489         assert!(rx.recv().is_err());
2490     }
2491
2492     #[test]
2493     fn smoke_chan_gone_shared() {
2494         let (tx, rx) = sync_channel::<()>(0);
2495         let tx2 = tx.clone();
2496         drop(tx);
2497         drop(tx2);
2498         assert!(rx.recv().is_err());
2499     }
2500
2501     #[test]
2502     fn chan_gone_concurrent() {
2503         let (tx, rx) = sync_channel::<i32>(0);
2504         thread::spawn(move|| {
2505             tx.send(1).unwrap();
2506             tx.send(1).unwrap();
2507         });
2508         while rx.recv().is_ok() {}
2509     }
2510
2511     #[test]
2512     fn stress() {
2513         let (tx, rx) = sync_channel::<i32>(0);
2514         thread::spawn(move|| {
2515             for _ in 0..10000 { tx.send(1).unwrap(); }
2516         });
2517         for _ in 0..10000 {
2518             assert_eq!(rx.recv().unwrap(), 1);
2519         }
2520     }
2521
2522     #[test]
2523     fn stress_recv_timeout_two_threads() {
2524         let (tx, rx) = sync_channel::<i32>(0);
2525
2526         thread::spawn(move|| {
2527             for _ in 0..10000 { tx.send(1).unwrap(); }
2528         });
2529
2530         let mut recv_count = 0;
2531         loop {
2532             match rx.recv_timeout(Duration::from_millis(1)) {
2533                 Ok(v) => {
2534                     assert_eq!(v, 1);
2535                     recv_count += 1;
2536                 },
2537                 Err(RecvTimeoutError::Timeout) => continue,
2538                 Err(RecvTimeoutError::Disconnected) => break,
2539             }
2540         }
2541
2542         assert_eq!(recv_count, 10000);
2543     }
2544
2545     #[test]
2546     fn stress_recv_timeout_shared() {
2547         const AMT: u32 = 1000;
2548         const NTHREADS: u32 = 8;
2549         let (tx, rx) = sync_channel::<i32>(0);
2550         let (dtx, drx) = sync_channel::<()>(0);
2551
2552         thread::spawn(move|| {
2553             let mut recv_count = 0;
2554             loop {
2555                 match rx.recv_timeout(Duration::from_millis(10)) {
2556                     Ok(v) => {
2557                         assert_eq!(v, 1);
2558                         recv_count += 1;
2559                     },
2560                     Err(RecvTimeoutError::Timeout) => continue,
2561                     Err(RecvTimeoutError::Disconnected) => break,
2562                 }
2563             }
2564
2565             assert_eq!(recv_count, AMT * NTHREADS);
2566             assert!(rx.try_recv().is_err());
2567
2568             dtx.send(()).unwrap();
2569         });
2570
2571         for _ in 0..NTHREADS {
2572             let tx = tx.clone();
2573             thread::spawn(move|| {
2574                 for _ in 0..AMT { tx.send(1).unwrap(); }
2575             });
2576         }
2577
2578         drop(tx);
2579
2580         drx.recv().unwrap();
2581     }
2582
2583     #[test]
2584     fn stress_shared() {
2585         const AMT: u32 = 1000;
2586         const NTHREADS: u32 = 8;
2587         let (tx, rx) = sync_channel::<i32>(0);
2588         let (dtx, drx) = sync_channel::<()>(0);
2589
2590         thread::spawn(move|| {
2591             for _ in 0..AMT * NTHREADS {
2592                 assert_eq!(rx.recv().unwrap(), 1);
2593             }
2594             match rx.try_recv() {
2595                 Ok(..) => panic!(),
2596                 _ => {}
2597             }
2598             dtx.send(()).unwrap();
2599         });
2600
2601         for _ in 0..NTHREADS {
2602             let tx = tx.clone();
2603             thread::spawn(move|| {
2604                 for _ in 0..AMT { tx.send(1).unwrap(); }
2605             });
2606         }
2607         drop(tx);
2608         drx.recv().unwrap();
2609     }
2610
2611     #[test]
2612     fn oneshot_single_thread_close_port_first() {
2613         // Simple test of closing without sending
2614         let (_tx, rx) = sync_channel::<i32>(0);
2615         drop(rx);
2616     }
2617
2618     #[test]
2619     fn oneshot_single_thread_close_chan_first() {
2620         // Simple test of closing without sending
2621         let (tx, _rx) = sync_channel::<i32>(0);
2622         drop(tx);
2623     }
2624
2625     #[test]
2626     fn oneshot_single_thread_send_port_close() {
2627         // Testing that the sender cleans up the payload if receiver is closed
2628         let (tx, rx) = sync_channel::<Box<i32>>(0);
2629         drop(rx);
2630         assert!(tx.send(box 0).is_err());
2631     }
2632
2633     #[test]
2634     fn oneshot_single_thread_recv_chan_close() {
2635         // Receiving on a closed chan will panic
2636         let res = thread::spawn(move|| {
2637             let (tx, rx) = sync_channel::<i32>(0);
2638             drop(tx);
2639             rx.recv().unwrap();
2640         }).join();
2641         // What is our res?
2642         assert!(res.is_err());
2643     }
2644
2645     #[test]
2646     fn oneshot_single_thread_send_then_recv() {
2647         let (tx, rx) = sync_channel::<Box<i32>>(1);
2648         tx.send(box 10).unwrap();
2649         assert!(*rx.recv().unwrap() == 10);
2650     }
2651
2652     #[test]
2653     fn oneshot_single_thread_try_send_open() {
2654         let (tx, rx) = sync_channel::<i32>(1);
2655         assert_eq!(tx.try_send(10), Ok(()));
2656         assert!(rx.recv().unwrap() == 10);
2657     }
2658
2659     #[test]
2660     fn oneshot_single_thread_try_send_closed() {
2661         let (tx, rx) = sync_channel::<i32>(0);
2662         drop(rx);
2663         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2664     }
2665
2666     #[test]
2667     fn oneshot_single_thread_try_send_closed2() {
2668         let (tx, _rx) = sync_channel::<i32>(0);
2669         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2670     }
2671
2672     #[test]
2673     fn oneshot_single_thread_try_recv_open() {
2674         let (tx, rx) = sync_channel::<i32>(1);
2675         tx.send(10).unwrap();
2676         assert!(rx.recv() == Ok(10));
2677     }
2678
2679     #[test]
2680     fn oneshot_single_thread_try_recv_closed() {
2681         let (tx, rx) = sync_channel::<i32>(0);
2682         drop(tx);
2683         assert!(rx.recv().is_err());
2684     }
2685
2686     #[test]
2687     fn oneshot_single_thread_try_recv_closed_with_data() {
2688         let (tx, rx) = sync_channel::<i32>(1);
2689         tx.send(10).unwrap();
2690         drop(tx);
2691         assert_eq!(rx.try_recv(), Ok(10));
2692         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2693     }
2694
2695     #[test]
2696     fn oneshot_single_thread_peek_data() {
2697         let (tx, rx) = sync_channel::<i32>(1);
2698         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2699         tx.send(10).unwrap();
2700         assert_eq!(rx.try_recv(), Ok(10));
2701     }
2702
2703     #[test]
2704     fn oneshot_single_thread_peek_close() {
2705         let (tx, rx) = sync_channel::<i32>(0);
2706         drop(tx);
2707         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2708         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2709     }
2710
2711     #[test]
2712     fn oneshot_single_thread_peek_open() {
2713         let (_tx, rx) = sync_channel::<i32>(0);
2714         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2715     }
2716
2717     #[test]
2718     fn oneshot_multi_task_recv_then_send() {
2719         let (tx, rx) = sync_channel::<Box<i32>>(0);
2720         let _t = thread::spawn(move|| {
2721             assert!(*rx.recv().unwrap() == 10);
2722         });
2723
2724         tx.send(box 10).unwrap();
2725     }
2726
2727     #[test]
2728     fn oneshot_multi_task_recv_then_close() {
2729         let (tx, rx) = sync_channel::<Box<i32>>(0);
2730         let _t = thread::spawn(move|| {
2731             drop(tx);
2732         });
2733         let res = thread::spawn(move|| {
2734             assert!(*rx.recv().unwrap() == 10);
2735         }).join();
2736         assert!(res.is_err());
2737     }
2738
2739     #[test]
2740     fn oneshot_multi_thread_close_stress() {
2741         for _ in 0..stress_factor() {
2742             let (tx, rx) = sync_channel::<i32>(0);
2743             let _t = thread::spawn(move|| {
2744                 drop(rx);
2745             });
2746             drop(tx);
2747         }
2748     }
2749
2750     #[test]
2751     fn oneshot_multi_thread_send_close_stress() {
2752         for _ in 0..stress_factor() {
2753             let (tx, rx) = sync_channel::<i32>(0);
2754             let _t = thread::spawn(move|| {
2755                 drop(rx);
2756             });
2757             let _ = thread::spawn(move || {
2758                 tx.send(1).unwrap();
2759             }).join();
2760         }
2761     }
2762
2763     #[test]
2764     fn oneshot_multi_thread_recv_close_stress() {
2765         for _ in 0..stress_factor() {
2766             let (tx, rx) = sync_channel::<i32>(0);
2767             let _t = thread::spawn(move|| {
2768                 let res = thread::spawn(move|| {
2769                     rx.recv().unwrap();
2770                 }).join();
2771                 assert!(res.is_err());
2772             });
2773             let _t = thread::spawn(move|| {
2774                 thread::spawn(move|| {
2775                     drop(tx);
2776                 });
2777             });
2778         }
2779     }
2780
2781     #[test]
2782     fn oneshot_multi_thread_send_recv_stress() {
2783         for _ in 0..stress_factor() {
2784             let (tx, rx) = sync_channel::<Box<i32>>(0);
2785             let _t = thread::spawn(move|| {
2786                 tx.send(box 10).unwrap();
2787             });
2788             assert!(*rx.recv().unwrap() == 10);
2789         }
2790     }
2791
2792     #[test]
2793     fn stream_send_recv_stress() {
2794         for _ in 0..stress_factor() {
2795             let (tx, rx) = sync_channel::<Box<i32>>(0);
2796
2797             send(tx, 0);
2798             recv(rx, 0);
2799
2800             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2801                 if i == 10 { return }
2802
2803                 thread::spawn(move|| {
2804                     tx.send(box i).unwrap();
2805                     send(tx, i + 1);
2806                 });
2807             }
2808
2809             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2810                 if i == 10 { return }
2811
2812                 thread::spawn(move|| {
2813                     assert!(*rx.recv().unwrap() == i);
2814                     recv(rx, i + 1);
2815                 });
2816             }
2817         }
2818     }
2819
2820     #[test]
2821     fn recv_a_lot() {
2822         // Regression test that we don't run out of stack in scheduler context
2823         let (tx, rx) = sync_channel(10000);
2824         for _ in 0..10000 { tx.send(()).unwrap(); }
2825         for _ in 0..10000 { rx.recv().unwrap(); }
2826     }
2827
2828     #[test]
2829     fn shared_chan_stress() {
2830         let (tx, rx) = sync_channel(0);
2831         let total = stress_factor() + 100;
2832         for _ in 0..total {
2833             let tx = tx.clone();
2834             thread::spawn(move|| {
2835                 tx.send(()).unwrap();
2836             });
2837         }
2838
2839         for _ in 0..total {
2840             rx.recv().unwrap();
2841         }
2842     }
2843
2844     #[test]
2845     fn test_nested_recv_iter() {
2846         let (tx, rx) = sync_channel::<i32>(0);
2847         let (total_tx, total_rx) = sync_channel::<i32>(0);
2848
2849         let _t = thread::spawn(move|| {
2850             let mut acc = 0;
2851             for x in rx.iter() {
2852                 acc += x;
2853             }
2854             total_tx.send(acc).unwrap();
2855         });
2856
2857         tx.send(3).unwrap();
2858         tx.send(1).unwrap();
2859         tx.send(2).unwrap();
2860         drop(tx);
2861         assert_eq!(total_rx.recv().unwrap(), 6);
2862     }
2863
2864     #[test]
2865     fn test_recv_iter_break() {
2866         let (tx, rx) = sync_channel::<i32>(0);
2867         let (count_tx, count_rx) = sync_channel(0);
2868
2869         let _t = thread::spawn(move|| {
2870             let mut count = 0;
2871             for x in rx.iter() {
2872                 if count >= 3 {
2873                     break;
2874                 } else {
2875                     count += x;
2876                 }
2877             }
2878             count_tx.send(count).unwrap();
2879         });
2880
2881         tx.send(2).unwrap();
2882         tx.send(2).unwrap();
2883         tx.send(2).unwrap();
2884         let _ = tx.try_send(2);
2885         drop(tx);
2886         assert_eq!(count_rx.recv().unwrap(), 4);
2887     }
2888
2889     #[test]
2890     fn try_recv_states() {
2891         let (tx1, rx1) = sync_channel::<i32>(1);
2892         let (tx2, rx2) = sync_channel::<()>(1);
2893         let (tx3, rx3) = sync_channel::<()>(1);
2894         let _t = thread::spawn(move|| {
2895             rx2.recv().unwrap();
2896             tx1.send(1).unwrap();
2897             tx3.send(()).unwrap();
2898             rx2.recv().unwrap();
2899             drop(tx1);
2900             tx3.send(()).unwrap();
2901         });
2902
2903         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2904         tx2.send(()).unwrap();
2905         rx3.recv().unwrap();
2906         assert_eq!(rx1.try_recv(), Ok(1));
2907         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2908         tx2.send(()).unwrap();
2909         rx3.recv().unwrap();
2910         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2911     }
2912
2913     // This bug used to end up in a livelock inside of the Receiver destructor
2914     // because the internal state of the Shared packet was corrupted
2915     #[test]
2916     fn destroy_upgraded_shared_port_when_sender_still_active() {
2917         let (tx, rx) = sync_channel::<()>(0);
2918         let (tx2, rx2) = sync_channel::<()>(0);
2919         let _t = thread::spawn(move|| {
2920             rx.recv().unwrap(); // wait on a oneshot
2921             drop(rx);  // destroy a shared
2922             tx2.send(()).unwrap();
2923         });
2924         // make sure the other thread has gone to sleep
2925         for _ in 0..5000 { thread::yield_now(); }
2926
2927         // upgrade to a shared chan and send a message
2928         let t = tx.clone();
2929         drop(tx);
2930         t.send(()).unwrap();
2931
2932         // wait for the child thread to exit before we exit
2933         rx2.recv().unwrap();
2934     }
2935
2936     #[test]
2937     fn send1() {
2938         let (tx, rx) = sync_channel::<i32>(0);
2939         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2940         assert_eq!(tx.send(1), Ok(()));
2941     }
2942
2943     #[test]
2944     fn send2() {
2945         let (tx, rx) = sync_channel::<i32>(0);
2946         let _t = thread::spawn(move|| { drop(rx); });
2947         assert!(tx.send(1).is_err());
2948     }
2949
2950     #[test]
2951     fn send3() {
2952         let (tx, rx) = sync_channel::<i32>(1);
2953         assert_eq!(tx.send(1), Ok(()));
2954         let _t =thread::spawn(move|| { drop(rx); });
2955         assert!(tx.send(1).is_err());
2956     }
2957
2958     #[test]
2959     fn send4() {
2960         let (tx, rx) = sync_channel::<i32>(0);
2961         let tx2 = tx.clone();
2962         let (done, donerx) = channel();
2963         let done2 = done.clone();
2964         let _t = thread::spawn(move|| {
2965             assert!(tx.send(1).is_err());
2966             done.send(()).unwrap();
2967         });
2968         let _t = thread::spawn(move|| {
2969             assert!(tx2.send(2).is_err());
2970             done2.send(()).unwrap();
2971         });
2972         drop(rx);
2973         donerx.recv().unwrap();
2974         donerx.recv().unwrap();
2975     }
2976
2977     #[test]
2978     fn try_send1() {
2979         let (tx, _rx) = sync_channel::<i32>(0);
2980         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2981     }
2982
2983     #[test]
2984     fn try_send2() {
2985         let (tx, _rx) = sync_channel::<i32>(1);
2986         assert_eq!(tx.try_send(1), Ok(()));
2987         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2988     }
2989
2990     #[test]
2991     fn try_send3() {
2992         let (tx, rx) = sync_channel::<i32>(1);
2993         assert_eq!(tx.try_send(1), Ok(()));
2994         drop(rx);
2995         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2996     }
2997
2998     #[test]
2999     fn issue_15761() {
3000         fn repro() {
3001             let (tx1, rx1) = sync_channel::<()>(3);
3002             let (tx2, rx2) = sync_channel::<()>(3);
3003
3004             let _t = thread::spawn(move|| {
3005                 rx1.recv().unwrap();
3006                 tx2.try_send(()).unwrap();
3007             });
3008
3009             tx1.try_send(()).unwrap();
3010             rx2.recv().unwrap();
3011         }
3012
3013         for _ in 0..100 {
3014             repro()
3015         }
3016     }
3017
3018     #[test]
3019     fn fmt_debug_sender() {
3020         let (tx, _) = channel::<i32>();
3021         assert_eq!(format!("{:?}", tx), "Sender { .. }");
3022     }
3023
3024     #[test]
3025     fn fmt_debug_recv() {
3026         let (_, rx) = channel::<i32>();
3027         assert_eq!(format!("{:?}", rx), "Receiver { .. }");
3028     }
3029
3030     #[test]
3031     fn fmt_debug_sync_sender() {
3032         let (tx, _) = sync_channel::<i32>(1);
3033         assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
3034     }
3035 }