]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Remove unnecessary explicit lifetime bounds.
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //!     tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //!     let tx = tx.clone();
79 //!     thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //!     // This will wait for the parent thread to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138 //              They contain as few atomics as possible and involve one and
139 //              exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 //             use a different concurrent queue that is more tailored for this
142 //             use case. The initial allocation of this flavor of channel is not
143 //             optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 //            a channel with multiple senders. This type is as optimized as it
146 //            can be, but the previous two types mentioned are much faster for
147 //            their use-cases.
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
154 //
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
178 // coexist at once.
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 //      send(t)                             recv()
198 //        queue.push(t)                       return if queue.pop()
199 //        if increment() == -1                deschedule {
200 //          wakeup()                            if decrement() > 0
201 //                                                cancel_deschedule()
202 //                                            }
203 //                                            queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use sync::Arc;
269 use error;
270 use fmt;
271 use mem;
272 use cell::UnsafeCell;
273 use marker::Reflect;
274
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
280
281 mod blocking;
282 mod oneshot;
283 mod select;
284 mod shared;
285 mod stream;
286 mod sync;
287 mod mpsc_queue;
288 mod spsc_queue;
289
290 /// The receiving-half of Rust's channel type. This half can only be owned by
291 /// one thread
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294     inner: UnsafeCell<Flavor<T>>,
295 }
296
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
301
302 /// An iterator over messages on a receiver, this iterator will block
303 /// whenever `next` is called, waiting for a new message, and `None` will be
304 /// returned when the corresponding channel has hung up.
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub struct Iter<'a, T: 'a> {
307     rx: &'a Receiver<T>
308 }
309
310 /// An owning iterator over messages on a receiver, this iterator will block
311 /// whenever `next` is called, waiting for a new message, and `None` will be
312 /// returned when the corresponding channel has hung up.
313 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
314 pub struct IntoIter<T> {
315     rx: Receiver<T>
316 }
317
318 /// The sending-half of Rust's asynchronous channel type. This half can only be
319 /// owned by one thread, but it can be cloned to send to other threads.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 pub struct Sender<T> {
322     inner: UnsafeCell<Flavor<T>>,
323 }
324
325 // The send port can be sent from place to place, so long as it
326 // is not used to send non-sendable things.
327 #[stable(feature = "rust1", since = "1.0.0")]
328 unsafe impl<T: Send> Send for Sender<T> { }
329
330 /// The sending-half of Rust's synchronous channel type. This half can only be
331 /// owned by one thread, but it can be cloned to send to other threads.
332 #[stable(feature = "rust1", since = "1.0.0")]
333 pub struct SyncSender<T> {
334     inner: Arc<UnsafeCell<sync::Packet<T>>>,
335 }
336
337 #[stable(feature = "rust1", since = "1.0.0")]
338 unsafe impl<T: Send> Send for SyncSender<T> {}
339
340 #[stable(feature = "rust1", since = "1.0.0")]
341 impl<T> !Sync for SyncSender<T> {}
342
343 /// An error returned from the `send` function on channels.
344 ///
345 /// A `send` operation can only fail if the receiving end of a channel is
346 /// disconnected, implying that the data could never be received. The error
347 /// contains the data being sent as a payload so it can be recovered.
348 #[stable(feature = "rust1", since = "1.0.0")]
349 #[derive(PartialEq, Eq, Clone, Copy)]
350 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
351
352 /// An error returned from the `recv` function on a `Receiver`.
353 ///
354 /// The `recv` operation can only fail if the sending half of a channel is
355 /// disconnected, implying that no further messages will ever be received.
356 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
357 #[stable(feature = "rust1", since = "1.0.0")]
358 pub struct RecvError;
359
360 /// This enumeration is the list of the possible reasons that `try_recv` could
361 /// not return data when called.
362 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
363 #[stable(feature = "rust1", since = "1.0.0")]
364 pub enum TryRecvError {
365     /// This channel is currently empty, but the sender(s) have not yet
366     /// disconnected, so data may yet become available.
367     #[stable(feature = "rust1", since = "1.0.0")]
368     Empty,
369
370     /// This channel's sending half has become disconnected, and there will
371     /// never be any more data received on this channel
372     #[stable(feature = "rust1", since = "1.0.0")]
373     Disconnected,
374 }
375
376 /// This enumeration is the list of the possible error outcomes for the
377 /// `SyncSender::try_send` method.
378 #[stable(feature = "rust1", since = "1.0.0")]
379 #[derive(PartialEq, Eq, Clone, Copy)]
380 pub enum TrySendError<T> {
381     /// The data could not be sent on the channel because it would require that
382     /// the callee block to send the data.
383     ///
384     /// If this is a buffered channel, then the buffer is full at this time. If
385     /// this is not a buffered channel, then there is no receiver available to
386     /// acquire the data.
387     #[stable(feature = "rust1", since = "1.0.0")]
388     Full(#[cfg_attr(not(stage0), stable(feature = "rust1", since = "1.0.0"))] T),
389
390     /// This channel's receiving half has disconnected, so the data could not be
391     /// sent. The data is returned back to the callee in this case.
392     #[stable(feature = "rust1", since = "1.0.0")]
393     Disconnected(#[cfg_attr(not(stage0), stable(feature = "rust1", since = "1.0.0"))] T),
394 }
395
396 enum Flavor<T> {
397     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
398     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
399     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
400     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
401 }
402
403 #[doc(hidden)]
404 trait UnsafeFlavor<T> {
405     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
406     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
407         &mut *self.inner_unsafe().get()
408     }
409     unsafe fn inner(&self) -> &Flavor<T> {
410         &*self.inner_unsafe().get()
411     }
412 }
413 impl<T> UnsafeFlavor<T> for Sender<T> {
414     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
415         &self.inner
416     }
417 }
418 impl<T> UnsafeFlavor<T> for Receiver<T> {
419     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
420         &self.inner
421     }
422 }
423
424 /// Creates a new asynchronous channel, returning the sender/receiver halves.
425 ///
426 /// All data sent on the sender will become available on the receiver, and no
427 /// send will block the calling thread (this channel has an "infinite buffer").
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// use std::sync::mpsc::channel;
433 /// use std::thread;
434 ///
435 /// // tx is the sending half (tx for transmission), and rx is the receiving
436 /// // half (rx for receiving).
437 /// let (tx, rx) = channel();
438 ///
439 /// // Spawn off an expensive computation
440 /// thread::spawn(move|| {
441 /// #   fn expensive_computation() {}
442 ///     tx.send(expensive_computation()).unwrap();
443 /// });
444 ///
445 /// // Do some useful work for awhile
446 ///
447 /// // Let's see what that answer was
448 /// println!("{:?}", rx.recv().unwrap());
449 /// ```
450 #[stable(feature = "rust1", since = "1.0.0")]
451 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
452     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
453     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
454 }
455
456 /// Creates a new synchronous, bounded channel.
457 ///
458 /// Like asynchronous channels, the `Receiver` will block until a message
459 /// becomes available. These channels differ greatly in the semantics of the
460 /// sender from asynchronous channels, however.
461 ///
462 /// This channel has an internal buffer on which messages will be queued. When
463 /// the internal buffer becomes full, future sends will *block* waiting for the
464 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
465 /// becomes  "rendezvous channel" where each send will not return until a recv
466 /// is paired with it.
467 ///
468 /// As with asynchronous channels, all senders will panic in `send` if the
469 /// `Receiver` has been destroyed.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// use std::sync::mpsc::sync_channel;
475 /// use std::thread;
476 ///
477 /// let (tx, rx) = sync_channel(1);
478 ///
479 /// // this returns immediately
480 /// tx.send(1).unwrap();
481 ///
482 /// thread::spawn(move|| {
483 ///     // this will block until the previous message has been received
484 ///     tx.send(2).unwrap();
485 /// });
486 ///
487 /// assert_eq!(rx.recv().unwrap(), 1);
488 /// assert_eq!(rx.recv().unwrap(), 2);
489 /// ```
490 #[stable(feature = "rust1", since = "1.0.0")]
491 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
492     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
493     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
494 }
495
496 ////////////////////////////////////////////////////////////////////////////////
497 // Sender
498 ////////////////////////////////////////////////////////////////////////////////
499
500 impl<T> Sender<T> {
501     fn new(inner: Flavor<T>) -> Sender<T> {
502         Sender {
503             inner: UnsafeCell::new(inner),
504         }
505     }
506
507     /// Attempts to send a value on this channel, returning it back if it could
508     /// not be sent.
509     ///
510     /// A successful send occurs when it is determined that the other end of
511     /// the channel has not hung up already. An unsuccessful send would be one
512     /// where the corresponding receiver has already been deallocated. Note
513     /// that a return value of `Err` means that the data will never be
514     /// received, but a return value of `Ok` does *not* mean that the data
515     /// will be received.  It is possible for the corresponding receiver to
516     /// hang up immediately after this function returns `Ok`.
517     ///
518     /// This method will never block the current thread.
519     ///
520     /// # Examples
521     ///
522     /// ```
523     /// use std::sync::mpsc::channel;
524     ///
525     /// let (tx, rx) = channel();
526     ///
527     /// // This send is always successful
528     /// tx.send(1).unwrap();
529     ///
530     /// // This send will fail because the receiver is gone
531     /// drop(rx);
532     /// assert_eq!(tx.send(1).err().unwrap().0, 1);
533     /// ```
534     #[stable(feature = "rust1", since = "1.0.0")]
535     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
536         let (new_inner, ret) = match *unsafe { self.inner() } {
537             Flavor::Oneshot(ref p) => {
538                 unsafe {
539                     let p = p.get();
540                     if !(*p).sent() {
541                         return (*p).send(t).map_err(SendError);
542                     } else {
543                         let a =
544                             Arc::new(UnsafeCell::new(stream::Packet::new()));
545                         let rx = Receiver::new(Flavor::Stream(a.clone()));
546                         match (*p).upgrade(rx) {
547                             oneshot::UpSuccess => {
548                                 let ret = (*a.get()).send(t);
549                                 (a, ret)
550                             }
551                             oneshot::UpDisconnected => (a, Err(t)),
552                             oneshot::UpWoke(token) => {
553                                 // This send cannot panic because the thread is
554                                 // asleep (we're looking at it), so the receiver
555                                 // can't go away.
556                                 (*a.get()).send(t).ok().unwrap();
557                                 token.signal();
558                                 (a, Ok(()))
559                             }
560                         }
561                     }
562                 }
563             }
564             Flavor::Stream(ref p) => return unsafe {
565                 (*p.get()).send(t).map_err(SendError)
566             },
567             Flavor::Shared(ref p) => return unsafe {
568                 (*p.get()).send(t).map_err(SendError)
569             },
570             Flavor::Sync(..) => unreachable!(),
571         };
572
573         unsafe {
574             let tmp = Sender::new(Flavor::Stream(new_inner));
575             mem::swap(self.inner_mut(), tmp.inner_mut());
576         }
577         ret.map_err(SendError)
578     }
579 }
580
581 #[stable(feature = "rust1", since = "1.0.0")]
582 impl<T> Clone for Sender<T> {
583     fn clone(&self) -> Sender<T> {
584         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
585             Flavor::Oneshot(ref p) => {
586                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
587                 unsafe {
588                     let guard = (*a.get()).postinit_lock();
589                     let rx = Receiver::new(Flavor::Shared(a.clone()));
590                     match (*p.get()).upgrade(rx) {
591                         oneshot::UpSuccess |
592                         oneshot::UpDisconnected => (a, None, guard),
593                         oneshot::UpWoke(task) => (a, Some(task), guard)
594                     }
595                 }
596             }
597             Flavor::Stream(ref p) => {
598                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
599                 unsafe {
600                     let guard = (*a.get()).postinit_lock();
601                     let rx = Receiver::new(Flavor::Shared(a.clone()));
602                     match (*p.get()).upgrade(rx) {
603                         stream::UpSuccess |
604                         stream::UpDisconnected => (a, None, guard),
605                         stream::UpWoke(task) => (a, Some(task), guard),
606                     }
607                 }
608             }
609             Flavor::Shared(ref p) => {
610                 unsafe { (*p.get()).clone_chan(); }
611                 return Sender::new(Flavor::Shared(p.clone()));
612             }
613             Flavor::Sync(..) => unreachable!(),
614         };
615
616         unsafe {
617             (*packet.get()).inherit_blocker(sleeper, guard);
618
619             let tmp = Sender::new(Flavor::Shared(packet.clone()));
620             mem::swap(self.inner_mut(), tmp.inner_mut());
621         }
622         Sender::new(Flavor::Shared(packet))
623     }
624 }
625
626 #[stable(feature = "rust1", since = "1.0.0")]
627 impl<T> Drop for Sender<T> {
628     fn drop(&mut self) {
629         match *unsafe { self.inner_mut() } {
630             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
631             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
632             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
633             Flavor::Sync(..) => unreachable!(),
634         }
635     }
636 }
637
638 #[stable(feature = "mpsc_debug", since = "1.7.0")]
639 impl<T> fmt::Debug for Sender<T> {
640     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
641         write!(f, "Sender {{ .. }}")
642     }
643 }
644
645 ////////////////////////////////////////////////////////////////////////////////
646 // SyncSender
647 ////////////////////////////////////////////////////////////////////////////////
648
649 impl<T> SyncSender<T> {
650     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
651         SyncSender { inner: inner }
652     }
653
654     /// Sends a value on this synchronous channel.
655     ///
656     /// This function will *block* until space in the internal buffer becomes
657     /// available or a receiver is available to hand off the message to.
658     ///
659     /// Note that a successful send does *not* guarantee that the receiver will
660     /// ever see the data if there is a buffer on this channel. Items may be
661     /// enqueued in the internal buffer for the receiver to receive at a later
662     /// time. If the buffer size is 0, however, it can be guaranteed that the
663     /// receiver has indeed received the data if this function returns success.
664     ///
665     /// This function will never panic, but it may return `Err` if the
666     /// `Receiver` has disconnected and is no longer able to receive
667     /// information.
668     #[stable(feature = "rust1", since = "1.0.0")]
669     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
670         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
671     }
672
673     /// Attempts to send a value on this channel without blocking.
674     ///
675     /// This method differs from `send` by returning immediately if the
676     /// channel's buffer is full or no receiver is waiting to acquire some
677     /// data. Compared with `send`, this function has two failure cases
678     /// instead of one (one for disconnection, one for a full buffer).
679     ///
680     /// See `SyncSender::send` for notes about guarantees of whether the
681     /// receiver has received the data or not if this function is successful.
682     #[stable(feature = "rust1", since = "1.0.0")]
683     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
684         unsafe { (*self.inner.get()).try_send(t) }
685     }
686 }
687
688 #[stable(feature = "rust1", since = "1.0.0")]
689 impl<T> Clone for SyncSender<T> {
690     fn clone(&self) -> SyncSender<T> {
691         unsafe { (*self.inner.get()).clone_chan(); }
692         SyncSender::new(self.inner.clone())
693     }
694 }
695
696 #[stable(feature = "rust1", since = "1.0.0")]
697 impl<T> Drop for SyncSender<T> {
698     fn drop(&mut self) {
699         unsafe { (*self.inner.get()).drop_chan(); }
700     }
701 }
702
703 #[stable(feature = "mpsc_debug", since = "1.7.0")]
704 impl<T> fmt::Debug for SyncSender<T> {
705     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
706         write!(f, "SyncSender {{ .. }}")
707     }
708 }
709
710 ////////////////////////////////////////////////////////////////////////////////
711 // Receiver
712 ////////////////////////////////////////////////////////////////////////////////
713
714 impl<T> Receiver<T> {
715     fn new(inner: Flavor<T>) -> Receiver<T> {
716         Receiver { inner: UnsafeCell::new(inner) }
717     }
718
719     /// Attempts to return a pending value on this receiver without blocking
720     ///
721     /// This method will never block the caller in order to wait for data to
722     /// become available. Instead, this will always return immediately with a
723     /// possible option of pending data on the channel.
724     ///
725     /// This is useful for a flavor of "optimistic check" before deciding to
726     /// block on a receiver.
727     #[stable(feature = "rust1", since = "1.0.0")]
728     pub fn try_recv(&self) -> Result<T, TryRecvError> {
729         loop {
730             let new_port = match *unsafe { self.inner() } {
731                 Flavor::Oneshot(ref p) => {
732                     match unsafe { (*p.get()).try_recv() } {
733                         Ok(t) => return Ok(t),
734                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
735                         Err(oneshot::Disconnected) => {
736                             return Err(TryRecvError::Disconnected)
737                         }
738                         Err(oneshot::Upgraded(rx)) => rx,
739                     }
740                 }
741                 Flavor::Stream(ref p) => {
742                     match unsafe { (*p.get()).try_recv() } {
743                         Ok(t) => return Ok(t),
744                         Err(stream::Empty) => return Err(TryRecvError::Empty),
745                         Err(stream::Disconnected) => {
746                             return Err(TryRecvError::Disconnected)
747                         }
748                         Err(stream::Upgraded(rx)) => rx,
749                     }
750                 }
751                 Flavor::Shared(ref p) => {
752                     match unsafe { (*p.get()).try_recv() } {
753                         Ok(t) => return Ok(t),
754                         Err(shared::Empty) => return Err(TryRecvError::Empty),
755                         Err(shared::Disconnected) => {
756                             return Err(TryRecvError::Disconnected)
757                         }
758                     }
759                 }
760                 Flavor::Sync(ref p) => {
761                     match unsafe { (*p.get()).try_recv() } {
762                         Ok(t) => return Ok(t),
763                         Err(sync::Empty) => return Err(TryRecvError::Empty),
764                         Err(sync::Disconnected) => {
765                             return Err(TryRecvError::Disconnected)
766                         }
767                     }
768                 }
769             };
770             unsafe {
771                 mem::swap(self.inner_mut(),
772                           new_port.inner_mut());
773             }
774         }
775     }
776
777     /// Attempts to wait for a value on this receiver, returning an error if the
778     /// corresponding channel has hung up.
779     ///
780     /// This function will always block the current thread if there is no data
781     /// available and it's possible for more data to be sent. Once a message is
782     /// sent to the corresponding `Sender`, then this receiver will wake up and
783     /// return that message.
784     ///
785     /// If the corresponding `Sender` has disconnected, or it disconnects while
786     /// this call is blocking, this call will wake up and return `Err` to
787     /// indicate that no more messages can ever be received on this channel.
788     /// However, since channels are buffered, messages sent before the disconnect
789     /// will still be properly received.
790     ///
791     /// # Examples
792     ///
793     /// ```
794     /// use std::sync::mpsc;
795     /// use std::thread;
796     ///
797     /// let (send, recv) = mpsc::channel();
798     /// let handle = thread::spawn(move || {
799     ///     send.send(1u8).unwrap();
800     /// });
801     ///
802     /// handle.join().unwrap();
803     ///
804     /// assert_eq!(Ok(1), recv.recv());
805     /// ```
806     ///
807     /// Buffering behavior:
808     ///
809     /// ```
810     /// use std::sync::mpsc;
811     /// use std::thread;
812     /// use std::sync::mpsc::RecvError;
813     ///
814     /// let (send, recv) = mpsc::channel();
815     /// let handle = thread::spawn(move || {
816     ///     send.send(1u8).unwrap();
817     ///     send.send(2).unwrap();
818     ///     send.send(3).unwrap();
819     ///     drop(send);
820     /// });
821     ///
822     /// // wait for the thread to join so we ensure the sender is dropped
823     /// handle.join().unwrap();
824     ///
825     /// assert_eq!(Ok(1), recv.recv());
826     /// assert_eq!(Ok(2), recv.recv());
827     /// assert_eq!(Ok(3), recv.recv());
828     /// assert_eq!(Err(RecvError), recv.recv());
829     /// ```
830     #[stable(feature = "rust1", since = "1.0.0")]
831     pub fn recv(&self) -> Result<T, RecvError> {
832         loop {
833             let new_port = match *unsafe { self.inner() } {
834                 Flavor::Oneshot(ref p) => {
835                     match unsafe { (*p.get()).recv() } {
836                         Ok(t) => return Ok(t),
837                         Err(oneshot::Empty) => return unreachable!(),
838                         Err(oneshot::Disconnected) => return Err(RecvError),
839                         Err(oneshot::Upgraded(rx)) => rx,
840                     }
841                 }
842                 Flavor::Stream(ref p) => {
843                     match unsafe { (*p.get()).recv() } {
844                         Ok(t) => return Ok(t),
845                         Err(stream::Empty) => return unreachable!(),
846                         Err(stream::Disconnected) => return Err(RecvError),
847                         Err(stream::Upgraded(rx)) => rx,
848                     }
849                 }
850                 Flavor::Shared(ref p) => {
851                     match unsafe { (*p.get()).recv() } {
852                         Ok(t) => return Ok(t),
853                         Err(shared::Empty) => return unreachable!(),
854                         Err(shared::Disconnected) => return Err(RecvError),
855                     }
856                 }
857                 Flavor::Sync(ref p) => return unsafe {
858                     (*p.get()).recv().map_err(|()| RecvError)
859                 }
860             };
861             unsafe {
862                 mem::swap(self.inner_mut(), new_port.inner_mut());
863             }
864         }
865     }
866
867     /// Returns an iterator that will block waiting for messages, but never
868     /// `panic!`. It will return `None` when the channel has hung up.
869     #[stable(feature = "rust1", since = "1.0.0")]
870     pub fn iter(&self) -> Iter<T> {
871         Iter { rx: self }
872     }
873 }
874
875 impl<T> select::Packet for Receiver<T> {
876     fn can_recv(&self) -> bool {
877         loop {
878             let new_port = match *unsafe { self.inner() } {
879                 Flavor::Oneshot(ref p) => {
880                     match unsafe { (*p.get()).can_recv() } {
881                         Ok(ret) => return ret,
882                         Err(upgrade) => upgrade,
883                     }
884                 }
885                 Flavor::Stream(ref p) => {
886                     match unsafe { (*p.get()).can_recv() } {
887                         Ok(ret) => return ret,
888                         Err(upgrade) => upgrade,
889                     }
890                 }
891                 Flavor::Shared(ref p) => {
892                     return unsafe { (*p.get()).can_recv() };
893                 }
894                 Flavor::Sync(ref p) => {
895                     return unsafe { (*p.get()).can_recv() };
896                 }
897             };
898             unsafe {
899                 mem::swap(self.inner_mut(),
900                           new_port.inner_mut());
901             }
902         }
903     }
904
905     fn start_selection(&self, mut token: SignalToken) -> StartResult {
906         loop {
907             let (t, new_port) = match *unsafe { self.inner() } {
908                 Flavor::Oneshot(ref p) => {
909                     match unsafe { (*p.get()).start_selection(token) } {
910                         oneshot::SelSuccess => return Installed,
911                         oneshot::SelCanceled => return Abort,
912                         oneshot::SelUpgraded(t, rx) => (t, rx),
913                     }
914                 }
915                 Flavor::Stream(ref p) => {
916                     match unsafe { (*p.get()).start_selection(token) } {
917                         stream::SelSuccess => return Installed,
918                         stream::SelCanceled => return Abort,
919                         stream::SelUpgraded(t, rx) => (t, rx),
920                     }
921                 }
922                 Flavor::Shared(ref p) => {
923                     return unsafe { (*p.get()).start_selection(token) };
924                 }
925                 Flavor::Sync(ref p) => {
926                     return unsafe { (*p.get()).start_selection(token) };
927                 }
928             };
929             token = t;
930             unsafe {
931                 mem::swap(self.inner_mut(), new_port.inner_mut());
932             }
933         }
934     }
935
936     fn abort_selection(&self) -> bool {
937         let mut was_upgrade = false;
938         loop {
939             let result = match *unsafe { self.inner() } {
940                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
941                 Flavor::Stream(ref p) => unsafe {
942                     (*p.get()).abort_selection(was_upgrade)
943                 },
944                 Flavor::Shared(ref p) => return unsafe {
945                     (*p.get()).abort_selection(was_upgrade)
946                 },
947                 Flavor::Sync(ref p) => return unsafe {
948                     (*p.get()).abort_selection()
949                 },
950             };
951             let new_port = match result { Ok(b) => return b, Err(p) => p };
952             was_upgrade = true;
953             unsafe {
954                 mem::swap(self.inner_mut(),
955                           new_port.inner_mut());
956             }
957         }
958     }
959 }
960
961 #[stable(feature = "rust1", since = "1.0.0")]
962 impl<'a, T> Iterator for Iter<'a, T> {
963     type Item = T;
964
965     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
966 }
967
968 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
969 impl<'a, T> IntoIterator for &'a Receiver<T> {
970     type Item = T;
971     type IntoIter = Iter<'a, T>;
972
973     fn into_iter(self) -> Iter<'a, T> { self.iter() }
974 }
975
976 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
977 impl<T> Iterator for IntoIter<T> {
978     type Item = T;
979     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
980 }
981
982 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
983 impl <T> IntoIterator for Receiver<T> {
984     type Item = T;
985     type IntoIter = IntoIter<T>;
986
987     fn into_iter(self) -> IntoIter<T> {
988         IntoIter { rx: self }
989     }
990 }
991
992 #[stable(feature = "rust1", since = "1.0.0")]
993 impl<T> Drop for Receiver<T> {
994     fn drop(&mut self) {
995         match *unsafe { self.inner_mut() } {
996             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
997             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
998             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
999             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1000         }
1001     }
1002 }
1003
1004 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1005 impl<T> fmt::Debug for Receiver<T> {
1006     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1007         write!(f, "Receiver {{ .. }}")
1008     }
1009 }
1010
1011 #[stable(feature = "rust1", since = "1.0.0")]
1012 impl<T> fmt::Debug for SendError<T> {
1013     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1014         "SendError(..)".fmt(f)
1015     }
1016 }
1017
1018 #[stable(feature = "rust1", since = "1.0.0")]
1019 impl<T> fmt::Display for SendError<T> {
1020     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1021         "sending on a closed channel".fmt(f)
1022     }
1023 }
1024
1025 #[stable(feature = "rust1", since = "1.0.0")]
1026 impl<T: Send + Reflect> error::Error for SendError<T> {
1027     fn description(&self) -> &str {
1028         "sending on a closed channel"
1029     }
1030
1031     fn cause(&self) -> Option<&error::Error> {
1032         None
1033     }
1034 }
1035
1036 #[stable(feature = "rust1", since = "1.0.0")]
1037 impl<T> fmt::Debug for TrySendError<T> {
1038     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1039         match *self {
1040             TrySendError::Full(..) => "Full(..)".fmt(f),
1041             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1042         }
1043     }
1044 }
1045
1046 #[stable(feature = "rust1", since = "1.0.0")]
1047 impl<T> fmt::Display for TrySendError<T> {
1048     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1049         match *self {
1050             TrySendError::Full(..) => {
1051                 "sending on a full channel".fmt(f)
1052             }
1053             TrySendError::Disconnected(..) => {
1054                 "sending on a closed channel".fmt(f)
1055             }
1056         }
1057     }
1058 }
1059
1060 #[stable(feature = "rust1", since = "1.0.0")]
1061 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1062
1063     fn description(&self) -> &str {
1064         match *self {
1065             TrySendError::Full(..) => {
1066                 "sending on a full channel"
1067             }
1068             TrySendError::Disconnected(..) => {
1069                 "sending on a closed channel"
1070             }
1071         }
1072     }
1073
1074     fn cause(&self) -> Option<&error::Error> {
1075         None
1076     }
1077 }
1078
1079 #[stable(feature = "rust1", since = "1.0.0")]
1080 impl fmt::Display for RecvError {
1081     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1082         "receiving on a closed channel".fmt(f)
1083     }
1084 }
1085
1086 #[stable(feature = "rust1", since = "1.0.0")]
1087 impl error::Error for RecvError {
1088
1089     fn description(&self) -> &str {
1090         "receiving on a closed channel"
1091     }
1092
1093     fn cause(&self) -> Option<&error::Error> {
1094         None
1095     }
1096 }
1097
1098 #[stable(feature = "rust1", since = "1.0.0")]
1099 impl fmt::Display for TryRecvError {
1100     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1101         match *self {
1102             TryRecvError::Empty => {
1103                 "receiving on an empty channel".fmt(f)
1104             }
1105             TryRecvError::Disconnected => {
1106                 "receiving on a closed channel".fmt(f)
1107             }
1108         }
1109     }
1110 }
1111
1112 #[stable(feature = "rust1", since = "1.0.0")]
1113 impl error::Error for TryRecvError {
1114
1115     fn description(&self) -> &str {
1116         match *self {
1117             TryRecvError::Empty => {
1118                 "receiving on an empty channel"
1119             }
1120             TryRecvError::Disconnected => {
1121                 "receiving on a closed channel"
1122             }
1123         }
1124     }
1125
1126     fn cause(&self) -> Option<&error::Error> {
1127         None
1128     }
1129 }
1130
1131 #[cfg(test)]
1132 mod tests {
1133     use prelude::v1::*;
1134
1135     use env;
1136     use super::*;
1137     use thread;
1138
1139     pub fn stress_factor() -> usize {
1140         match env::var("RUST_TEST_STRESS") {
1141             Ok(val) => val.parse().unwrap(),
1142             Err(..) => 1,
1143         }
1144     }
1145
1146     #[test]
1147     fn smoke() {
1148         let (tx, rx) = channel::<i32>();
1149         tx.send(1).unwrap();
1150         assert_eq!(rx.recv().unwrap(), 1);
1151     }
1152
1153     #[test]
1154     fn drop_full() {
1155         let (tx, _rx) = channel::<Box<isize>>();
1156         tx.send(box 1).unwrap();
1157     }
1158
1159     #[test]
1160     fn drop_full_shared() {
1161         let (tx, _rx) = channel::<Box<isize>>();
1162         drop(tx.clone());
1163         drop(tx.clone());
1164         tx.send(box 1).unwrap();
1165     }
1166
1167     #[test]
1168     fn smoke_shared() {
1169         let (tx, rx) = channel::<i32>();
1170         tx.send(1).unwrap();
1171         assert_eq!(rx.recv().unwrap(), 1);
1172         let tx = tx.clone();
1173         tx.send(1).unwrap();
1174         assert_eq!(rx.recv().unwrap(), 1);
1175     }
1176
1177     #[test]
1178     fn smoke_threads() {
1179         let (tx, rx) = channel::<i32>();
1180         let _t = thread::spawn(move|| {
1181             tx.send(1).unwrap();
1182         });
1183         assert_eq!(rx.recv().unwrap(), 1);
1184     }
1185
1186     #[test]
1187     fn smoke_port_gone() {
1188         let (tx, rx) = channel::<i32>();
1189         drop(rx);
1190         assert!(tx.send(1).is_err());
1191     }
1192
1193     #[test]
1194     fn smoke_shared_port_gone() {
1195         let (tx, rx) = channel::<i32>();
1196         drop(rx);
1197         assert!(tx.send(1).is_err())
1198     }
1199
1200     #[test]
1201     fn smoke_shared_port_gone2() {
1202         let (tx, rx) = channel::<i32>();
1203         drop(rx);
1204         let tx2 = tx.clone();
1205         drop(tx);
1206         assert!(tx2.send(1).is_err());
1207     }
1208
1209     #[test]
1210     fn port_gone_concurrent() {
1211         let (tx, rx) = channel::<i32>();
1212         let _t = thread::spawn(move|| {
1213             rx.recv().unwrap();
1214         });
1215         while tx.send(1).is_ok() {}
1216     }
1217
1218     #[test]
1219     fn port_gone_concurrent_shared() {
1220         let (tx, rx) = channel::<i32>();
1221         let tx2 = tx.clone();
1222         let _t = thread::spawn(move|| {
1223             rx.recv().unwrap();
1224         });
1225         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1226     }
1227
1228     #[test]
1229     fn smoke_chan_gone() {
1230         let (tx, rx) = channel::<i32>();
1231         drop(tx);
1232         assert!(rx.recv().is_err());
1233     }
1234
1235     #[test]
1236     fn smoke_chan_gone_shared() {
1237         let (tx, rx) = channel::<()>();
1238         let tx2 = tx.clone();
1239         drop(tx);
1240         drop(tx2);
1241         assert!(rx.recv().is_err());
1242     }
1243
1244     #[test]
1245     fn chan_gone_concurrent() {
1246         let (tx, rx) = channel::<i32>();
1247         let _t = thread::spawn(move|| {
1248             tx.send(1).unwrap();
1249             tx.send(1).unwrap();
1250         });
1251         while rx.recv().is_ok() {}
1252     }
1253
1254     #[test]
1255     fn stress() {
1256         let (tx, rx) = channel::<i32>();
1257         let t = thread::spawn(move|| {
1258             for _ in 0..10000 { tx.send(1).unwrap(); }
1259         });
1260         for _ in 0..10000 {
1261             assert_eq!(rx.recv().unwrap(), 1);
1262         }
1263         t.join().ok().unwrap();
1264     }
1265
1266     #[test]
1267     fn stress_shared() {
1268         const AMT: u32 = 10000;
1269         const NTHREADS: u32 = 8;
1270         let (tx, rx) = channel::<i32>();
1271
1272         let t = thread::spawn(move|| {
1273             for _ in 0..AMT * NTHREADS {
1274                 assert_eq!(rx.recv().unwrap(), 1);
1275             }
1276             match rx.try_recv() {
1277                 Ok(..) => panic!(),
1278                 _ => {}
1279             }
1280         });
1281
1282         for _ in 0..NTHREADS {
1283             let tx = tx.clone();
1284             thread::spawn(move|| {
1285                 for _ in 0..AMT { tx.send(1).unwrap(); }
1286             });
1287         }
1288         drop(tx);
1289         t.join().ok().unwrap();
1290     }
1291
1292     #[test]
1293     fn send_from_outside_runtime() {
1294         let (tx1, rx1) = channel::<()>();
1295         let (tx2, rx2) = channel::<i32>();
1296         let t1 = thread::spawn(move|| {
1297             tx1.send(()).unwrap();
1298             for _ in 0..40 {
1299                 assert_eq!(rx2.recv().unwrap(), 1);
1300             }
1301         });
1302         rx1.recv().unwrap();
1303         let t2 = thread::spawn(move|| {
1304             for _ in 0..40 {
1305                 tx2.send(1).unwrap();
1306             }
1307         });
1308         t1.join().ok().unwrap();
1309         t2.join().ok().unwrap();
1310     }
1311
1312     #[test]
1313     fn recv_from_outside_runtime() {
1314         let (tx, rx) = channel::<i32>();
1315         let t = thread::spawn(move|| {
1316             for _ in 0..40 {
1317                 assert_eq!(rx.recv().unwrap(), 1);
1318             }
1319         });
1320         for _ in 0..40 {
1321             tx.send(1).unwrap();
1322         }
1323         t.join().ok().unwrap();
1324     }
1325
1326     #[test]
1327     fn no_runtime() {
1328         let (tx1, rx1) = channel::<i32>();
1329         let (tx2, rx2) = channel::<i32>();
1330         let t1 = thread::spawn(move|| {
1331             assert_eq!(rx1.recv().unwrap(), 1);
1332             tx2.send(2).unwrap();
1333         });
1334         let t2 = thread::spawn(move|| {
1335             tx1.send(1).unwrap();
1336             assert_eq!(rx2.recv().unwrap(), 2);
1337         });
1338         t1.join().ok().unwrap();
1339         t2.join().ok().unwrap();
1340     }
1341
1342     #[test]
1343     fn oneshot_single_thread_close_port_first() {
1344         // Simple test of closing without sending
1345         let (_tx, rx) = channel::<i32>();
1346         drop(rx);
1347     }
1348
1349     #[test]
1350     fn oneshot_single_thread_close_chan_first() {
1351         // Simple test of closing without sending
1352         let (tx, _rx) = channel::<i32>();
1353         drop(tx);
1354     }
1355
1356     #[test]
1357     fn oneshot_single_thread_send_port_close() {
1358         // Testing that the sender cleans up the payload if receiver is closed
1359         let (tx, rx) = channel::<Box<i32>>();
1360         drop(rx);
1361         assert!(tx.send(box 0).is_err());
1362     }
1363
1364     #[test]
1365     fn oneshot_single_thread_recv_chan_close() {
1366         // Receiving on a closed chan will panic
1367         let res = thread::spawn(move|| {
1368             let (tx, rx) = channel::<i32>();
1369             drop(tx);
1370             rx.recv().unwrap();
1371         }).join();
1372         // What is our res?
1373         assert!(res.is_err());
1374     }
1375
1376     #[test]
1377     fn oneshot_single_thread_send_then_recv() {
1378         let (tx, rx) = channel::<Box<i32>>();
1379         tx.send(box 10).unwrap();
1380         assert!(rx.recv().unwrap() == box 10);
1381     }
1382
1383     #[test]
1384     fn oneshot_single_thread_try_send_open() {
1385         let (tx, rx) = channel::<i32>();
1386         assert!(tx.send(10).is_ok());
1387         assert!(rx.recv().unwrap() == 10);
1388     }
1389
1390     #[test]
1391     fn oneshot_single_thread_try_send_closed() {
1392         let (tx, rx) = channel::<i32>();
1393         drop(rx);
1394         assert!(tx.send(10).is_err());
1395     }
1396
1397     #[test]
1398     fn oneshot_single_thread_try_recv_open() {
1399         let (tx, rx) = channel::<i32>();
1400         tx.send(10).unwrap();
1401         assert!(rx.recv() == Ok(10));
1402     }
1403
1404     #[test]
1405     fn oneshot_single_thread_try_recv_closed() {
1406         let (tx, rx) = channel::<i32>();
1407         drop(tx);
1408         assert!(rx.recv().is_err());
1409     }
1410
1411     #[test]
1412     fn oneshot_single_thread_peek_data() {
1413         let (tx, rx) = channel::<i32>();
1414         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1415         tx.send(10).unwrap();
1416         assert_eq!(rx.try_recv(), Ok(10));
1417     }
1418
1419     #[test]
1420     fn oneshot_single_thread_peek_close() {
1421         let (tx, rx) = channel::<i32>();
1422         drop(tx);
1423         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1424         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1425     }
1426
1427     #[test]
1428     fn oneshot_single_thread_peek_open() {
1429         let (_tx, rx) = channel::<i32>();
1430         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1431     }
1432
1433     #[test]
1434     fn oneshot_multi_task_recv_then_send() {
1435         let (tx, rx) = channel::<Box<i32>>();
1436         let _t = thread::spawn(move|| {
1437             assert!(rx.recv().unwrap() == box 10);
1438         });
1439
1440         tx.send(box 10).unwrap();
1441     }
1442
1443     #[test]
1444     fn oneshot_multi_task_recv_then_close() {
1445         let (tx, rx) = channel::<Box<i32>>();
1446         let _t = thread::spawn(move|| {
1447             drop(tx);
1448         });
1449         let res = thread::spawn(move|| {
1450             assert!(rx.recv().unwrap() == box 10);
1451         }).join();
1452         assert!(res.is_err());
1453     }
1454
1455     #[test]
1456     fn oneshot_multi_thread_close_stress() {
1457         for _ in 0..stress_factor() {
1458             let (tx, rx) = channel::<i32>();
1459             let _t = thread::spawn(move|| {
1460                 drop(rx);
1461             });
1462             drop(tx);
1463         }
1464     }
1465
1466     #[test]
1467     fn oneshot_multi_thread_send_close_stress() {
1468         for _ in 0..stress_factor() {
1469             let (tx, rx) = channel::<i32>();
1470             let _t = thread::spawn(move|| {
1471                 drop(rx);
1472             });
1473             let _ = thread::spawn(move|| {
1474                 tx.send(1).unwrap();
1475             }).join();
1476         }
1477     }
1478
1479     #[test]
1480     fn oneshot_multi_thread_recv_close_stress() {
1481         for _ in 0..stress_factor() {
1482             let (tx, rx) = channel::<i32>();
1483             thread::spawn(move|| {
1484                 let res = thread::spawn(move|| {
1485                     rx.recv().unwrap();
1486                 }).join();
1487                 assert!(res.is_err());
1488             });
1489             let _t = thread::spawn(move|| {
1490                 thread::spawn(move|| {
1491                     drop(tx);
1492                 });
1493             });
1494         }
1495     }
1496
1497     #[test]
1498     fn oneshot_multi_thread_send_recv_stress() {
1499         for _ in 0..stress_factor() {
1500             let (tx, rx) = channel::<Box<isize>>();
1501             let _t = thread::spawn(move|| {
1502                 tx.send(box 10).unwrap();
1503             });
1504             assert!(rx.recv().unwrap() == box 10);
1505         }
1506     }
1507
1508     #[test]
1509     fn stream_send_recv_stress() {
1510         for _ in 0..stress_factor() {
1511             let (tx, rx) = channel();
1512
1513             send(tx, 0);
1514             recv(rx, 0);
1515
1516             fn send(tx: Sender<Box<i32>>, i: i32) {
1517                 if i == 10 { return }
1518
1519                 thread::spawn(move|| {
1520                     tx.send(box i).unwrap();
1521                     send(tx, i + 1);
1522                 });
1523             }
1524
1525             fn recv(rx: Receiver<Box<i32>>, i: i32) {
1526                 if i == 10 { return }
1527
1528                 thread::spawn(move|| {
1529                     assert!(rx.recv().unwrap() == box i);
1530                     recv(rx, i + 1);
1531                 });
1532             }
1533         }
1534     }
1535
1536     #[test]
1537     fn recv_a_lot() {
1538         // Regression test that we don't run out of stack in scheduler context
1539         let (tx, rx) = channel();
1540         for _ in 0..10000 { tx.send(()).unwrap(); }
1541         for _ in 0..10000 { rx.recv().unwrap(); }
1542     }
1543
1544     #[test]
1545     fn shared_chan_stress() {
1546         let (tx, rx) = channel();
1547         let total = stress_factor() + 100;
1548         for _ in 0..total {
1549             let tx = tx.clone();
1550             thread::spawn(move|| {
1551                 tx.send(()).unwrap();
1552             });
1553         }
1554
1555         for _ in 0..total {
1556             rx.recv().unwrap();
1557         }
1558     }
1559
1560     #[test]
1561     fn test_nested_recv_iter() {
1562         let (tx, rx) = channel::<i32>();
1563         let (total_tx, total_rx) = channel::<i32>();
1564
1565         let _t = thread::spawn(move|| {
1566             let mut acc = 0;
1567             for x in rx.iter() {
1568                 acc += x;
1569             }
1570             total_tx.send(acc).unwrap();
1571         });
1572
1573         tx.send(3).unwrap();
1574         tx.send(1).unwrap();
1575         tx.send(2).unwrap();
1576         drop(tx);
1577         assert_eq!(total_rx.recv().unwrap(), 6);
1578     }
1579
1580     #[test]
1581     fn test_recv_iter_break() {
1582         let (tx, rx) = channel::<i32>();
1583         let (count_tx, count_rx) = channel();
1584
1585         let _t = thread::spawn(move|| {
1586             let mut count = 0;
1587             for x in rx.iter() {
1588                 if count >= 3 {
1589                     break;
1590                 } else {
1591                     count += x;
1592                 }
1593             }
1594             count_tx.send(count).unwrap();
1595         });
1596
1597         tx.send(2).unwrap();
1598         tx.send(2).unwrap();
1599         tx.send(2).unwrap();
1600         let _ = tx.send(2);
1601         drop(tx);
1602         assert_eq!(count_rx.recv().unwrap(), 4);
1603     }
1604
1605     #[test]
1606     fn test_recv_into_iter_owned() {
1607         let mut iter = {
1608           let (tx, rx) = channel::<i32>();
1609           tx.send(1).unwrap();
1610           tx.send(2).unwrap();
1611
1612           rx.into_iter()
1613         };
1614         assert_eq!(iter.next().unwrap(), 1);
1615         assert_eq!(iter.next().unwrap(), 2);
1616         assert_eq!(iter.next().is_none(), true);
1617     }
1618
1619     #[test]
1620     fn test_recv_into_iter_borrowed() {
1621         let (tx, rx) = channel::<i32>();
1622         tx.send(1).unwrap();
1623         tx.send(2).unwrap();
1624         drop(tx);
1625         let mut iter = (&rx).into_iter();
1626         assert_eq!(iter.next().unwrap(), 1);
1627         assert_eq!(iter.next().unwrap(), 2);
1628         assert_eq!(iter.next().is_none(), true);
1629     }
1630
1631     #[test]
1632     fn try_recv_states() {
1633         let (tx1, rx1) = channel::<i32>();
1634         let (tx2, rx2) = channel::<()>();
1635         let (tx3, rx3) = channel::<()>();
1636         let _t = thread::spawn(move|| {
1637             rx2.recv().unwrap();
1638             tx1.send(1).unwrap();
1639             tx3.send(()).unwrap();
1640             rx2.recv().unwrap();
1641             drop(tx1);
1642             tx3.send(()).unwrap();
1643         });
1644
1645         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1646         tx2.send(()).unwrap();
1647         rx3.recv().unwrap();
1648         assert_eq!(rx1.try_recv(), Ok(1));
1649         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1650         tx2.send(()).unwrap();
1651         rx3.recv().unwrap();
1652         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1653     }
1654
1655     // This bug used to end up in a livelock inside of the Receiver destructor
1656     // because the internal state of the Shared packet was corrupted
1657     #[test]
1658     fn destroy_upgraded_shared_port_when_sender_still_active() {
1659         let (tx, rx) = channel();
1660         let (tx2, rx2) = channel();
1661         let _t = thread::spawn(move|| {
1662             rx.recv().unwrap(); // wait on a oneshot
1663             drop(rx);  // destroy a shared
1664             tx2.send(()).unwrap();
1665         });
1666         // make sure the other thread has gone to sleep
1667         for _ in 0..5000 { thread::yield_now(); }
1668
1669         // upgrade to a shared chan and send a message
1670         let t = tx.clone();
1671         drop(tx);
1672         t.send(()).unwrap();
1673
1674         // wait for the child thread to exit before we exit
1675         rx2.recv().unwrap();
1676     }
1677 }
1678
1679 #[cfg(test)]
1680 mod sync_tests {
1681     use prelude::v1::*;
1682
1683     use env;
1684     use thread;
1685     use super::*;
1686
1687     pub fn stress_factor() -> usize {
1688         match env::var("RUST_TEST_STRESS") {
1689             Ok(val) => val.parse().unwrap(),
1690             Err(..) => 1,
1691         }
1692     }
1693
1694     #[test]
1695     fn smoke() {
1696         let (tx, rx) = sync_channel::<i32>(1);
1697         tx.send(1).unwrap();
1698         assert_eq!(rx.recv().unwrap(), 1);
1699     }
1700
1701     #[test]
1702     fn drop_full() {
1703         let (tx, _rx) = sync_channel::<Box<isize>>(1);
1704         tx.send(box 1).unwrap();
1705     }
1706
1707     #[test]
1708     fn smoke_shared() {
1709         let (tx, rx) = sync_channel::<i32>(1);
1710         tx.send(1).unwrap();
1711         assert_eq!(rx.recv().unwrap(), 1);
1712         let tx = tx.clone();
1713         tx.send(1).unwrap();
1714         assert_eq!(rx.recv().unwrap(), 1);
1715     }
1716
1717     #[test]
1718     fn smoke_threads() {
1719         let (tx, rx) = sync_channel::<i32>(0);
1720         let _t = thread::spawn(move|| {
1721             tx.send(1).unwrap();
1722         });
1723         assert_eq!(rx.recv().unwrap(), 1);
1724     }
1725
1726     #[test]
1727     fn smoke_port_gone() {
1728         let (tx, rx) = sync_channel::<i32>(0);
1729         drop(rx);
1730         assert!(tx.send(1).is_err());
1731     }
1732
1733     #[test]
1734     fn smoke_shared_port_gone2() {
1735         let (tx, rx) = sync_channel::<i32>(0);
1736         drop(rx);
1737         let tx2 = tx.clone();
1738         drop(tx);
1739         assert!(tx2.send(1).is_err());
1740     }
1741
1742     #[test]
1743     fn port_gone_concurrent() {
1744         let (tx, rx) = sync_channel::<i32>(0);
1745         let _t = thread::spawn(move|| {
1746             rx.recv().unwrap();
1747         });
1748         while tx.send(1).is_ok() {}
1749     }
1750
1751     #[test]
1752     fn port_gone_concurrent_shared() {
1753         let (tx, rx) = sync_channel::<i32>(0);
1754         let tx2 = tx.clone();
1755         let _t = thread::spawn(move|| {
1756             rx.recv().unwrap();
1757         });
1758         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1759     }
1760
1761     #[test]
1762     fn smoke_chan_gone() {
1763         let (tx, rx) = sync_channel::<i32>(0);
1764         drop(tx);
1765         assert!(rx.recv().is_err());
1766     }
1767
1768     #[test]
1769     fn smoke_chan_gone_shared() {
1770         let (tx, rx) = sync_channel::<()>(0);
1771         let tx2 = tx.clone();
1772         drop(tx);
1773         drop(tx2);
1774         assert!(rx.recv().is_err());
1775     }
1776
1777     #[test]
1778     fn chan_gone_concurrent() {
1779         let (tx, rx) = sync_channel::<i32>(0);
1780         thread::spawn(move|| {
1781             tx.send(1).unwrap();
1782             tx.send(1).unwrap();
1783         });
1784         while rx.recv().is_ok() {}
1785     }
1786
1787     #[test]
1788     fn stress() {
1789         let (tx, rx) = sync_channel::<i32>(0);
1790         thread::spawn(move|| {
1791             for _ in 0..10000 { tx.send(1).unwrap(); }
1792         });
1793         for _ in 0..10000 {
1794             assert_eq!(rx.recv().unwrap(), 1);
1795         }
1796     }
1797
1798     #[test]
1799     fn stress_shared() {
1800         const AMT: u32 = 1000;
1801         const NTHREADS: u32 = 8;
1802         let (tx, rx) = sync_channel::<i32>(0);
1803         let (dtx, drx) = sync_channel::<()>(0);
1804
1805         thread::spawn(move|| {
1806             for _ in 0..AMT * NTHREADS {
1807                 assert_eq!(rx.recv().unwrap(), 1);
1808             }
1809             match rx.try_recv() {
1810                 Ok(..) => panic!(),
1811                 _ => {}
1812             }
1813             dtx.send(()).unwrap();
1814         });
1815
1816         for _ in 0..NTHREADS {
1817             let tx = tx.clone();
1818             thread::spawn(move|| {
1819                 for _ in 0..AMT { tx.send(1).unwrap(); }
1820             });
1821         }
1822         drop(tx);
1823         drx.recv().unwrap();
1824     }
1825
1826     #[test]
1827     fn oneshot_single_thread_close_port_first() {
1828         // Simple test of closing without sending
1829         let (_tx, rx) = sync_channel::<i32>(0);
1830         drop(rx);
1831     }
1832
1833     #[test]
1834     fn oneshot_single_thread_close_chan_first() {
1835         // Simple test of closing without sending
1836         let (tx, _rx) = sync_channel::<i32>(0);
1837         drop(tx);
1838     }
1839
1840     #[test]
1841     fn oneshot_single_thread_send_port_close() {
1842         // Testing that the sender cleans up the payload if receiver is closed
1843         let (tx, rx) = sync_channel::<Box<i32>>(0);
1844         drop(rx);
1845         assert!(tx.send(box 0).is_err());
1846     }
1847
1848     #[test]
1849     fn oneshot_single_thread_recv_chan_close() {
1850         // Receiving on a closed chan will panic
1851         let res = thread::spawn(move|| {
1852             let (tx, rx) = sync_channel::<i32>(0);
1853             drop(tx);
1854             rx.recv().unwrap();
1855         }).join();
1856         // What is our res?
1857         assert!(res.is_err());
1858     }
1859
1860     #[test]
1861     fn oneshot_single_thread_send_then_recv() {
1862         let (tx, rx) = sync_channel::<Box<i32>>(1);
1863         tx.send(box 10).unwrap();
1864         assert!(rx.recv().unwrap() == box 10);
1865     }
1866
1867     #[test]
1868     fn oneshot_single_thread_try_send_open() {
1869         let (tx, rx) = sync_channel::<i32>(1);
1870         assert_eq!(tx.try_send(10), Ok(()));
1871         assert!(rx.recv().unwrap() == 10);
1872     }
1873
1874     #[test]
1875     fn oneshot_single_thread_try_send_closed() {
1876         let (tx, rx) = sync_channel::<i32>(0);
1877         drop(rx);
1878         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1879     }
1880
1881     #[test]
1882     fn oneshot_single_thread_try_send_closed2() {
1883         let (tx, _rx) = sync_channel::<i32>(0);
1884         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1885     }
1886
1887     #[test]
1888     fn oneshot_single_thread_try_recv_open() {
1889         let (tx, rx) = sync_channel::<i32>(1);
1890         tx.send(10).unwrap();
1891         assert!(rx.recv() == Ok(10));
1892     }
1893
1894     #[test]
1895     fn oneshot_single_thread_try_recv_closed() {
1896         let (tx, rx) = sync_channel::<i32>(0);
1897         drop(tx);
1898         assert!(rx.recv().is_err());
1899     }
1900
1901     #[test]
1902     fn oneshot_single_thread_peek_data() {
1903         let (tx, rx) = sync_channel::<i32>(1);
1904         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1905         tx.send(10).unwrap();
1906         assert_eq!(rx.try_recv(), Ok(10));
1907     }
1908
1909     #[test]
1910     fn oneshot_single_thread_peek_close() {
1911         let (tx, rx) = sync_channel::<i32>(0);
1912         drop(tx);
1913         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1914         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1915     }
1916
1917     #[test]
1918     fn oneshot_single_thread_peek_open() {
1919         let (_tx, rx) = sync_channel::<i32>(0);
1920         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1921     }
1922
1923     #[test]
1924     fn oneshot_multi_task_recv_then_send() {
1925         let (tx, rx) = sync_channel::<Box<i32>>(0);
1926         let _t = thread::spawn(move|| {
1927             assert!(rx.recv().unwrap() == box 10);
1928         });
1929
1930         tx.send(box 10).unwrap();
1931     }
1932
1933     #[test]
1934     fn oneshot_multi_task_recv_then_close() {
1935         let (tx, rx) = sync_channel::<Box<i32>>(0);
1936         let _t = thread::spawn(move|| {
1937             drop(tx);
1938         });
1939         let res = thread::spawn(move|| {
1940             assert!(rx.recv().unwrap() == box 10);
1941         }).join();
1942         assert!(res.is_err());
1943     }
1944
1945     #[test]
1946     fn oneshot_multi_thread_close_stress() {
1947         for _ in 0..stress_factor() {
1948             let (tx, rx) = sync_channel::<i32>(0);
1949             let _t = thread::spawn(move|| {
1950                 drop(rx);
1951             });
1952             drop(tx);
1953         }
1954     }
1955
1956     #[test]
1957     fn oneshot_multi_thread_send_close_stress() {
1958         for _ in 0..stress_factor() {
1959             let (tx, rx) = sync_channel::<i32>(0);
1960             let _t = thread::spawn(move|| {
1961                 drop(rx);
1962             });
1963             let _ = thread::spawn(move || {
1964                 tx.send(1).unwrap();
1965             }).join();
1966         }
1967     }
1968
1969     #[test]
1970     fn oneshot_multi_thread_recv_close_stress() {
1971         for _ in 0..stress_factor() {
1972             let (tx, rx) = sync_channel::<i32>(0);
1973             let _t = thread::spawn(move|| {
1974                 let res = thread::spawn(move|| {
1975                     rx.recv().unwrap();
1976                 }).join();
1977                 assert!(res.is_err());
1978             });
1979             let _t = thread::spawn(move|| {
1980                 thread::spawn(move|| {
1981                     drop(tx);
1982                 });
1983             });
1984         }
1985     }
1986
1987     #[test]
1988     fn oneshot_multi_thread_send_recv_stress() {
1989         for _ in 0..stress_factor() {
1990             let (tx, rx) = sync_channel::<Box<i32>>(0);
1991             let _t = thread::spawn(move|| {
1992                 tx.send(box 10).unwrap();
1993             });
1994             assert!(rx.recv().unwrap() == box 10);
1995         }
1996     }
1997
1998     #[test]
1999     fn stream_send_recv_stress() {
2000         for _ in 0..stress_factor() {
2001             let (tx, rx) = sync_channel::<Box<i32>>(0);
2002
2003             send(tx, 0);
2004             recv(rx, 0);
2005
2006             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2007                 if i == 10 { return }
2008
2009                 thread::spawn(move|| {
2010                     tx.send(box i).unwrap();
2011                     send(tx, i + 1);
2012                 });
2013             }
2014
2015             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2016                 if i == 10 { return }
2017
2018                 thread::spawn(move|| {
2019                     assert!(rx.recv().unwrap() == box i);
2020                     recv(rx, i + 1);
2021                 });
2022             }
2023         }
2024     }
2025
2026     #[test]
2027     fn recv_a_lot() {
2028         // Regression test that we don't run out of stack in scheduler context
2029         let (tx, rx) = sync_channel(10000);
2030         for _ in 0..10000 { tx.send(()).unwrap(); }
2031         for _ in 0..10000 { rx.recv().unwrap(); }
2032     }
2033
2034     #[test]
2035     fn shared_chan_stress() {
2036         let (tx, rx) = sync_channel(0);
2037         let total = stress_factor() + 100;
2038         for _ in 0..total {
2039             let tx = tx.clone();
2040             thread::spawn(move|| {
2041                 tx.send(()).unwrap();
2042             });
2043         }
2044
2045         for _ in 0..total {
2046             rx.recv().unwrap();
2047         }
2048     }
2049
2050     #[test]
2051     fn test_nested_recv_iter() {
2052         let (tx, rx) = sync_channel::<i32>(0);
2053         let (total_tx, total_rx) = sync_channel::<i32>(0);
2054
2055         let _t = thread::spawn(move|| {
2056             let mut acc = 0;
2057             for x in rx.iter() {
2058                 acc += x;
2059             }
2060             total_tx.send(acc).unwrap();
2061         });
2062
2063         tx.send(3).unwrap();
2064         tx.send(1).unwrap();
2065         tx.send(2).unwrap();
2066         drop(tx);
2067         assert_eq!(total_rx.recv().unwrap(), 6);
2068     }
2069
2070     #[test]
2071     fn test_recv_iter_break() {
2072         let (tx, rx) = sync_channel::<i32>(0);
2073         let (count_tx, count_rx) = sync_channel(0);
2074
2075         let _t = thread::spawn(move|| {
2076             let mut count = 0;
2077             for x in rx.iter() {
2078                 if count >= 3 {
2079                     break;
2080                 } else {
2081                     count += x;
2082                 }
2083             }
2084             count_tx.send(count).unwrap();
2085         });
2086
2087         tx.send(2).unwrap();
2088         tx.send(2).unwrap();
2089         tx.send(2).unwrap();
2090         let _ = tx.try_send(2);
2091         drop(tx);
2092         assert_eq!(count_rx.recv().unwrap(), 4);
2093     }
2094
2095     #[test]
2096     fn try_recv_states() {
2097         let (tx1, rx1) = sync_channel::<i32>(1);
2098         let (tx2, rx2) = sync_channel::<()>(1);
2099         let (tx3, rx3) = sync_channel::<()>(1);
2100         let _t = thread::spawn(move|| {
2101             rx2.recv().unwrap();
2102             tx1.send(1).unwrap();
2103             tx3.send(()).unwrap();
2104             rx2.recv().unwrap();
2105             drop(tx1);
2106             tx3.send(()).unwrap();
2107         });
2108
2109         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2110         tx2.send(()).unwrap();
2111         rx3.recv().unwrap();
2112         assert_eq!(rx1.try_recv(), Ok(1));
2113         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2114         tx2.send(()).unwrap();
2115         rx3.recv().unwrap();
2116         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2117     }
2118
2119     // This bug used to end up in a livelock inside of the Receiver destructor
2120     // because the internal state of the Shared packet was corrupted
2121     #[test]
2122     fn destroy_upgraded_shared_port_when_sender_still_active() {
2123         let (tx, rx) = sync_channel::<()>(0);
2124         let (tx2, rx2) = sync_channel::<()>(0);
2125         let _t = thread::spawn(move|| {
2126             rx.recv().unwrap(); // wait on a oneshot
2127             drop(rx);  // destroy a shared
2128             tx2.send(()).unwrap();
2129         });
2130         // make sure the other thread has gone to sleep
2131         for _ in 0..5000 { thread::yield_now(); }
2132
2133         // upgrade to a shared chan and send a message
2134         let t = tx.clone();
2135         drop(tx);
2136         t.send(()).unwrap();
2137
2138         // wait for the child thread to exit before we exit
2139         rx2.recv().unwrap();
2140     }
2141
2142     #[test]
2143     fn send1() {
2144         let (tx, rx) = sync_channel::<i32>(0);
2145         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2146         assert_eq!(tx.send(1), Ok(()));
2147     }
2148
2149     #[test]
2150     fn send2() {
2151         let (tx, rx) = sync_channel::<i32>(0);
2152         let _t = thread::spawn(move|| { drop(rx); });
2153         assert!(tx.send(1).is_err());
2154     }
2155
2156     #[test]
2157     fn send3() {
2158         let (tx, rx) = sync_channel::<i32>(1);
2159         assert_eq!(tx.send(1), Ok(()));
2160         let _t =thread::spawn(move|| { drop(rx); });
2161         assert!(tx.send(1).is_err());
2162     }
2163
2164     #[test]
2165     fn send4() {
2166         let (tx, rx) = sync_channel::<i32>(0);
2167         let tx2 = tx.clone();
2168         let (done, donerx) = channel();
2169         let done2 = done.clone();
2170         let _t = thread::spawn(move|| {
2171             assert!(tx.send(1).is_err());
2172             done.send(()).unwrap();
2173         });
2174         let _t = thread::spawn(move|| {
2175             assert!(tx2.send(2).is_err());
2176             done2.send(()).unwrap();
2177         });
2178         drop(rx);
2179         donerx.recv().unwrap();
2180         donerx.recv().unwrap();
2181     }
2182
2183     #[test]
2184     fn try_send1() {
2185         let (tx, _rx) = sync_channel::<i32>(0);
2186         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2187     }
2188
2189     #[test]
2190     fn try_send2() {
2191         let (tx, _rx) = sync_channel::<i32>(1);
2192         assert_eq!(tx.try_send(1), Ok(()));
2193         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2194     }
2195
2196     #[test]
2197     fn try_send3() {
2198         let (tx, rx) = sync_channel::<i32>(1);
2199         assert_eq!(tx.try_send(1), Ok(()));
2200         drop(rx);
2201         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2202     }
2203
2204     #[test]
2205     fn issue_15761() {
2206         fn repro() {
2207             let (tx1, rx1) = sync_channel::<()>(3);
2208             let (tx2, rx2) = sync_channel::<()>(3);
2209
2210             let _t = thread::spawn(move|| {
2211                 rx1.recv().unwrap();
2212                 tx2.try_send(()).unwrap();
2213             });
2214
2215             tx1.try_send(()).unwrap();
2216             rx2.recv().unwrap();
2217         }
2218
2219         for _ in 0..100 {
2220             repro()
2221         }
2222     }
2223
2224     #[test]
2225     fn fmt_debug_sender() {
2226         let (tx, _) = channel::<i32>();
2227         assert_eq!(format!("{:?}", tx), "Sender { .. }");
2228     }
2229
2230     #[test]
2231     fn fmt_debug_recv() {
2232         let (_, rx) = channel::<i32>();
2233         assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2234     }
2235
2236     #[test]
2237     fn fmt_debug_sync_sender() {
2238         let (tx, _) = sync_channel::<i32>(1);
2239         assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
2240     }
2241 }