]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
add inline attributes to stage 0 methods
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //!     tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //!     let tx = tx.clone();
79 //!     thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //!     // This will wait for the parent thread to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
138 //                      case. They contain as few atomics as possible and
139 //                      involve one and exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 //             use a different concurrent queue that is more tailored for this
142 //             use case. The initial allocation of this flavor of channel is not
143 //             optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 //            a channel with multiple senders. This type is as optimized as it
146 //            can be, but the previous two types mentioned are much faster for
147 //            their use-cases.
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
152 // but recv() obviously blocks. This means that under the hood there must be
153 // some shared and concurrent queue holding all of the actual data.
154 //
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
178 // coexist at once.
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 //      send(t)                             recv()
198 //        queue.push(t)                       return if queue.pop()
199 //        if increment() == -1                deschedule {
200 //          wakeup()                            if decrement() > 0
201 //                                                cancel_deschedule()
202 //                                            }
203 //                                            queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use sync::Arc;
269 use error;
270 use fmt;
271 use mem;
272 use cell::UnsafeCell;
273 use time::{Duration, Instant};
274
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
280
281 mod blocking;
282 mod oneshot;
283 mod select;
284 mod shared;
285 mod stream;
286 mod sync;
287 mod mpsc_queue;
288 mod spsc_queue;
289
290 /// The receiving-half of Rust's channel type. This half can only be owned by
291 /// one thread
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294     inner: UnsafeCell<Flavor<T>>,
295 }
296
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
301
302 #[stable(feature = "rust1", since = "1.0.0")]
303 impl<T> !Sync for Receiver<T> { }
304
305 /// An iterator over messages on a receiver, this iterator will block
306 /// whenever `next` is called, waiting for a new message, and `None` will be
307 /// returned when the corresponding channel has hung up.
308 #[stable(feature = "rust1", since = "1.0.0")]
309 #[derive(Debug)]
310 pub struct Iter<'a, T: 'a> {
311     rx: &'a Receiver<T>
312 }
313
314 /// An iterator that attempts to yield all pending values for a receiver.
315 /// `None` will be returned when there are no pending values remaining or
316 /// if the corresponding channel has hung up.
317 ///
318 /// This Iterator will never block the caller in order to wait for data to
319 /// become available. Instead, it will return `None`.
320 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
321 #[derive(Debug)]
322 pub struct TryIter<'a, T: 'a> {
323     rx: &'a Receiver<T>
324 }
325
326 /// An owning iterator over messages on a receiver, this iterator will block
327 /// whenever `next` is called, waiting for a new message, and `None` will be
328 /// returned when the corresponding channel has hung up.
329 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
330 #[derive(Debug)]
331 pub struct IntoIter<T> {
332     rx: Receiver<T>
333 }
334
335 /// The sending-half of Rust's asynchronous channel type. This half can only be
336 /// owned by one thread, but it can be cloned to send to other threads.
337 #[stable(feature = "rust1", since = "1.0.0")]
338 pub struct Sender<T> {
339     inner: UnsafeCell<Flavor<T>>,
340 }
341
342 // The send port can be sent from place to place, so long as it
343 // is not used to send non-sendable things.
344 #[stable(feature = "rust1", since = "1.0.0")]
345 unsafe impl<T: Send> Send for Sender<T> { }
346
347 #[stable(feature = "rust1", since = "1.0.0")]
348 impl<T> !Sync for Sender<T> { }
349
350 /// The sending-half of Rust's synchronous channel type. This half can only be
351 /// owned by one thread, but it can be cloned to send to other threads.
352 #[stable(feature = "rust1", since = "1.0.0")]
353 pub struct SyncSender<T> {
354     inner: Arc<sync::Packet<T>>,
355 }
356
357 #[stable(feature = "rust1", since = "1.0.0")]
358 unsafe impl<T: Send> Send for SyncSender<T> {}
359
360 #[stable(feature = "rust1", since = "1.0.0")]
361 impl<T> !Sync for SyncSender<T> {}
362
363 /// An error returned from the `send` function on channels.
364 ///
365 /// A `send` operation can only fail if the receiving end of a channel is
366 /// disconnected, implying that the data could never be received. The error
367 /// contains the data being sent as a payload so it can be recovered.
368 #[stable(feature = "rust1", since = "1.0.0")]
369 #[derive(PartialEq, Eq, Clone, Copy)]
370 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
371
372 /// An error returned from the `recv` function on a `Receiver`.
373 ///
374 /// The `recv` operation can only fail if the sending half of a channel is
375 /// disconnected, implying that no further messages will ever be received.
376 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
377 #[stable(feature = "rust1", since = "1.0.0")]
378 pub struct RecvError;
379
380 /// This enumeration is the list of the possible reasons that `try_recv` could
381 /// not return data when called.
382 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
383 #[stable(feature = "rust1", since = "1.0.0")]
384 pub enum TryRecvError {
385     /// This channel is currently empty, but the sender(s) have not yet
386     /// disconnected, so data may yet become available.
387     #[stable(feature = "rust1", since = "1.0.0")]
388     Empty,
389
390     /// This channel's sending half has become disconnected, and there will
391     /// never be any more data received on this channel
392     #[stable(feature = "rust1", since = "1.0.0")]
393     Disconnected,
394 }
395
396 /// This enumeration is the list of possible errors that `recv_timeout` could
397 /// not return data when called.
398 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
399 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
400 pub enum RecvTimeoutError {
401     /// This channel is currently empty, but the sender(s) have not yet
402     /// disconnected, so data may yet become available.
403     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
404     Timeout,
405     /// This channel's sending half has become disconnected, and there will
406     /// never be any more data received on this channel
407     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
408     Disconnected,
409 }
410
411 /// This enumeration is the list of the possible error outcomes for the
412 /// `SyncSender::try_send` method.
413 #[stable(feature = "rust1", since = "1.0.0")]
414 #[derive(PartialEq, Eq, Clone, Copy)]
415 pub enum TrySendError<T> {
416     /// The data could not be sent on the channel because it would require that
417     /// the callee block to send the data.
418     ///
419     /// If this is a buffered channel, then the buffer is full at this time. If
420     /// this is not a buffered channel, then there is no receiver available to
421     /// acquire the data.
422     #[stable(feature = "rust1", since = "1.0.0")]
423     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
424
425     /// This channel's receiving half has disconnected, so the data could not be
426     /// sent. The data is returned back to the callee in this case.
427     #[stable(feature = "rust1", since = "1.0.0")]
428     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
429 }
430
431 enum Flavor<T> {
432     Oneshot(Arc<oneshot::Packet<T>>),
433     Stream(Arc<stream::Packet<T>>),
434     Shared(Arc<shared::Packet<T>>),
435     Sync(Arc<sync::Packet<T>>),
436 }
437
438 #[doc(hidden)]
439 trait UnsafeFlavor<T> {
440     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
441     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
442         &mut *self.inner_unsafe().get()
443     }
444     unsafe fn inner(&self) -> &Flavor<T> {
445         &*self.inner_unsafe().get()
446     }
447 }
448 impl<T> UnsafeFlavor<T> for Sender<T> {
449     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
450         &self.inner
451     }
452 }
453 impl<T> UnsafeFlavor<T> for Receiver<T> {
454     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
455         &self.inner
456     }
457 }
458
459 /// Creates a new asynchronous channel, returning the sender/receiver halves.
460 /// All data sent on the sender will become available on the receiver, and no
461 /// send will block the calling thread (this channel has an "infinite buffer").
462 ///
463 /// If the [`Receiver`] is disconnected while trying to [`send()`] with the
464 /// [`Sender`], the [`send()`] method will return an error.
465 ///
466 /// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
467 /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
468 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
469 ///
470 /// # Examples
471 ///
472 /// ```
473 /// use std::sync::mpsc::channel;
474 /// use std::thread;
475 ///
476 /// // tx is the sending half (tx for transmission), and rx is the receiving
477 /// // half (rx for receiving).
478 /// let (tx, rx) = channel();
479 ///
480 /// // Spawn off an expensive computation
481 /// thread::spawn(move|| {
482 /// #   fn expensive_computation() {}
483 ///     tx.send(expensive_computation()).unwrap();
484 /// });
485 ///
486 /// // Do some useful work for awhile
487 ///
488 /// // Let's see what that answer was
489 /// println!("{:?}", rx.recv().unwrap());
490 /// ```
491 #[stable(feature = "rust1", since = "1.0.0")]
492 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
493     let a = Arc::new(oneshot::Packet::new());
494     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
495 }
496
497 /// Creates a new synchronous, bounded channel.
498 ///
499 /// Like asynchronous channels, the [`Receiver`] will block until a message
500 /// becomes available. These channels differ greatly in the semantics of the
501 /// sender from asynchronous channels, however.
502 ///
503 /// This channel has an internal buffer on which messages will be queued.
504 /// `bound` specifies the buffer size. When the internal buffer becomes full,
505 /// future sends will *block* waiting for the buffer to open up. Note that a
506 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
507 /// where each [`send()`] will not return until a recv is paired with it.
508 ///
509 /// Like asynchronous channels, if the [`Receiver`] is disconnected while
510 /// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will
511 /// return an error.
512 ///
513 /// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
514 /// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
515 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
516 ///
517 /// # Examples
518 ///
519 /// ```
520 /// use std::sync::mpsc::sync_channel;
521 /// use std::thread;
522 ///
523 /// let (tx, rx) = sync_channel(1);
524 ///
525 /// // this returns immediately
526 /// tx.send(1).unwrap();
527 ///
528 /// thread::spawn(move|| {
529 ///     // this will block until the previous message has been received
530 ///     tx.send(2).unwrap();
531 /// });
532 ///
533 /// assert_eq!(rx.recv().unwrap(), 1);
534 /// assert_eq!(rx.recv().unwrap(), 2);
535 /// ```
536 #[stable(feature = "rust1", since = "1.0.0")]
537 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
538     let a = Arc::new(sync::Packet::new(bound));
539     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
540 }
541
542 ////////////////////////////////////////////////////////////////////////////////
543 // Sender
544 ////////////////////////////////////////////////////////////////////////////////
545
546 impl<T> Sender<T> {
547     fn new(inner: Flavor<T>) -> Sender<T> {
548         Sender {
549             inner: UnsafeCell::new(inner),
550         }
551     }
552
553     /// Attempts to send a value on this channel, returning it back if it could
554     /// not be sent.
555     ///
556     /// A successful send occurs when it is determined that the other end of
557     /// the channel has not hung up already. An unsuccessful send would be one
558     /// where the corresponding receiver has already been deallocated. Note
559     /// that a return value of `Err` means that the data will never be
560     /// received, but a return value of `Ok` does *not* mean that the data
561     /// will be received.  It is possible for the corresponding receiver to
562     /// hang up immediately after this function returns `Ok`.
563     ///
564     /// This method will never block the current thread.
565     ///
566     /// # Examples
567     ///
568     /// ```
569     /// use std::sync::mpsc::channel;
570     ///
571     /// let (tx, rx) = channel();
572     ///
573     /// // This send is always successful
574     /// tx.send(1).unwrap();
575     ///
576     /// // This send will fail because the receiver is gone
577     /// drop(rx);
578     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
579     /// ```
580     #[stable(feature = "rust1", since = "1.0.0")]
581     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
582         let (new_inner, ret) = match *unsafe { self.inner() } {
583             Flavor::Oneshot(ref p) => {
584                 if !p.sent() {
585                     return p.send(t).map_err(SendError);
586                 } else {
587                     let a = Arc::new(stream::Packet::new());
588                     let rx = Receiver::new(Flavor::Stream(a.clone()));
589                     match p.upgrade(rx) {
590                         oneshot::UpSuccess => {
591                             let ret = a.send(t);
592                             (a, ret)
593                         }
594                         oneshot::UpDisconnected => (a, Err(t)),
595                         oneshot::UpWoke(token) => {
596                             // This send cannot panic because the thread is
597                             // asleep (we're looking at it), so the receiver
598                             // can't go away.
599                             a.send(t).ok().unwrap();
600                             token.signal();
601                             (a, Ok(()))
602                         }
603                     }
604                 }
605             }
606             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
607             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
608             Flavor::Sync(..) => unreachable!(),
609         };
610
611         unsafe {
612             let tmp = Sender::new(Flavor::Stream(new_inner));
613             mem::swap(self.inner_mut(), tmp.inner_mut());
614         }
615         ret.map_err(SendError)
616     }
617 }
618
619 #[stable(feature = "rust1", since = "1.0.0")]
620 impl<T> Clone for Sender<T> {
621     fn clone(&self) -> Sender<T> {
622         let packet = match *unsafe { self.inner() } {
623             Flavor::Oneshot(ref p) => {
624                 let a = Arc::new(shared::Packet::new());
625                 {
626                     let guard = a.postinit_lock();
627                     let rx = Receiver::new(Flavor::Shared(a.clone()));
628                     let sleeper = match p.upgrade(rx) {
629                         oneshot::UpSuccess |
630                         oneshot::UpDisconnected => None,
631                         oneshot::UpWoke(task) => Some(task),
632                     };
633                     a.inherit_blocker(sleeper, guard);
634                 }
635                 a
636             }
637             Flavor::Stream(ref p) => {
638                 let a = Arc::new(shared::Packet::new());
639                 {
640                     let guard = a.postinit_lock();
641                     let rx = Receiver::new(Flavor::Shared(a.clone()));
642                     let sleeper = match p.upgrade(rx) {
643                         stream::UpSuccess |
644                         stream::UpDisconnected => None,
645                         stream::UpWoke(task) => Some(task),
646                     };
647                     a.inherit_blocker(sleeper, guard);
648                 }
649                 a
650             }
651             Flavor::Shared(ref p) => {
652                 p.clone_chan();
653                 return Sender::new(Flavor::Shared(p.clone()));
654             }
655             Flavor::Sync(..) => unreachable!(),
656         };
657
658         unsafe {
659             let tmp = Sender::new(Flavor::Shared(packet.clone()));
660             mem::swap(self.inner_mut(), tmp.inner_mut());
661         }
662         Sender::new(Flavor::Shared(packet))
663     }
664 }
665
666 #[stable(feature = "rust1", since = "1.0.0")]
667 impl<T> Drop for Sender<T> {
668     fn drop(&mut self) {
669         match *unsafe { self.inner() } {
670             Flavor::Oneshot(ref p) => p.drop_chan(),
671             Flavor::Stream(ref p) => p.drop_chan(),
672             Flavor::Shared(ref p) => p.drop_chan(),
673             Flavor::Sync(..) => unreachable!(),
674         }
675     }
676 }
677
678 #[stable(feature = "mpsc_debug", since = "1.7.0")]
679 impl<T> fmt::Debug for Sender<T> {
680     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
681         write!(f, "Sender {{ .. }}")
682     }
683 }
684
685 ////////////////////////////////////////////////////////////////////////////////
686 // SyncSender
687 ////////////////////////////////////////////////////////////////////////////////
688
689 impl<T> SyncSender<T> {
690     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
691         SyncSender { inner: inner }
692     }
693
694     /// Sends a value on this synchronous channel.
695     ///
696     /// This function will *block* until space in the internal buffer becomes
697     /// available or a receiver is available to hand off the message to.
698     ///
699     /// Note that a successful send does *not* guarantee that the receiver will
700     /// ever see the data if there is a buffer on this channel. Items may be
701     /// enqueued in the internal buffer for the receiver to receive at a later
702     /// time. If the buffer size is 0, however, it can be guaranteed that the
703     /// receiver has indeed received the data if this function returns success.
704     ///
705     /// This function will never panic, but it may return `Err` if the
706     /// `Receiver` has disconnected and is no longer able to receive
707     /// information.
708     #[stable(feature = "rust1", since = "1.0.0")]
709     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
710         self.inner.send(t).map_err(SendError)
711     }
712
713     /// Attempts to send a value on this channel without blocking.
714     ///
715     /// This method differs from `send` by returning immediately if the
716     /// channel's buffer is full or no receiver is waiting to acquire some
717     /// data. Compared with `send`, this function has two failure cases
718     /// instead of one (one for disconnection, one for a full buffer).
719     ///
720     /// See `SyncSender::send` for notes about guarantees of whether the
721     /// receiver has received the data or not if this function is successful.
722     #[stable(feature = "rust1", since = "1.0.0")]
723     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
724         self.inner.try_send(t)
725     }
726 }
727
728 #[stable(feature = "rust1", since = "1.0.0")]
729 impl<T> Clone for SyncSender<T> {
730     fn clone(&self) -> SyncSender<T> {
731         self.inner.clone_chan();
732         SyncSender::new(self.inner.clone())
733     }
734 }
735
736 #[stable(feature = "rust1", since = "1.0.0")]
737 impl<T> Drop for SyncSender<T> {
738     fn drop(&mut self) {
739         self.inner.drop_chan();
740     }
741 }
742
743 #[stable(feature = "mpsc_debug", since = "1.7.0")]
744 impl<T> fmt::Debug for SyncSender<T> {
745     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
746         write!(f, "SyncSender {{ .. }}")
747     }
748 }
749
750 ////////////////////////////////////////////////////////////////////////////////
751 // Receiver
752 ////////////////////////////////////////////////////////////////////////////////
753
754 impl<T> Receiver<T> {
755     fn new(inner: Flavor<T>) -> Receiver<T> {
756         Receiver { inner: UnsafeCell::new(inner) }
757     }
758
759     /// Attempts to return a pending value on this receiver without blocking
760     ///
761     /// This method will never block the caller in order to wait for data to
762     /// become available. Instead, this will always return immediately with a
763     /// possible option of pending data on the channel.
764     ///
765     /// This is useful for a flavor of "optimistic check" before deciding to
766     /// block on a receiver.
767     #[stable(feature = "rust1", since = "1.0.0")]
768     pub fn try_recv(&self) -> Result<T, TryRecvError> {
769         loop {
770             let new_port = match *unsafe { self.inner() } {
771                 Flavor::Oneshot(ref p) => {
772                     match p.try_recv() {
773                         Ok(t) => return Ok(t),
774                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
775                         Err(oneshot::Disconnected) => {
776                             return Err(TryRecvError::Disconnected)
777                         }
778                         Err(oneshot::Upgraded(rx)) => rx,
779                     }
780                 }
781                 Flavor::Stream(ref p) => {
782                     match p.try_recv() {
783                         Ok(t) => return Ok(t),
784                         Err(stream::Empty) => return Err(TryRecvError::Empty),
785                         Err(stream::Disconnected) => {
786                             return Err(TryRecvError::Disconnected)
787                         }
788                         Err(stream::Upgraded(rx)) => rx,
789                     }
790                 }
791                 Flavor::Shared(ref p) => {
792                     match p.try_recv() {
793                         Ok(t) => return Ok(t),
794                         Err(shared::Empty) => return Err(TryRecvError::Empty),
795                         Err(shared::Disconnected) => {
796                             return Err(TryRecvError::Disconnected)
797                         }
798                     }
799                 }
800                 Flavor::Sync(ref p) => {
801                     match p.try_recv() {
802                         Ok(t) => return Ok(t),
803                         Err(sync::Empty) => return Err(TryRecvError::Empty),
804                         Err(sync::Disconnected) => {
805                             return Err(TryRecvError::Disconnected)
806                         }
807                     }
808                 }
809             };
810             unsafe {
811                 mem::swap(self.inner_mut(),
812                           new_port.inner_mut());
813             }
814         }
815     }
816
817     /// Attempts to wait for a value on this receiver, returning an error if the
818     /// corresponding channel has hung up.
819     ///
820     /// This function will always block the current thread if there is no data
821     /// available and it's possible for more data to be sent. Once a message is
822     /// sent to the corresponding `Sender`, then this receiver will wake up and
823     /// return that message.
824     ///
825     /// If the corresponding `Sender` has disconnected, or it disconnects while
826     /// this call is blocking, this call will wake up and return `Err` to
827     /// indicate that no more messages can ever be received on this channel.
828     /// However, since channels are buffered, messages sent before the disconnect
829     /// will still be properly received.
830     ///
831     /// # Examples
832     ///
833     /// ```
834     /// use std::sync::mpsc;
835     /// use std::thread;
836     ///
837     /// let (send, recv) = mpsc::channel();
838     /// let handle = thread::spawn(move || {
839     ///     send.send(1u8).unwrap();
840     /// });
841     ///
842     /// handle.join().unwrap();
843     ///
844     /// assert_eq!(Ok(1), recv.recv());
845     /// ```
846     ///
847     /// Buffering behavior:
848     ///
849     /// ```
850     /// use std::sync::mpsc;
851     /// use std::thread;
852     /// use std::sync::mpsc::RecvError;
853     ///
854     /// let (send, recv) = mpsc::channel();
855     /// let handle = thread::spawn(move || {
856     ///     send.send(1u8).unwrap();
857     ///     send.send(2).unwrap();
858     ///     send.send(3).unwrap();
859     ///     drop(send);
860     /// });
861     ///
862     /// // wait for the thread to join so we ensure the sender is dropped
863     /// handle.join().unwrap();
864     ///
865     /// assert_eq!(Ok(1), recv.recv());
866     /// assert_eq!(Ok(2), recv.recv());
867     /// assert_eq!(Ok(3), recv.recv());
868     /// assert_eq!(Err(RecvError), recv.recv());
869     /// ```
870     #[stable(feature = "rust1", since = "1.0.0")]
871     pub fn recv(&self) -> Result<T, RecvError> {
872         loop {
873             let new_port = match *unsafe { self.inner() } {
874                 Flavor::Oneshot(ref p) => {
875                     match p.recv(None) {
876                         Ok(t) => return Ok(t),
877                         Err(oneshot::Disconnected) => return Err(RecvError),
878                         Err(oneshot::Upgraded(rx)) => rx,
879                         Err(oneshot::Empty) => unreachable!(),
880                     }
881                 }
882                 Flavor::Stream(ref p) => {
883                     match p.recv(None) {
884                         Ok(t) => return Ok(t),
885                         Err(stream::Disconnected) => return Err(RecvError),
886                         Err(stream::Upgraded(rx)) => rx,
887                         Err(stream::Empty) => unreachable!(),
888                     }
889                 }
890                 Flavor::Shared(ref p) => {
891                     match p.recv(None) {
892                         Ok(t) => return Ok(t),
893                         Err(shared::Disconnected) => return Err(RecvError),
894                         Err(shared::Empty) => unreachable!(),
895                     }
896                 }
897                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
898             };
899             unsafe {
900                 mem::swap(self.inner_mut(), new_port.inner_mut());
901             }
902         }
903     }
904
905     /// Attempts to wait for a value on this receiver, returning an error if the
906     /// corresponding channel has hung up, or if it waits more than `timeout`.
907     ///
908     /// This function will always block the current thread if there is no data
909     /// available and it's possible for more data to be sent. Once a message is
910     /// sent to the corresponding `Sender`, then this receiver will wake up and
911     /// return that message.
912     ///
913     /// If the corresponding `Sender` has disconnected, or it disconnects while
914     /// this call is blocking, this call will wake up and return `Err` to
915     /// indicate that no more messages can ever be received on this channel.
916     /// However, since channels are buffered, messages sent before the disconnect
917     /// will still be properly received.
918     ///
919     /// # Examples
920     ///
921     /// ```no_run
922     /// use std::sync::mpsc::{self, RecvTimeoutError};
923     /// use std::time::Duration;
924     ///
925     /// let (send, recv) = mpsc::channel::<()>();
926     ///
927     /// let timeout = Duration::from_millis(100);
928     /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
929     /// ```
930     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
931     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
932         // Do an optimistic try_recv to avoid the performance impact of
933         // Instant::now() in the full-channel case.
934         match self.try_recv() {
935             Ok(result)
936                 => Ok(result),
937             Err(TryRecvError::Disconnected)
938                 => Err(RecvTimeoutError::Disconnected),
939             Err(TryRecvError::Empty)
940                 => self.recv_max_until(Instant::now() + timeout)
941         }
942     }
943
944     fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
945         use self::RecvTimeoutError::*;
946
947         loop {
948             let port_or_empty = match *unsafe { self.inner() } {
949                 Flavor::Oneshot(ref p) => {
950                     match p.recv(Some(deadline)) {
951                         Ok(t) => return Ok(t),
952                         Err(oneshot::Disconnected) => return Err(Disconnected),
953                         Err(oneshot::Upgraded(rx)) => Some(rx),
954                         Err(oneshot::Empty) => None,
955                     }
956                 }
957                 Flavor::Stream(ref p) => {
958                     match p.recv(Some(deadline)) {
959                         Ok(t) => return Ok(t),
960                         Err(stream::Disconnected) => return Err(Disconnected),
961                         Err(stream::Upgraded(rx)) => Some(rx),
962                         Err(stream::Empty) => None,
963                     }
964                 }
965                 Flavor::Shared(ref p) => {
966                     match p.recv(Some(deadline)) {
967                         Ok(t) => return Ok(t),
968                         Err(shared::Disconnected) => return Err(Disconnected),
969                         Err(shared::Empty) => None,
970                     }
971                 }
972                 Flavor::Sync(ref p) => {
973                     match p.recv(Some(deadline)) {
974                         Ok(t) => return Ok(t),
975                         Err(sync::Disconnected) => return Err(Disconnected),
976                         Err(sync::Empty) => None,
977                     }
978                 }
979             };
980
981             if let Some(new_port) = port_or_empty {
982                 unsafe {
983                     mem::swap(self.inner_mut(), new_port.inner_mut());
984                 }
985             }
986
987             // If we're already passed the deadline, and we're here without
988             // data, return a timeout, else try again.
989             if Instant::now() >= deadline {
990                 return Err(Timeout);
991             }
992         }
993     }
994
995     /// Returns an iterator that will block waiting for messages, but never
996     /// `panic!`. It will return `None` when the channel has hung up.
997     #[stable(feature = "rust1", since = "1.0.0")]
998     pub fn iter(&self) -> Iter<T> {
999         Iter { rx: self }
1000     }
1001
1002     /// Returns an iterator that will attempt to yield all pending values.
1003     /// It will return `None` if there are no more pending values or if the
1004     /// channel has hung up. The iterator will never `panic!` or block the
1005     /// user by waiting for values.
1006     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1007     pub fn try_iter(&self) -> TryIter<T> {
1008         TryIter { rx: self }
1009     }
1010
1011 }
1012
1013 impl<T> select::Packet for Receiver<T> {
1014     fn can_recv(&self) -> bool {
1015         loop {
1016             let new_port = match *unsafe { self.inner() } {
1017                 Flavor::Oneshot(ref p) => {
1018                     match p.can_recv() {
1019                         Ok(ret) => return ret,
1020                         Err(upgrade) => upgrade,
1021                     }
1022                 }
1023                 Flavor::Stream(ref p) => {
1024                     match p.can_recv() {
1025                         Ok(ret) => return ret,
1026                         Err(upgrade) => upgrade,
1027                     }
1028                 }
1029                 Flavor::Shared(ref p) => return p.can_recv(),
1030                 Flavor::Sync(ref p) => return p.can_recv(),
1031             };
1032             unsafe {
1033                 mem::swap(self.inner_mut(),
1034                           new_port.inner_mut());
1035             }
1036         }
1037     }
1038
1039     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1040         loop {
1041             let (t, new_port) = match *unsafe { self.inner() } {
1042                 Flavor::Oneshot(ref p) => {
1043                     match p.start_selection(token) {
1044                         oneshot::SelSuccess => return Installed,
1045                         oneshot::SelCanceled => return Abort,
1046                         oneshot::SelUpgraded(t, rx) => (t, rx),
1047                     }
1048                 }
1049                 Flavor::Stream(ref p) => {
1050                     match p.start_selection(token) {
1051                         stream::SelSuccess => return Installed,
1052                         stream::SelCanceled => return Abort,
1053                         stream::SelUpgraded(t, rx) => (t, rx),
1054                     }
1055                 }
1056                 Flavor::Shared(ref p) => return p.start_selection(token),
1057                 Flavor::Sync(ref p) => return p.start_selection(token),
1058             };
1059             token = t;
1060             unsafe {
1061                 mem::swap(self.inner_mut(), new_port.inner_mut());
1062             }
1063         }
1064     }
1065
1066     fn abort_selection(&self) -> bool {
1067         let mut was_upgrade = false;
1068         loop {
1069             let result = match *unsafe { self.inner() } {
1070                 Flavor::Oneshot(ref p) => p.abort_selection(),
1071                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1072                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1073                 Flavor::Sync(ref p) => return p.abort_selection(),
1074             };
1075             let new_port = match result { Ok(b) => return b, Err(p) => p };
1076             was_upgrade = true;
1077             unsafe {
1078                 mem::swap(self.inner_mut(),
1079                           new_port.inner_mut());
1080             }
1081         }
1082     }
1083 }
1084
1085 #[stable(feature = "rust1", since = "1.0.0")]
1086 impl<'a, T> Iterator for Iter<'a, T> {
1087     type Item = T;
1088
1089     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1090 }
1091
1092 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1093 impl<'a, T> Iterator for TryIter<'a, T> {
1094     type Item = T;
1095
1096     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1097 }
1098
1099 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1100 impl<'a, T> IntoIterator for &'a Receiver<T> {
1101     type Item = T;
1102     type IntoIter = Iter<'a, T>;
1103
1104     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1105 }
1106
1107 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1108 impl<T> Iterator for IntoIter<T> {
1109     type Item = T;
1110     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1111 }
1112
1113 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1114 impl <T> IntoIterator for Receiver<T> {
1115     type Item = T;
1116     type IntoIter = IntoIter<T>;
1117
1118     fn into_iter(self) -> IntoIter<T> {
1119         IntoIter { rx: self }
1120     }
1121 }
1122
1123 #[stable(feature = "rust1", since = "1.0.0")]
1124 impl<T> Drop for Receiver<T> {
1125     fn drop(&mut self) {
1126         match *unsafe { self.inner() } {
1127             Flavor::Oneshot(ref p) => p.drop_port(),
1128             Flavor::Stream(ref p) => p.drop_port(),
1129             Flavor::Shared(ref p) => p.drop_port(),
1130             Flavor::Sync(ref p) => p.drop_port(),
1131         }
1132     }
1133 }
1134
1135 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1136 impl<T> fmt::Debug for Receiver<T> {
1137     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1138         write!(f, "Receiver {{ .. }}")
1139     }
1140 }
1141
1142 #[stable(feature = "rust1", since = "1.0.0")]
1143 impl<T> fmt::Debug for SendError<T> {
1144     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1145         "SendError(..)".fmt(f)
1146     }
1147 }
1148
1149 #[stable(feature = "rust1", since = "1.0.0")]
1150 impl<T> fmt::Display for SendError<T> {
1151     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1152         "sending on a closed channel".fmt(f)
1153     }
1154 }
1155
1156 #[stable(feature = "rust1", since = "1.0.0")]
1157 impl<T: Send> error::Error for SendError<T> {
1158     fn description(&self) -> &str {
1159         "sending on a closed channel"
1160     }
1161
1162     fn cause(&self) -> Option<&error::Error> {
1163         None
1164     }
1165 }
1166
1167 #[stable(feature = "rust1", since = "1.0.0")]
1168 impl<T> fmt::Debug for TrySendError<T> {
1169     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1170         match *self {
1171             TrySendError::Full(..) => "Full(..)".fmt(f),
1172             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1173         }
1174     }
1175 }
1176
1177 #[stable(feature = "rust1", since = "1.0.0")]
1178 impl<T> fmt::Display for TrySendError<T> {
1179     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1180         match *self {
1181             TrySendError::Full(..) => {
1182                 "sending on a full channel".fmt(f)
1183             }
1184             TrySendError::Disconnected(..) => {
1185                 "sending on a closed channel".fmt(f)
1186             }
1187         }
1188     }
1189 }
1190
1191 #[stable(feature = "rust1", since = "1.0.0")]
1192 impl<T: Send> error::Error for TrySendError<T> {
1193
1194     fn description(&self) -> &str {
1195         match *self {
1196             TrySendError::Full(..) => {
1197                 "sending on a full channel"
1198             }
1199             TrySendError::Disconnected(..) => {
1200                 "sending on a closed channel"
1201             }
1202         }
1203     }
1204
1205     fn cause(&self) -> Option<&error::Error> {
1206         None
1207     }
1208 }
1209
1210 #[stable(feature = "rust1", since = "1.0.0")]
1211 impl fmt::Display for RecvError {
1212     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1213         "receiving on a closed channel".fmt(f)
1214     }
1215 }
1216
1217 #[stable(feature = "rust1", since = "1.0.0")]
1218 impl error::Error for RecvError {
1219
1220     fn description(&self) -> &str {
1221         "receiving on a closed channel"
1222     }
1223
1224     fn cause(&self) -> Option<&error::Error> {
1225         None
1226     }
1227 }
1228
1229 #[stable(feature = "rust1", since = "1.0.0")]
1230 impl fmt::Display for TryRecvError {
1231     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1232         match *self {
1233             TryRecvError::Empty => {
1234                 "receiving on an empty channel".fmt(f)
1235             }
1236             TryRecvError::Disconnected => {
1237                 "receiving on a closed channel".fmt(f)
1238             }
1239         }
1240     }
1241 }
1242
1243 #[stable(feature = "rust1", since = "1.0.0")]
1244 impl error::Error for TryRecvError {
1245
1246     fn description(&self) -> &str {
1247         match *self {
1248             TryRecvError::Empty => {
1249                 "receiving on an empty channel"
1250             }
1251             TryRecvError::Disconnected => {
1252                 "receiving on a closed channel"
1253             }
1254         }
1255     }
1256
1257     fn cause(&self) -> Option<&error::Error> {
1258         None
1259     }
1260 }
1261
1262 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1263 impl fmt::Display for RecvTimeoutError {
1264     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1265         match *self {
1266             RecvTimeoutError::Timeout => {
1267                 "timed out waiting on channel".fmt(f)
1268             }
1269             RecvTimeoutError::Disconnected => {
1270                 "channel is empty and sending half is closed".fmt(f)
1271             }
1272         }
1273     }
1274 }
1275
1276 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1277 impl error::Error for RecvTimeoutError {
1278     fn description(&self) -> &str {
1279         match *self {
1280             RecvTimeoutError::Timeout => {
1281                 "timed out waiting on channel"
1282             }
1283             RecvTimeoutError::Disconnected => {
1284                 "channel is empty and sending half is closed"
1285             }
1286         }
1287     }
1288
1289     fn cause(&self) -> Option<&error::Error> {
1290         None
1291     }
1292 }
1293
1294 #[cfg(all(test, not(target_os = "emscripten")))]
1295 mod tests {
1296     use env;
1297     use super::*;
1298     use thread;
1299     use time::{Duration, Instant};
1300
1301     pub fn stress_factor() -> usize {
1302         match env::var("RUST_TEST_STRESS") {
1303             Ok(val) => val.parse().unwrap(),
1304             Err(..) => 1,
1305         }
1306     }
1307
1308     #[test]
1309     fn smoke() {
1310         let (tx, rx) = channel::<i32>();
1311         tx.send(1).unwrap();
1312         assert_eq!(rx.recv().unwrap(), 1);
1313     }
1314
1315     #[test]
1316     fn drop_full() {
1317         let (tx, _rx) = channel::<Box<isize>>();
1318         tx.send(box 1).unwrap();
1319     }
1320
1321     #[test]
1322     fn drop_full_shared() {
1323         let (tx, _rx) = channel::<Box<isize>>();
1324         drop(tx.clone());
1325         drop(tx.clone());
1326         tx.send(box 1).unwrap();
1327     }
1328
1329     #[test]
1330     fn smoke_shared() {
1331         let (tx, rx) = channel::<i32>();
1332         tx.send(1).unwrap();
1333         assert_eq!(rx.recv().unwrap(), 1);
1334         let tx = tx.clone();
1335         tx.send(1).unwrap();
1336         assert_eq!(rx.recv().unwrap(), 1);
1337     }
1338
1339     #[test]
1340     fn smoke_threads() {
1341         let (tx, rx) = channel::<i32>();
1342         let _t = thread::spawn(move|| {
1343             tx.send(1).unwrap();
1344         });
1345         assert_eq!(rx.recv().unwrap(), 1);
1346     }
1347
1348     #[test]
1349     fn smoke_port_gone() {
1350         let (tx, rx) = channel::<i32>();
1351         drop(rx);
1352         assert!(tx.send(1).is_err());
1353     }
1354
1355     #[test]
1356     fn smoke_shared_port_gone() {
1357         let (tx, rx) = channel::<i32>();
1358         drop(rx);
1359         assert!(tx.send(1).is_err())
1360     }
1361
1362     #[test]
1363     fn smoke_shared_port_gone2() {
1364         let (tx, rx) = channel::<i32>();
1365         drop(rx);
1366         let tx2 = tx.clone();
1367         drop(tx);
1368         assert!(tx2.send(1).is_err());
1369     }
1370
1371     #[test]
1372     fn port_gone_concurrent() {
1373         let (tx, rx) = channel::<i32>();
1374         let _t = thread::spawn(move|| {
1375             rx.recv().unwrap();
1376         });
1377         while tx.send(1).is_ok() {}
1378     }
1379
1380     #[test]
1381     fn port_gone_concurrent_shared() {
1382         let (tx, rx) = channel::<i32>();
1383         let tx2 = tx.clone();
1384         let _t = thread::spawn(move|| {
1385             rx.recv().unwrap();
1386         });
1387         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1388     }
1389
1390     #[test]
1391     fn smoke_chan_gone() {
1392         let (tx, rx) = channel::<i32>();
1393         drop(tx);
1394         assert!(rx.recv().is_err());
1395     }
1396
1397     #[test]
1398     fn smoke_chan_gone_shared() {
1399         let (tx, rx) = channel::<()>();
1400         let tx2 = tx.clone();
1401         drop(tx);
1402         drop(tx2);
1403         assert!(rx.recv().is_err());
1404     }
1405
1406     #[test]
1407     fn chan_gone_concurrent() {
1408         let (tx, rx) = channel::<i32>();
1409         let _t = thread::spawn(move|| {
1410             tx.send(1).unwrap();
1411             tx.send(1).unwrap();
1412         });
1413         while rx.recv().is_ok() {}
1414     }
1415
1416     #[test]
1417     fn stress() {
1418         let (tx, rx) = channel::<i32>();
1419         let t = thread::spawn(move|| {
1420             for _ in 0..10000 { tx.send(1).unwrap(); }
1421         });
1422         for _ in 0..10000 {
1423             assert_eq!(rx.recv().unwrap(), 1);
1424         }
1425         t.join().ok().unwrap();
1426     }
1427
1428     #[test]
1429     fn stress_shared() {
1430         const AMT: u32 = 10000;
1431         const NTHREADS: u32 = 8;
1432         let (tx, rx) = channel::<i32>();
1433
1434         let t = thread::spawn(move|| {
1435             for _ in 0..AMT * NTHREADS {
1436                 assert_eq!(rx.recv().unwrap(), 1);
1437             }
1438             match rx.try_recv() {
1439                 Ok(..) => panic!(),
1440                 _ => {}
1441             }
1442         });
1443
1444         for _ in 0..NTHREADS {
1445             let tx = tx.clone();
1446             thread::spawn(move|| {
1447                 for _ in 0..AMT { tx.send(1).unwrap(); }
1448             });
1449         }
1450         drop(tx);
1451         t.join().ok().unwrap();
1452     }
1453
1454     #[test]
1455     fn send_from_outside_runtime() {
1456         let (tx1, rx1) = channel::<()>();
1457         let (tx2, rx2) = channel::<i32>();
1458         let t1 = thread::spawn(move|| {
1459             tx1.send(()).unwrap();
1460             for _ in 0..40 {
1461                 assert_eq!(rx2.recv().unwrap(), 1);
1462             }
1463         });
1464         rx1.recv().unwrap();
1465         let t2 = thread::spawn(move|| {
1466             for _ in 0..40 {
1467                 tx2.send(1).unwrap();
1468             }
1469         });
1470         t1.join().ok().unwrap();
1471         t2.join().ok().unwrap();
1472     }
1473
1474     #[test]
1475     fn recv_from_outside_runtime() {
1476         let (tx, rx) = channel::<i32>();
1477         let t = thread::spawn(move|| {
1478             for _ in 0..40 {
1479                 assert_eq!(rx.recv().unwrap(), 1);
1480             }
1481         });
1482         for _ in 0..40 {
1483             tx.send(1).unwrap();
1484         }
1485         t.join().ok().unwrap();
1486     }
1487
1488     #[test]
1489     fn no_runtime() {
1490         let (tx1, rx1) = channel::<i32>();
1491         let (tx2, rx2) = channel::<i32>();
1492         let t1 = thread::spawn(move|| {
1493             assert_eq!(rx1.recv().unwrap(), 1);
1494             tx2.send(2).unwrap();
1495         });
1496         let t2 = thread::spawn(move|| {
1497             tx1.send(1).unwrap();
1498             assert_eq!(rx2.recv().unwrap(), 2);
1499         });
1500         t1.join().ok().unwrap();
1501         t2.join().ok().unwrap();
1502     }
1503
1504     #[test]
1505     fn oneshot_single_thread_close_port_first() {
1506         // Simple test of closing without sending
1507         let (_tx, rx) = channel::<i32>();
1508         drop(rx);
1509     }
1510
1511     #[test]
1512     fn oneshot_single_thread_close_chan_first() {
1513         // Simple test of closing without sending
1514         let (tx, _rx) = channel::<i32>();
1515         drop(tx);
1516     }
1517
1518     #[test]
1519     fn oneshot_single_thread_send_port_close() {
1520         // Testing that the sender cleans up the payload if receiver is closed
1521         let (tx, rx) = channel::<Box<i32>>();
1522         drop(rx);
1523         assert!(tx.send(box 0).is_err());
1524     }
1525
1526     #[test]
1527     fn oneshot_single_thread_recv_chan_close() {
1528         // Receiving on a closed chan will panic
1529         let res = thread::spawn(move|| {
1530             let (tx, rx) = channel::<i32>();
1531             drop(tx);
1532             rx.recv().unwrap();
1533         }).join();
1534         // What is our res?
1535         assert!(res.is_err());
1536     }
1537
1538     #[test]
1539     fn oneshot_single_thread_send_then_recv() {
1540         let (tx, rx) = channel::<Box<i32>>();
1541         tx.send(box 10).unwrap();
1542         assert!(rx.recv().unwrap() == box 10);
1543     }
1544
1545     #[test]
1546     fn oneshot_single_thread_try_send_open() {
1547         let (tx, rx) = channel::<i32>();
1548         assert!(tx.send(10).is_ok());
1549         assert!(rx.recv().unwrap() == 10);
1550     }
1551
1552     #[test]
1553     fn oneshot_single_thread_try_send_closed() {
1554         let (tx, rx) = channel::<i32>();
1555         drop(rx);
1556         assert!(tx.send(10).is_err());
1557     }
1558
1559     #[test]
1560     fn oneshot_single_thread_try_recv_open() {
1561         let (tx, rx) = channel::<i32>();
1562         tx.send(10).unwrap();
1563         assert!(rx.recv() == Ok(10));
1564     }
1565
1566     #[test]
1567     fn oneshot_single_thread_try_recv_closed() {
1568         let (tx, rx) = channel::<i32>();
1569         drop(tx);
1570         assert!(rx.recv().is_err());
1571     }
1572
1573     #[test]
1574     fn oneshot_single_thread_peek_data() {
1575         let (tx, rx) = channel::<i32>();
1576         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1577         tx.send(10).unwrap();
1578         assert_eq!(rx.try_recv(), Ok(10));
1579     }
1580
1581     #[test]
1582     fn oneshot_single_thread_peek_close() {
1583         let (tx, rx) = channel::<i32>();
1584         drop(tx);
1585         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1586         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1587     }
1588
1589     #[test]
1590     fn oneshot_single_thread_peek_open() {
1591         let (_tx, rx) = channel::<i32>();
1592         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1593     }
1594
1595     #[test]
1596     fn oneshot_multi_task_recv_then_send() {
1597         let (tx, rx) = channel::<Box<i32>>();
1598         let _t = thread::spawn(move|| {
1599             assert!(rx.recv().unwrap() == box 10);
1600         });
1601
1602         tx.send(box 10).unwrap();
1603     }
1604
1605     #[test]
1606     fn oneshot_multi_task_recv_then_close() {
1607         let (tx, rx) = channel::<Box<i32>>();
1608         let _t = thread::spawn(move|| {
1609             drop(tx);
1610         });
1611         let res = thread::spawn(move|| {
1612             assert!(rx.recv().unwrap() == box 10);
1613         }).join();
1614         assert!(res.is_err());
1615     }
1616
1617     #[test]
1618     fn oneshot_multi_thread_close_stress() {
1619         for _ in 0..stress_factor() {
1620             let (tx, rx) = channel::<i32>();
1621             let _t = thread::spawn(move|| {
1622                 drop(rx);
1623             });
1624             drop(tx);
1625         }
1626     }
1627
1628     #[test]
1629     fn oneshot_multi_thread_send_close_stress() {
1630         for _ in 0..stress_factor() {
1631             let (tx, rx) = channel::<i32>();
1632             let _t = thread::spawn(move|| {
1633                 drop(rx);
1634             });
1635             let _ = thread::spawn(move|| {
1636                 tx.send(1).unwrap();
1637             }).join();
1638         }
1639     }
1640
1641     #[test]
1642     fn oneshot_multi_thread_recv_close_stress() {
1643         for _ in 0..stress_factor() {
1644             let (tx, rx) = channel::<i32>();
1645             thread::spawn(move|| {
1646                 let res = thread::spawn(move|| {
1647                     rx.recv().unwrap();
1648                 }).join();
1649                 assert!(res.is_err());
1650             });
1651             let _t = thread::spawn(move|| {
1652                 thread::spawn(move|| {
1653                     drop(tx);
1654                 });
1655             });
1656         }
1657     }
1658
1659     #[test]
1660     fn oneshot_multi_thread_send_recv_stress() {
1661         for _ in 0..stress_factor() {
1662             let (tx, rx) = channel::<Box<isize>>();
1663             let _t = thread::spawn(move|| {
1664                 tx.send(box 10).unwrap();
1665             });
1666             assert!(rx.recv().unwrap() == box 10);
1667         }
1668     }
1669
1670     #[test]
1671     fn stream_send_recv_stress() {
1672         for _ in 0..stress_factor() {
1673             let (tx, rx) = channel();
1674
1675             send(tx, 0);
1676             recv(rx, 0);
1677
1678             fn send(tx: Sender<Box<i32>>, i: i32) {
1679                 if i == 10 { return }
1680
1681                 thread::spawn(move|| {
1682                     tx.send(box i).unwrap();
1683                     send(tx, i + 1);
1684                 });
1685             }
1686
1687             fn recv(rx: Receiver<Box<i32>>, i: i32) {
1688                 if i == 10 { return }
1689
1690                 thread::spawn(move|| {
1691                     assert!(rx.recv().unwrap() == box i);
1692                     recv(rx, i + 1);
1693                 });
1694             }
1695         }
1696     }
1697
1698     #[test]
1699     fn oneshot_single_thread_recv_timeout() {
1700         let (tx, rx) = channel();
1701         tx.send(()).unwrap();
1702         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1703         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1704         tx.send(()).unwrap();
1705         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1706     }
1707
1708     #[test]
1709     fn stress_recv_timeout_two_threads() {
1710         let (tx, rx) = channel();
1711         let stress = stress_factor() + 100;
1712         let timeout = Duration::from_millis(100);
1713
1714         thread::spawn(move || {
1715             for i in 0..stress {
1716                 if i % 2 == 0 {
1717                     thread::sleep(timeout * 2);
1718                 }
1719                 tx.send(1usize).unwrap();
1720             }
1721         });
1722
1723         let mut recv_count = 0;
1724         loop {
1725             match rx.recv_timeout(timeout) {
1726                 Ok(n) => {
1727                     assert_eq!(n, 1usize);
1728                     recv_count += 1;
1729                 }
1730                 Err(RecvTimeoutError::Timeout) => continue,
1731                 Err(RecvTimeoutError::Disconnected) => break,
1732             }
1733         }
1734
1735         assert_eq!(recv_count, stress);
1736     }
1737
1738     #[test]
1739     fn recv_timeout_upgrade() {
1740         let (tx, rx) = channel::<()>();
1741         let timeout = Duration::from_millis(1);
1742         let _tx_clone = tx.clone();
1743
1744         let start = Instant::now();
1745         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1746         assert!(Instant::now() >= start + timeout);
1747     }
1748
1749     #[test]
1750     fn stress_recv_timeout_shared() {
1751         let (tx, rx) = channel();
1752         let stress = stress_factor() + 100;
1753
1754         for i in 0..stress {
1755             let tx = tx.clone();
1756             thread::spawn(move || {
1757                 thread::sleep(Duration::from_millis(i as u64 * 10));
1758                 tx.send(1usize).unwrap();
1759             });
1760         }
1761
1762         drop(tx);
1763
1764         let mut recv_count = 0;
1765         loop {
1766             match rx.recv_timeout(Duration::from_millis(10)) {
1767                 Ok(n) => {
1768                     assert_eq!(n, 1usize);
1769                     recv_count += 1;
1770                 }
1771                 Err(RecvTimeoutError::Timeout) => continue,
1772                 Err(RecvTimeoutError::Disconnected) => break,
1773             }
1774         }
1775
1776         assert_eq!(recv_count, stress);
1777     }
1778
1779     #[test]
1780     fn recv_a_lot() {
1781         // Regression test that we don't run out of stack in scheduler context
1782         let (tx, rx) = channel();
1783         for _ in 0..10000 { tx.send(()).unwrap(); }
1784         for _ in 0..10000 { rx.recv().unwrap(); }
1785     }
1786
1787     #[test]
1788     fn shared_recv_timeout() {
1789         let (tx, rx) = channel();
1790         let total = 5;
1791         for _ in 0..total {
1792             let tx = tx.clone();
1793             thread::spawn(move|| {
1794                 tx.send(()).unwrap();
1795             });
1796         }
1797
1798         for _ in 0..total { rx.recv().unwrap(); }
1799
1800         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1801         tx.send(()).unwrap();
1802         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1803     }
1804
1805     #[test]
1806     fn shared_chan_stress() {
1807         let (tx, rx) = channel();
1808         let total = stress_factor() + 100;
1809         for _ in 0..total {
1810             let tx = tx.clone();
1811             thread::spawn(move|| {
1812                 tx.send(()).unwrap();
1813             });
1814         }
1815
1816         for _ in 0..total {
1817             rx.recv().unwrap();
1818         }
1819     }
1820
1821     #[test]
1822     fn test_nested_recv_iter() {
1823         let (tx, rx) = channel::<i32>();
1824         let (total_tx, total_rx) = channel::<i32>();
1825
1826         let _t = thread::spawn(move|| {
1827             let mut acc = 0;
1828             for x in rx.iter() {
1829                 acc += x;
1830             }
1831             total_tx.send(acc).unwrap();
1832         });
1833
1834         tx.send(3).unwrap();
1835         tx.send(1).unwrap();
1836         tx.send(2).unwrap();
1837         drop(tx);
1838         assert_eq!(total_rx.recv().unwrap(), 6);
1839     }
1840
1841     #[test]
1842     fn test_recv_iter_break() {
1843         let (tx, rx) = channel::<i32>();
1844         let (count_tx, count_rx) = channel();
1845
1846         let _t = thread::spawn(move|| {
1847             let mut count = 0;
1848             for x in rx.iter() {
1849                 if count >= 3 {
1850                     break;
1851                 } else {
1852                     count += x;
1853                 }
1854             }
1855             count_tx.send(count).unwrap();
1856         });
1857
1858         tx.send(2).unwrap();
1859         tx.send(2).unwrap();
1860         tx.send(2).unwrap();
1861         let _ = tx.send(2);
1862         drop(tx);
1863         assert_eq!(count_rx.recv().unwrap(), 4);
1864     }
1865
1866     #[test]
1867     fn test_recv_try_iter() {
1868         let (request_tx, request_rx) = channel();
1869         let (response_tx, response_rx) = channel();
1870
1871         // Request `x`s until we have `6`.
1872         let t = thread::spawn(move|| {
1873             let mut count = 0;
1874             loop {
1875                 for x in response_rx.try_iter() {
1876                     count += x;
1877                     if count == 6 {
1878                         return count;
1879                     }
1880                 }
1881                 request_tx.send(()).unwrap();
1882             }
1883         });
1884
1885         for _ in request_rx.iter() {
1886             if response_tx.send(2).is_err() {
1887                 break;
1888             }
1889         }
1890
1891         assert_eq!(t.join().unwrap(), 6);
1892     }
1893
1894     #[test]
1895     fn test_recv_into_iter_owned() {
1896         let mut iter = {
1897           let (tx, rx) = channel::<i32>();
1898           tx.send(1).unwrap();
1899           tx.send(2).unwrap();
1900
1901           rx.into_iter()
1902         };
1903         assert_eq!(iter.next().unwrap(), 1);
1904         assert_eq!(iter.next().unwrap(), 2);
1905         assert_eq!(iter.next().is_none(), true);
1906     }
1907
1908     #[test]
1909     fn test_recv_into_iter_borrowed() {
1910         let (tx, rx) = channel::<i32>();
1911         tx.send(1).unwrap();
1912         tx.send(2).unwrap();
1913         drop(tx);
1914         let mut iter = (&rx).into_iter();
1915         assert_eq!(iter.next().unwrap(), 1);
1916         assert_eq!(iter.next().unwrap(), 2);
1917         assert_eq!(iter.next().is_none(), true);
1918     }
1919
1920     #[test]
1921     fn try_recv_states() {
1922         let (tx1, rx1) = channel::<i32>();
1923         let (tx2, rx2) = channel::<()>();
1924         let (tx3, rx3) = channel::<()>();
1925         let _t = thread::spawn(move|| {
1926             rx2.recv().unwrap();
1927             tx1.send(1).unwrap();
1928             tx3.send(()).unwrap();
1929             rx2.recv().unwrap();
1930             drop(tx1);
1931             tx3.send(()).unwrap();
1932         });
1933
1934         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1935         tx2.send(()).unwrap();
1936         rx3.recv().unwrap();
1937         assert_eq!(rx1.try_recv(), Ok(1));
1938         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1939         tx2.send(()).unwrap();
1940         rx3.recv().unwrap();
1941         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1942     }
1943
1944     // This bug used to end up in a livelock inside of the Receiver destructor
1945     // because the internal state of the Shared packet was corrupted
1946     #[test]
1947     fn destroy_upgraded_shared_port_when_sender_still_active() {
1948         let (tx, rx) = channel();
1949         let (tx2, rx2) = channel();
1950         let _t = thread::spawn(move|| {
1951             rx.recv().unwrap(); // wait on a oneshot
1952             drop(rx);  // destroy a shared
1953             tx2.send(()).unwrap();
1954         });
1955         // make sure the other thread has gone to sleep
1956         for _ in 0..5000 { thread::yield_now(); }
1957
1958         // upgrade to a shared chan and send a message
1959         let t = tx.clone();
1960         drop(tx);
1961         t.send(()).unwrap();
1962
1963         // wait for the child thread to exit before we exit
1964         rx2.recv().unwrap();
1965     }
1966
1967     #[test]
1968     fn issue_32114() {
1969         let (tx, _) = channel();
1970         let _ = tx.send(123);
1971         assert_eq!(tx.send(123), Err(SendError(123)));
1972     }
1973 }
1974
1975 #[cfg(all(test, not(target_os = "emscripten")))]
1976 mod sync_tests {
1977     use env;
1978     use thread;
1979     use super::*;
1980     use time::Duration;
1981
1982     pub fn stress_factor() -> usize {
1983         match env::var("RUST_TEST_STRESS") {
1984             Ok(val) => val.parse().unwrap(),
1985             Err(..) => 1,
1986         }
1987     }
1988
1989     #[test]
1990     fn smoke() {
1991         let (tx, rx) = sync_channel::<i32>(1);
1992         tx.send(1).unwrap();
1993         assert_eq!(rx.recv().unwrap(), 1);
1994     }
1995
1996     #[test]
1997     fn drop_full() {
1998         let (tx, _rx) = sync_channel::<Box<isize>>(1);
1999         tx.send(box 1).unwrap();
2000     }
2001
2002     #[test]
2003     fn smoke_shared() {
2004         let (tx, rx) = sync_channel::<i32>(1);
2005         tx.send(1).unwrap();
2006         assert_eq!(rx.recv().unwrap(), 1);
2007         let tx = tx.clone();
2008         tx.send(1).unwrap();
2009         assert_eq!(rx.recv().unwrap(), 1);
2010     }
2011
2012     #[test]
2013     fn recv_timeout() {
2014         let (tx, rx) = sync_channel::<i32>(1);
2015         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2016         tx.send(1).unwrap();
2017         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2018     }
2019
2020     #[test]
2021     fn smoke_threads() {
2022         let (tx, rx) = sync_channel::<i32>(0);
2023         let _t = thread::spawn(move|| {
2024             tx.send(1).unwrap();
2025         });
2026         assert_eq!(rx.recv().unwrap(), 1);
2027     }
2028
2029     #[test]
2030     fn smoke_port_gone() {
2031         let (tx, rx) = sync_channel::<i32>(0);
2032         drop(rx);
2033         assert!(tx.send(1).is_err());
2034     }
2035
2036     #[test]
2037     fn smoke_shared_port_gone2() {
2038         let (tx, rx) = sync_channel::<i32>(0);
2039         drop(rx);
2040         let tx2 = tx.clone();
2041         drop(tx);
2042         assert!(tx2.send(1).is_err());
2043     }
2044
2045     #[test]
2046     fn port_gone_concurrent() {
2047         let (tx, rx) = sync_channel::<i32>(0);
2048         let _t = thread::spawn(move|| {
2049             rx.recv().unwrap();
2050         });
2051         while tx.send(1).is_ok() {}
2052     }
2053
2054     #[test]
2055     fn port_gone_concurrent_shared() {
2056         let (tx, rx) = sync_channel::<i32>(0);
2057         let tx2 = tx.clone();
2058         let _t = thread::spawn(move|| {
2059             rx.recv().unwrap();
2060         });
2061         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2062     }
2063
2064     #[test]
2065     fn smoke_chan_gone() {
2066         let (tx, rx) = sync_channel::<i32>(0);
2067         drop(tx);
2068         assert!(rx.recv().is_err());
2069     }
2070
2071     #[test]
2072     fn smoke_chan_gone_shared() {
2073         let (tx, rx) = sync_channel::<()>(0);
2074         let tx2 = tx.clone();
2075         drop(tx);
2076         drop(tx2);
2077         assert!(rx.recv().is_err());
2078     }
2079
2080     #[test]
2081     fn chan_gone_concurrent() {
2082         let (tx, rx) = sync_channel::<i32>(0);
2083         thread::spawn(move|| {
2084             tx.send(1).unwrap();
2085             tx.send(1).unwrap();
2086         });
2087         while rx.recv().is_ok() {}
2088     }
2089
2090     #[test]
2091     fn stress() {
2092         let (tx, rx) = sync_channel::<i32>(0);
2093         thread::spawn(move|| {
2094             for _ in 0..10000 { tx.send(1).unwrap(); }
2095         });
2096         for _ in 0..10000 {
2097             assert_eq!(rx.recv().unwrap(), 1);
2098         }
2099     }
2100
2101     #[test]
2102     fn stress_recv_timeout_two_threads() {
2103         let (tx, rx) = sync_channel::<i32>(0);
2104
2105         thread::spawn(move|| {
2106             for _ in 0..10000 { tx.send(1).unwrap(); }
2107         });
2108
2109         let mut recv_count = 0;
2110         loop {
2111             match rx.recv_timeout(Duration::from_millis(1)) {
2112                 Ok(v) => {
2113                     assert_eq!(v, 1);
2114                     recv_count += 1;
2115                 },
2116                 Err(RecvTimeoutError::Timeout) => continue,
2117                 Err(RecvTimeoutError::Disconnected) => break,
2118             }
2119         }
2120
2121         assert_eq!(recv_count, 10000);
2122     }
2123
2124     #[test]
2125     fn stress_recv_timeout_shared() {
2126         const AMT: u32 = 1000;
2127         const NTHREADS: u32 = 8;
2128         let (tx, rx) = sync_channel::<i32>(0);
2129         let (dtx, drx) = sync_channel::<()>(0);
2130
2131         thread::spawn(move|| {
2132             let mut recv_count = 0;
2133             loop {
2134                 match rx.recv_timeout(Duration::from_millis(10)) {
2135                     Ok(v) => {
2136                         assert_eq!(v, 1);
2137                         recv_count += 1;
2138                     },
2139                     Err(RecvTimeoutError::Timeout) => continue,
2140                     Err(RecvTimeoutError::Disconnected) => break,
2141                 }
2142             }
2143
2144             assert_eq!(recv_count, AMT * NTHREADS);
2145             assert!(rx.try_recv().is_err());
2146
2147             dtx.send(()).unwrap();
2148         });
2149
2150         for _ in 0..NTHREADS {
2151             let tx = tx.clone();
2152             thread::spawn(move|| {
2153                 for _ in 0..AMT { tx.send(1).unwrap(); }
2154             });
2155         }
2156
2157         drop(tx);
2158
2159         drx.recv().unwrap();
2160     }
2161
2162     #[test]
2163     fn stress_shared() {
2164         const AMT: u32 = 1000;
2165         const NTHREADS: u32 = 8;
2166         let (tx, rx) = sync_channel::<i32>(0);
2167         let (dtx, drx) = sync_channel::<()>(0);
2168
2169         thread::spawn(move|| {
2170             for _ in 0..AMT * NTHREADS {
2171                 assert_eq!(rx.recv().unwrap(), 1);
2172             }
2173             match rx.try_recv() {
2174                 Ok(..) => panic!(),
2175                 _ => {}
2176             }
2177             dtx.send(()).unwrap();
2178         });
2179
2180         for _ in 0..NTHREADS {
2181             let tx = tx.clone();
2182             thread::spawn(move|| {
2183                 for _ in 0..AMT { tx.send(1).unwrap(); }
2184             });
2185         }
2186         drop(tx);
2187         drx.recv().unwrap();
2188     }
2189
2190     #[test]
2191     fn oneshot_single_thread_close_port_first() {
2192         // Simple test of closing without sending
2193         let (_tx, rx) = sync_channel::<i32>(0);
2194         drop(rx);
2195     }
2196
2197     #[test]
2198     fn oneshot_single_thread_close_chan_first() {
2199         // Simple test of closing without sending
2200         let (tx, _rx) = sync_channel::<i32>(0);
2201         drop(tx);
2202     }
2203
2204     #[test]
2205     fn oneshot_single_thread_send_port_close() {
2206         // Testing that the sender cleans up the payload if receiver is closed
2207         let (tx, rx) = sync_channel::<Box<i32>>(0);
2208         drop(rx);
2209         assert!(tx.send(box 0).is_err());
2210     }
2211
2212     #[test]
2213     fn oneshot_single_thread_recv_chan_close() {
2214         // Receiving on a closed chan will panic
2215         let res = thread::spawn(move|| {
2216             let (tx, rx) = sync_channel::<i32>(0);
2217             drop(tx);
2218             rx.recv().unwrap();
2219         }).join();
2220         // What is our res?
2221         assert!(res.is_err());
2222     }
2223
2224     #[test]
2225     fn oneshot_single_thread_send_then_recv() {
2226         let (tx, rx) = sync_channel::<Box<i32>>(1);
2227         tx.send(box 10).unwrap();
2228         assert!(rx.recv().unwrap() == box 10);
2229     }
2230
2231     #[test]
2232     fn oneshot_single_thread_try_send_open() {
2233         let (tx, rx) = sync_channel::<i32>(1);
2234         assert_eq!(tx.try_send(10), Ok(()));
2235         assert!(rx.recv().unwrap() == 10);
2236     }
2237
2238     #[test]
2239     fn oneshot_single_thread_try_send_closed() {
2240         let (tx, rx) = sync_channel::<i32>(0);
2241         drop(rx);
2242         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2243     }
2244
2245     #[test]
2246     fn oneshot_single_thread_try_send_closed2() {
2247         let (tx, _rx) = sync_channel::<i32>(0);
2248         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2249     }
2250
2251     #[test]
2252     fn oneshot_single_thread_try_recv_open() {
2253         let (tx, rx) = sync_channel::<i32>(1);
2254         tx.send(10).unwrap();
2255         assert!(rx.recv() == Ok(10));
2256     }
2257
2258     #[test]
2259     fn oneshot_single_thread_try_recv_closed() {
2260         let (tx, rx) = sync_channel::<i32>(0);
2261         drop(tx);
2262         assert!(rx.recv().is_err());
2263     }
2264
2265     #[test]
2266     fn oneshot_single_thread_try_recv_closed_with_data() {
2267         let (tx, rx) = sync_channel::<i32>(1);
2268         tx.send(10).unwrap();
2269         drop(tx);
2270         assert_eq!(rx.try_recv(), Ok(10));
2271         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2272     }
2273
2274     #[test]
2275     fn oneshot_single_thread_peek_data() {
2276         let (tx, rx) = sync_channel::<i32>(1);
2277         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2278         tx.send(10).unwrap();
2279         assert_eq!(rx.try_recv(), Ok(10));
2280     }
2281
2282     #[test]
2283     fn oneshot_single_thread_peek_close() {
2284         let (tx, rx) = sync_channel::<i32>(0);
2285         drop(tx);
2286         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2287         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2288     }
2289
2290     #[test]
2291     fn oneshot_single_thread_peek_open() {
2292         let (_tx, rx) = sync_channel::<i32>(0);
2293         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2294     }
2295
2296     #[test]
2297     fn oneshot_multi_task_recv_then_send() {
2298         let (tx, rx) = sync_channel::<Box<i32>>(0);
2299         let _t = thread::spawn(move|| {
2300             assert!(rx.recv().unwrap() == box 10);
2301         });
2302
2303         tx.send(box 10).unwrap();
2304     }
2305
2306     #[test]
2307     fn oneshot_multi_task_recv_then_close() {
2308         let (tx, rx) = sync_channel::<Box<i32>>(0);
2309         let _t = thread::spawn(move|| {
2310             drop(tx);
2311         });
2312         let res = thread::spawn(move|| {
2313             assert!(rx.recv().unwrap() == box 10);
2314         }).join();
2315         assert!(res.is_err());
2316     }
2317
2318     #[test]
2319     fn oneshot_multi_thread_close_stress() {
2320         for _ in 0..stress_factor() {
2321             let (tx, rx) = sync_channel::<i32>(0);
2322             let _t = thread::spawn(move|| {
2323                 drop(rx);
2324             });
2325             drop(tx);
2326         }
2327     }
2328
2329     #[test]
2330     fn oneshot_multi_thread_send_close_stress() {
2331         for _ in 0..stress_factor() {
2332             let (tx, rx) = sync_channel::<i32>(0);
2333             let _t = thread::spawn(move|| {
2334                 drop(rx);
2335             });
2336             let _ = thread::spawn(move || {
2337                 tx.send(1).unwrap();
2338             }).join();
2339         }
2340     }
2341
2342     #[test]
2343     fn oneshot_multi_thread_recv_close_stress() {
2344         for _ in 0..stress_factor() {
2345             let (tx, rx) = sync_channel::<i32>(0);
2346             let _t = thread::spawn(move|| {
2347                 let res = thread::spawn(move|| {
2348                     rx.recv().unwrap();
2349                 }).join();
2350                 assert!(res.is_err());
2351             });
2352             let _t = thread::spawn(move|| {
2353                 thread::spawn(move|| {
2354                     drop(tx);
2355                 });
2356             });
2357         }
2358     }
2359
2360     #[test]
2361     fn oneshot_multi_thread_send_recv_stress() {
2362         for _ in 0..stress_factor() {
2363             let (tx, rx) = sync_channel::<Box<i32>>(0);
2364             let _t = thread::spawn(move|| {
2365                 tx.send(box 10).unwrap();
2366             });
2367             assert!(rx.recv().unwrap() == box 10);
2368         }
2369     }
2370
2371     #[test]
2372     fn stream_send_recv_stress() {
2373         for _ in 0..stress_factor() {
2374             let (tx, rx) = sync_channel::<Box<i32>>(0);
2375
2376             send(tx, 0);
2377             recv(rx, 0);
2378
2379             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2380                 if i == 10 { return }
2381
2382                 thread::spawn(move|| {
2383                     tx.send(box i).unwrap();
2384                     send(tx, i + 1);
2385                 });
2386             }
2387
2388             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2389                 if i == 10 { return }
2390
2391                 thread::spawn(move|| {
2392                     assert!(rx.recv().unwrap() == box i);
2393                     recv(rx, i + 1);
2394                 });
2395             }
2396         }
2397     }
2398
2399     #[test]
2400     fn recv_a_lot() {
2401         // Regression test that we don't run out of stack in scheduler context
2402         let (tx, rx) = sync_channel(10000);
2403         for _ in 0..10000 { tx.send(()).unwrap(); }
2404         for _ in 0..10000 { rx.recv().unwrap(); }
2405     }
2406
2407     #[test]
2408     fn shared_chan_stress() {
2409         let (tx, rx) = sync_channel(0);
2410         let total = stress_factor() + 100;
2411         for _ in 0..total {
2412             let tx = tx.clone();
2413             thread::spawn(move|| {
2414                 tx.send(()).unwrap();
2415             });
2416         }
2417
2418         for _ in 0..total {
2419             rx.recv().unwrap();
2420         }
2421     }
2422
2423     #[test]
2424     fn test_nested_recv_iter() {
2425         let (tx, rx) = sync_channel::<i32>(0);
2426         let (total_tx, total_rx) = sync_channel::<i32>(0);
2427
2428         let _t = thread::spawn(move|| {
2429             let mut acc = 0;
2430             for x in rx.iter() {
2431                 acc += x;
2432             }
2433             total_tx.send(acc).unwrap();
2434         });
2435
2436         tx.send(3).unwrap();
2437         tx.send(1).unwrap();
2438         tx.send(2).unwrap();
2439         drop(tx);
2440         assert_eq!(total_rx.recv().unwrap(), 6);
2441     }
2442
2443     #[test]
2444     fn test_recv_iter_break() {
2445         let (tx, rx) = sync_channel::<i32>(0);
2446         let (count_tx, count_rx) = sync_channel(0);
2447
2448         let _t = thread::spawn(move|| {
2449             let mut count = 0;
2450             for x in rx.iter() {
2451                 if count >= 3 {
2452                     break;
2453                 } else {
2454                     count += x;
2455                 }
2456             }
2457             count_tx.send(count).unwrap();
2458         });
2459
2460         tx.send(2).unwrap();
2461         tx.send(2).unwrap();
2462         tx.send(2).unwrap();
2463         let _ = tx.try_send(2);
2464         drop(tx);
2465         assert_eq!(count_rx.recv().unwrap(), 4);
2466     }
2467
2468     #[test]
2469     fn try_recv_states() {
2470         let (tx1, rx1) = sync_channel::<i32>(1);
2471         let (tx2, rx2) = sync_channel::<()>(1);
2472         let (tx3, rx3) = sync_channel::<()>(1);
2473         let _t = thread::spawn(move|| {
2474             rx2.recv().unwrap();
2475             tx1.send(1).unwrap();
2476             tx3.send(()).unwrap();
2477             rx2.recv().unwrap();
2478             drop(tx1);
2479             tx3.send(()).unwrap();
2480         });
2481
2482         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2483         tx2.send(()).unwrap();
2484         rx3.recv().unwrap();
2485         assert_eq!(rx1.try_recv(), Ok(1));
2486         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2487         tx2.send(()).unwrap();
2488         rx3.recv().unwrap();
2489         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2490     }
2491
2492     // This bug used to end up in a livelock inside of the Receiver destructor
2493     // because the internal state of the Shared packet was corrupted
2494     #[test]
2495     fn destroy_upgraded_shared_port_when_sender_still_active() {
2496         let (tx, rx) = sync_channel::<()>(0);
2497         let (tx2, rx2) = sync_channel::<()>(0);
2498         let _t = thread::spawn(move|| {
2499             rx.recv().unwrap(); // wait on a oneshot
2500             drop(rx);  // destroy a shared
2501             tx2.send(()).unwrap();
2502         });
2503         // make sure the other thread has gone to sleep
2504         for _ in 0..5000 { thread::yield_now(); }
2505
2506         // upgrade to a shared chan and send a message
2507         let t = tx.clone();
2508         drop(tx);
2509         t.send(()).unwrap();
2510
2511         // wait for the child thread to exit before we exit
2512         rx2.recv().unwrap();
2513     }
2514
2515     #[test]
2516     fn send1() {
2517         let (tx, rx) = sync_channel::<i32>(0);
2518         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2519         assert_eq!(tx.send(1), Ok(()));
2520     }
2521
2522     #[test]
2523     fn send2() {
2524         let (tx, rx) = sync_channel::<i32>(0);
2525         let _t = thread::spawn(move|| { drop(rx); });
2526         assert!(tx.send(1).is_err());
2527     }
2528
2529     #[test]
2530     fn send3() {
2531         let (tx, rx) = sync_channel::<i32>(1);
2532         assert_eq!(tx.send(1), Ok(()));
2533         let _t =thread::spawn(move|| { drop(rx); });
2534         assert!(tx.send(1).is_err());
2535     }
2536
2537     #[test]
2538     fn send4() {
2539         let (tx, rx) = sync_channel::<i32>(0);
2540         let tx2 = tx.clone();
2541         let (done, donerx) = channel();
2542         let done2 = done.clone();
2543         let _t = thread::spawn(move|| {
2544             assert!(tx.send(1).is_err());
2545             done.send(()).unwrap();
2546         });
2547         let _t = thread::spawn(move|| {
2548             assert!(tx2.send(2).is_err());
2549             done2.send(()).unwrap();
2550         });
2551         drop(rx);
2552         donerx.recv().unwrap();
2553         donerx.recv().unwrap();
2554     }
2555
2556     #[test]
2557     fn try_send1() {
2558         let (tx, _rx) = sync_channel::<i32>(0);
2559         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2560     }
2561
2562     #[test]
2563     fn try_send2() {
2564         let (tx, _rx) = sync_channel::<i32>(1);
2565         assert_eq!(tx.try_send(1), Ok(()));
2566         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2567     }
2568
2569     #[test]
2570     fn try_send3() {
2571         let (tx, rx) = sync_channel::<i32>(1);
2572         assert_eq!(tx.try_send(1), Ok(()));
2573         drop(rx);
2574         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2575     }
2576
2577     #[test]
2578     fn issue_15761() {
2579         fn repro() {
2580             let (tx1, rx1) = sync_channel::<()>(3);
2581             let (tx2, rx2) = sync_channel::<()>(3);
2582
2583             let _t = thread::spawn(move|| {
2584                 rx1.recv().unwrap();
2585                 tx2.try_send(()).unwrap();
2586             });
2587
2588             tx1.try_send(()).unwrap();
2589             rx2.recv().unwrap();
2590         }
2591
2592         for _ in 0..100 {
2593             repro()
2594         }
2595     }
2596
2597     #[test]
2598     fn fmt_debug_sender() {
2599         let (tx, _) = channel::<i32>();
2600         assert_eq!(format!("{:?}", tx), "Sender { .. }");
2601     }
2602
2603     #[test]
2604     fn fmt_debug_recv() {
2605         let (_, rx) = channel::<i32>();
2606         assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2607     }
2608
2609     #[test]
2610     fn fmt_debug_sync_sender() {
2611         let (tx, _) = sync_channel::<i32>(1);
2612         assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
2613     }
2614 }