]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Auto merge of #41258 - clarcharr:str_box_extras, r=Kimundi
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * [`Sender`]
17 //! * [`SyncSender`]
18 //! * [`Receiver`]
19 //!
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //!    channel where each sender atomically hands off a message to a receiver.
37 //!
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
44 //!
45 //! ## Disconnection
46 //!
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
51 //!
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
56 //!
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
60 //!
61 //! # Examples
62 //!
63 //! Simple usage:
64 //!
65 //! ```
66 //! use std::thread;
67 //! use std::sync::mpsc::channel;
68 //!
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //!     tx.send(10).unwrap();
73 //! });
74 //! assert_eq!(rx.recv().unwrap(), 10);
75 //! ```
76 //!
77 //! Shared usage:
78 //!
79 //! ```
80 //! use std::thread;
81 //! use std::sync::mpsc::channel;
82 //!
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
87 //! for i in 0..10 {
88 //!     let tx = tx.clone();
89 //!     thread::spawn(move|| {
90 //!         tx.send(i).unwrap();
91 //!     });
92 //! }
93 //!
94 //! for _ in 0..10 {
95 //!     let j = rx.recv().unwrap();
96 //!     assert!(0 <= j && j < 10);
97 //! }
98 //! ```
99 //!
100 //! Propagating panics:
101 //!
102 //! ```
103 //! use std::sync::mpsc::channel;
104 //!
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
108 //! drop(tx);
109 //! assert!(rx.recv().is_err());
110 //! ```
111 //!
112 //! Synchronous channels:
113 //!
114 //! ```
115 //! use std::thread;
116 //! use std::sync::mpsc::sync_channel;
117 //!
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //!     // This will wait for the parent thread to start receiving
121 //!     tx.send(53).unwrap();
122 //! });
123 //! rx.recv().unwrap();
124 //! ```
125
126 #![stable(feature = "rust1", since = "1.0.0")]
127
128 // A description of how Rust's channel implementation works
129 //
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
133 //
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
139 //
140 // ## Flavors of channels
141 //
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
145 // channels in play.
146 //
147 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
148 //                      case. They contain as few atomics as possible and
149 //                      involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 //             use a different concurrent queue that is more tailored for this
152 //             use case. The initial allocation of this flavor of channel is not
153 //             optimized.
154 // * Shared - this is the most general form of channel that this module offers,
155 //            a channel with multiple senders. This type is as optimized as it
156 //            can be, but the previous two types mentioned are much faster for
157 //            their use-cases.
158 //
159 // ## Concurrent queues
160 //
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
164 //
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
169 // shared channels.
170 //
171 // ### SPSC optimizations
172 //
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
177 // pop().
178 //
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
184 //
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
188 // coexist at once.
189 //
190 // ### MPSC optimizations
191 //
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
195 //
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
199 //
200 // ## Overview of the Implementation
201 //
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
205 //
206 //
207 //      send(t)                             recv()
208 //        queue.push(t)                       return if queue.pop()
209 //        if increment() == -1                deschedule {
210 //          wakeup()                            if decrement() > 0
211 //                                                cancel_deschedule()
212 //                                            }
213 //                                            queue.pop()
214 //
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
217 //
218 // ### The internal atomic counter
219 //
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
223 //
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
228 //
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
235 //
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
239 //
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
242 //
243 // ## Native Implementation
244 //
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
248 //
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
254 //
255 // ## Select
256 //
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
260 //
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as its woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
267 //
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
272 //
273 // # Conclusion
274 //
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
277
278 use sync::Arc;
279 use error;
280 use fmt;
281 use mem;
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
284
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
290
291 mod blocking;
292 mod oneshot;
293 mod select;
294 mod shared;
295 mod stream;
296 mod sync;
297 mod mpsc_queue;
298 mod spsc_queue;
299
300 /// The receiving-half of Rust's channel type. This half can only be owned by
301 /// one thread.
302 ///
303 /// Messages sent to the channel can be retrieved using [`recv`].
304 ///
305 /// [`recv`]: ../../../std/sync/mpsc/struct.Receiver.html#method.recv
306 ///
307 /// # Examples
308 ///
309 /// ```rust
310 /// use std::sync::mpsc::channel;
311 /// use std::thread;
312 /// use std::time::Duration;
313 ///
314 /// let (send, recv) = channel();
315 ///
316 /// thread::spawn(move || {
317 ///     send.send("Hello world!").unwrap();
318 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
319 ///     send.send("Delayed for 2 seconds").unwrap();
320 /// });
321 ///
322 /// println!("{}", recv.recv().unwrap()); // Received immediately
323 /// println!("Waiting...");
324 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
325 /// ```
326 #[stable(feature = "rust1", since = "1.0.0")]
327 pub struct Receiver<T> {
328     inner: UnsafeCell<Flavor<T>>,
329 }
330
331 // The receiver port can be sent from place to place, so long as it
332 // is not used to receive non-sendable things.
333 #[stable(feature = "rust1", since = "1.0.0")]
334 unsafe impl<T: Send> Send for Receiver<T> { }
335
336 #[stable(feature = "rust1", since = "1.0.0")]
337 impl<T> !Sync for Receiver<T> { }
338
339 /// An iterator over messages on a receiver, this iterator will block whenever
340 /// [`next`] is called, waiting for a new message, and [`None`] will be returned
341 /// when the corresponding channel has hung up.
342 ///
343 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
344 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
345 #[stable(feature = "rust1", since = "1.0.0")]
346 #[derive(Debug)]
347 pub struct Iter<'a, T: 'a> {
348     rx: &'a Receiver<T>
349 }
350
351 /// An iterator that attempts to yield all pending values for a receiver.
352 /// [`None`] will be returned when there are no pending values remaining or if
353 /// the corresponding channel has hung up.
354 ///
355 /// This Iterator will never block the caller in order to wait for data to
356 /// become available. Instead, it will return [`None`].
357 ///
358 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
359 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
360 #[derive(Debug)]
361 pub struct TryIter<'a, T: 'a> {
362     rx: &'a Receiver<T>
363 }
364
365 /// An owning iterator over messages on a receiver, this iterator will block
366 /// whenever [`next`] is called, waiting for a new message, and [`None`] will be
367 /// returned when the corresponding channel has hung up.
368 ///
369 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
370 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
371 ///
372 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
373 #[derive(Debug)]
374 pub struct IntoIter<T> {
375     rx: Receiver<T>
376 }
377
378 /// The sending-half of Rust's asynchronous channel type. This half can only be
379 /// owned by one thread, but it can be cloned to send to other threads.
380 ///
381 /// Messages can be sent through this channel with [`send`].
382 ///
383 /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
384 ///
385 /// # Examples
386 ///
387 /// ```rust
388 /// use std::sync::mpsc::channel;
389 /// use std::thread;
390 ///
391 /// let (sender, receiver) = channel();
392 /// let sender2 = sender.clone();
393 ///
394 /// // First thread owns sender
395 /// thread::spawn(move || {
396 ///     sender.send(1).unwrap();
397 /// });
398 ///
399 /// // Second thread owns sender2
400 /// thread::spawn(move || {
401 ///     sender2.send(2).unwrap();
402 /// });
403 ///
404 /// let msg = receiver.recv().unwrap();
405 /// let msg2 = receiver.recv().unwrap();
406 ///
407 /// assert_eq!(3, msg + msg2);
408 /// ```
409 #[stable(feature = "rust1", since = "1.0.0")]
410 pub struct Sender<T> {
411     inner: UnsafeCell<Flavor<T>>,
412 }
413
414 // The send port can be sent from place to place, so long as it
415 // is not used to send non-sendable things.
416 #[stable(feature = "rust1", since = "1.0.0")]
417 unsafe impl<T: Send> Send for Sender<T> { }
418
419 #[stable(feature = "rust1", since = "1.0.0")]
420 impl<T> !Sync for Sender<T> { }
421
422 /// The sending-half of Rust's synchronous channel type. This half can only be
423 /// owned by one thread, but it can be cloned to send to other threads.
424 ///
425 /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
426 /// [`SyncSender::send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
427 ///
428 #[stable(feature = "rust1", since = "1.0.0")]
429 pub struct SyncSender<T> {
430     inner: Arc<sync::Packet<T>>,
431 }
432
433 #[stable(feature = "rust1", since = "1.0.0")]
434 unsafe impl<T: Send> Send for SyncSender<T> {}
435
436 #[stable(feature = "rust1", since = "1.0.0")]
437 impl<T> !Sync for SyncSender<T> {}
438
439 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
440 /// function on **channel**s.
441 ///
442 /// A **send** operation can only fail if the receiving end of a channel is
443 /// disconnected, implying that the data could never be received. The error
444 /// contains the data being sent as a payload so it can be recovered.
445 ///
446 /// [`Sender::send`]: struct.Sender.html#method.send
447 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
448 #[stable(feature = "rust1", since = "1.0.0")]
449 #[derive(PartialEq, Eq, Clone, Copy)]
450 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
451
452 /// An error returned from the [`recv`] function on a [`Receiver`].
453 ///
454 /// The [`recv`] operation can only fail if the sending half of a
455 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
456 /// messages will ever be received.
457 ///
458 /// [`recv`]: struct.Receiver.html#method.recv
459 /// [`Receiver`]: struct.Receiver.html
460 /// [`channel`]: fn.channel.html
461 /// [`sync_channel`]: fn.sync_channel.html
462 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
463 #[stable(feature = "rust1", since = "1.0.0")]
464 pub struct RecvError;
465
466 /// This enumeration is the list of the possible reasons that [`try_recv`] could
467 /// not return data when called. This can occur with both a [`channel`] and
468 /// a [`sync_channel`].
469 ///
470 /// [`try_recv`]: struct.Receiver.html#method.try_recv
471 /// [`channel`]: fn.channel.html
472 /// [`sync_channel`]: fn.sync_channel.html
473 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
474 #[stable(feature = "rust1", since = "1.0.0")]
475 pub enum TryRecvError {
476     /// This **channel** is currently empty, but the **Sender**(s) have not yet
477     /// disconnected, so data may yet become available.
478     #[stable(feature = "rust1", since = "1.0.0")]
479     Empty,
480
481     /// The **channel**'s sending half has become disconnected, and there will
482     /// never be any more data received on it.
483     #[stable(feature = "rust1", since = "1.0.0")]
484     Disconnected,
485 }
486
487 /// This enumeration is the list of possible errors that made [`recv_timeout`]
488 /// unable to return data when called. This can occur with both a [`channel`] and
489 /// a [`sync_channel`].
490 ///
491 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
492 /// [`channel`]: fn.channel.html
493 /// [`sync_channel`]: fn.sync_channel.html
494 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
495 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
496 pub enum RecvTimeoutError {
497     /// This **channel** is currently empty, but the **Sender**(s) have not yet
498     /// disconnected, so data may yet become available.
499     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
500     Timeout,
501     /// The **channel**'s sending half has become disconnected, and there will
502     /// never be any more data received on it.
503     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
504     Disconnected,
505 }
506
507 /// This enumeration is the list of the possible error outcomes for the
508 /// [`try_send`] method.
509 ///
510 /// [`try_send`]: struct.SyncSender.html#method.try_send
511 #[stable(feature = "rust1", since = "1.0.0")]
512 #[derive(PartialEq, Eq, Clone, Copy)]
513 pub enum TrySendError<T> {
514     /// The data could not be sent on the [`sync_channel`] because it would require that
515     /// the callee block to send the data.
516     ///
517     /// If this is a buffered channel, then the buffer is full at this time. If
518     /// this is not a buffered channel, then there is no [`Receiver`] available to
519     /// acquire the data.
520     ///
521     /// [`sync_channel`]: fn.sync_channel.html
522     /// [`Receiver`]: struct.Receiver.html
523     #[stable(feature = "rust1", since = "1.0.0")]
524     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
525
526     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
527     /// sent. The data is returned back to the callee in this case.
528     ///
529     /// [`sync_channel`]: fn.sync_channel.html
530     #[stable(feature = "rust1", since = "1.0.0")]
531     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
532 }
533
534 enum Flavor<T> {
535     Oneshot(Arc<oneshot::Packet<T>>),
536     Stream(Arc<stream::Packet<T>>),
537     Shared(Arc<shared::Packet<T>>),
538     Sync(Arc<sync::Packet<T>>),
539 }
540
541 #[doc(hidden)]
542 trait UnsafeFlavor<T> {
543     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
544     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
545         &mut *self.inner_unsafe().get()
546     }
547     unsafe fn inner(&self) -> &Flavor<T> {
548         &*self.inner_unsafe().get()
549     }
550 }
551 impl<T> UnsafeFlavor<T> for Sender<T> {
552     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
553         &self.inner
554     }
555 }
556 impl<T> UnsafeFlavor<T> for Receiver<T> {
557     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
558         &self.inner
559     }
560 }
561
562 /// Creates a new asynchronous channel, returning the sender/receiver halves.
563 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
564 /// the same order as it was sent, and no [`send`] will block the calling thread
565 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
566 /// block after its buffer limit is reached). [`recv`] will block until a message
567 /// is available.
568 ///
569 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
570 /// only one [`Receiver`] is supported.
571 ///
572 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
573 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
574 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
575 /// return a [`RecvError`].
576 ///
577 /// [`send`]: struct.Sender.html#method.send
578 /// [`recv`]: struct.Receiver.html#method.recv
579 /// [`Sender`]: struct.Sender.html
580 /// [`Receiver`]: struct.Receiver.html
581 /// [`sync_channel`]: fn.sync_channel.html
582 /// [`SendError`]: struct.SendError.html
583 /// [`RecvError`]: struct.RecvError.html
584 ///
585 /// # Examples
586 ///
587 /// ```
588 /// use std::sync::mpsc::channel;
589 /// use std::thread;
590 ///
591 /// let (sender, receiver) = channel();
592 ///
593 /// // Spawn off an expensive computation
594 /// thread::spawn(move|| {
595 /// #   fn expensive_computation() {}
596 ///     sender.send(expensive_computation()).unwrap();
597 /// });
598 ///
599 /// // Do some useful work for awhile
600 ///
601 /// // Let's see what that answer was
602 /// println!("{:?}", receiver.recv().unwrap());
603 /// ```
604 #[stable(feature = "rust1", since = "1.0.0")]
605 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
606     let a = Arc::new(oneshot::Packet::new());
607     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
608 }
609
610 /// Creates a new synchronous, bounded channel.
611 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
612 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
613 /// [`Receiver`] will block until a message becomes available. `sync_channel`
614 /// differs greatly in the semantics of the sender, however.
615 ///
616 /// This channel has an internal buffer on which messages will be queued.
617 /// `bound` specifies the buffer size. When the internal buffer becomes full,
618 /// future sends will *block* waiting for the buffer to open up. Note that a
619 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
620 /// where each [`send`] will not return until a [`recv`] is paired with it.
621 ///
622 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
623 /// times, but only one [`Receiver`] is supported.
624 ///
625 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
626 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
627 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
628 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
629 ///
630 /// [`channel`]: fn.channel.html
631 /// [`send`]: struct.SyncSender.html#method.send
632 /// [`recv`]: struct.Receiver.html#method.recv
633 /// [`SyncSender`]: struct.SyncSender.html
634 /// [`Receiver`]: struct.Receiver.html
635 /// [`SendError`]: struct.SendError.html
636 /// [`RecvError`]: struct.RecvError.html
637 ///
638 /// # Examples
639 ///
640 /// ```
641 /// use std::sync::mpsc::sync_channel;
642 /// use std::thread;
643 ///
644 /// let (sender, receiver) = sync_channel(1);
645 ///
646 /// // this returns immediately
647 /// sender.send(1).unwrap();
648 ///
649 /// thread::spawn(move|| {
650 ///     // this will block until the previous message has been received
651 ///     sender.send(2).unwrap();
652 /// });
653 ///
654 /// assert_eq!(receiver.recv().unwrap(), 1);
655 /// assert_eq!(receiver.recv().unwrap(), 2);
656 /// ```
657 #[stable(feature = "rust1", since = "1.0.0")]
658 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
659     let a = Arc::new(sync::Packet::new(bound));
660     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
661 }
662
663 ////////////////////////////////////////////////////////////////////////////////
664 // Sender
665 ////////////////////////////////////////////////////////////////////////////////
666
667 impl<T> Sender<T> {
668     fn new(inner: Flavor<T>) -> Sender<T> {
669         Sender {
670             inner: UnsafeCell::new(inner),
671         }
672     }
673
674     /// Attempts to send a value on this channel, returning it back if it could
675     /// not be sent.
676     ///
677     /// A successful send occurs when it is determined that the other end of
678     /// the channel has not hung up already. An unsuccessful send would be one
679     /// where the corresponding receiver has already been deallocated. Note
680     /// that a return value of [`Err`] means that the data will never be
681     /// received, but a return value of [`Ok`] does *not* mean that the data
682     /// will be received.  It is possible for the corresponding receiver to
683     /// hang up immediately after this function returns [`Ok`].
684     ///
685     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
686     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
687     ///
688     /// This method will never block the current thread.
689     ///
690     /// # Examples
691     ///
692     /// ```
693     /// use std::sync::mpsc::channel;
694     ///
695     /// let (tx, rx) = channel();
696     ///
697     /// // This send is always successful
698     /// tx.send(1).unwrap();
699     ///
700     /// // This send will fail because the receiver is gone
701     /// drop(rx);
702     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
703     /// ```
704     #[stable(feature = "rust1", since = "1.0.0")]
705     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
706         let (new_inner, ret) = match *unsafe { self.inner() } {
707             Flavor::Oneshot(ref p) => {
708                 if !p.sent() {
709                     return p.send(t).map_err(SendError);
710                 } else {
711                     let a = Arc::new(stream::Packet::new());
712                     let rx = Receiver::new(Flavor::Stream(a.clone()));
713                     match p.upgrade(rx) {
714                         oneshot::UpSuccess => {
715                             let ret = a.send(t);
716                             (a, ret)
717                         }
718                         oneshot::UpDisconnected => (a, Err(t)),
719                         oneshot::UpWoke(token) => {
720                             // This send cannot panic because the thread is
721                             // asleep (we're looking at it), so the receiver
722                             // can't go away.
723                             a.send(t).ok().unwrap();
724                             token.signal();
725                             (a, Ok(()))
726                         }
727                     }
728                 }
729             }
730             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
731             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
732             Flavor::Sync(..) => unreachable!(),
733         };
734
735         unsafe {
736             let tmp = Sender::new(Flavor::Stream(new_inner));
737             mem::swap(self.inner_mut(), tmp.inner_mut());
738         }
739         ret.map_err(SendError)
740     }
741 }
742
743 #[stable(feature = "rust1", since = "1.0.0")]
744 impl<T> Clone for Sender<T> {
745     fn clone(&self) -> Sender<T> {
746         let packet = match *unsafe { self.inner() } {
747             Flavor::Oneshot(ref p) => {
748                 let a = Arc::new(shared::Packet::new());
749                 {
750                     let guard = a.postinit_lock();
751                     let rx = Receiver::new(Flavor::Shared(a.clone()));
752                     let sleeper = match p.upgrade(rx) {
753                         oneshot::UpSuccess |
754                         oneshot::UpDisconnected => None,
755                         oneshot::UpWoke(task) => Some(task),
756                     };
757                     a.inherit_blocker(sleeper, guard);
758                 }
759                 a
760             }
761             Flavor::Stream(ref p) => {
762                 let a = Arc::new(shared::Packet::new());
763                 {
764                     let guard = a.postinit_lock();
765                     let rx = Receiver::new(Flavor::Shared(a.clone()));
766                     let sleeper = match p.upgrade(rx) {
767                         stream::UpSuccess |
768                         stream::UpDisconnected => None,
769                         stream::UpWoke(task) => Some(task),
770                     };
771                     a.inherit_blocker(sleeper, guard);
772                 }
773                 a
774             }
775             Flavor::Shared(ref p) => {
776                 p.clone_chan();
777                 return Sender::new(Flavor::Shared(p.clone()));
778             }
779             Flavor::Sync(..) => unreachable!(),
780         };
781
782         unsafe {
783             let tmp = Sender::new(Flavor::Shared(packet.clone()));
784             mem::swap(self.inner_mut(), tmp.inner_mut());
785         }
786         Sender::new(Flavor::Shared(packet))
787     }
788 }
789
790 #[stable(feature = "rust1", since = "1.0.0")]
791 impl<T> Drop for Sender<T> {
792     fn drop(&mut self) {
793         match *unsafe { self.inner() } {
794             Flavor::Oneshot(ref p) => p.drop_chan(),
795             Flavor::Stream(ref p) => p.drop_chan(),
796             Flavor::Shared(ref p) => p.drop_chan(),
797             Flavor::Sync(..) => unreachable!(),
798         }
799     }
800 }
801
802 #[stable(feature = "mpsc_debug", since = "1.7.0")]
803 impl<T> fmt::Debug for Sender<T> {
804     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
805         write!(f, "Sender {{ .. }}")
806     }
807 }
808
809 ////////////////////////////////////////////////////////////////////////////////
810 // SyncSender
811 ////////////////////////////////////////////////////////////////////////////////
812
813 impl<T> SyncSender<T> {
814     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
815         SyncSender { inner: inner }
816     }
817
818     /// Sends a value on this synchronous channel.
819     ///
820     /// This function will *block* until space in the internal buffer becomes
821     /// available or a receiver is available to hand off the message to.
822     ///
823     /// Note that a successful send does *not* guarantee that the receiver will
824     /// ever see the data if there is a buffer on this channel. Items may be
825     /// enqueued in the internal buffer for the receiver to receive at a later
826     /// time. If the buffer size is 0, however, it can be guaranteed that the
827     /// receiver has indeed received the data if this function returns success.
828     ///
829     /// This function will never panic, but it may return [`Err`] if the
830     /// [`Receiver`] has disconnected and is no longer able to receive
831     /// information.
832     ///
833     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
834     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
835     #[stable(feature = "rust1", since = "1.0.0")]
836     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
837         self.inner.send(t).map_err(SendError)
838     }
839
840     /// Attempts to send a value on this channel without blocking.
841     ///
842     /// This method differs from [`send`] by returning immediately if the
843     /// channel's buffer is full or no receiver is waiting to acquire some
844     /// data. Compared with [`send`], this function has two failure cases
845     /// instead of one (one for disconnection, one for a full buffer).
846     ///
847     /// See [`SyncSender::send`] for notes about guarantees of whether the
848     /// receiver has received the data or not if this function is successful.
849     ///
850     /// [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
851     /// [`SyncSender::send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
852     #[stable(feature = "rust1", since = "1.0.0")]
853     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
854         self.inner.try_send(t)
855     }
856 }
857
858 #[stable(feature = "rust1", since = "1.0.0")]
859 impl<T> Clone for SyncSender<T> {
860     fn clone(&self) -> SyncSender<T> {
861         self.inner.clone_chan();
862         SyncSender::new(self.inner.clone())
863     }
864 }
865
866 #[stable(feature = "rust1", since = "1.0.0")]
867 impl<T> Drop for SyncSender<T> {
868     fn drop(&mut self) {
869         self.inner.drop_chan();
870     }
871 }
872
873 #[stable(feature = "mpsc_debug", since = "1.7.0")]
874 impl<T> fmt::Debug for SyncSender<T> {
875     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
876         write!(f, "SyncSender {{ .. }}")
877     }
878 }
879
880 ////////////////////////////////////////////////////////////////////////////////
881 // Receiver
882 ////////////////////////////////////////////////////////////////////////////////
883
884 impl<T> Receiver<T> {
885     fn new(inner: Flavor<T>) -> Receiver<T> {
886         Receiver { inner: UnsafeCell::new(inner) }
887     }
888
889     /// Attempts to return a pending value on this receiver without blocking
890     ///
891     /// This method will never block the caller in order to wait for data to
892     /// become available. Instead, this will always return immediately with a
893     /// possible option of pending data on the channel.
894     ///
895     /// This is useful for a flavor of "optimistic check" before deciding to
896     /// block on a receiver.
897     #[stable(feature = "rust1", since = "1.0.0")]
898     pub fn try_recv(&self) -> Result<T, TryRecvError> {
899         loop {
900             let new_port = match *unsafe { self.inner() } {
901                 Flavor::Oneshot(ref p) => {
902                     match p.try_recv() {
903                         Ok(t) => return Ok(t),
904                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
905                         Err(oneshot::Disconnected) => {
906                             return Err(TryRecvError::Disconnected)
907                         }
908                         Err(oneshot::Upgraded(rx)) => rx,
909                     }
910                 }
911                 Flavor::Stream(ref p) => {
912                     match p.try_recv() {
913                         Ok(t) => return Ok(t),
914                         Err(stream::Empty) => return Err(TryRecvError::Empty),
915                         Err(stream::Disconnected) => {
916                             return Err(TryRecvError::Disconnected)
917                         }
918                         Err(stream::Upgraded(rx)) => rx,
919                     }
920                 }
921                 Flavor::Shared(ref p) => {
922                     match p.try_recv() {
923                         Ok(t) => return Ok(t),
924                         Err(shared::Empty) => return Err(TryRecvError::Empty),
925                         Err(shared::Disconnected) => {
926                             return Err(TryRecvError::Disconnected)
927                         }
928                     }
929                 }
930                 Flavor::Sync(ref p) => {
931                     match p.try_recv() {
932                         Ok(t) => return Ok(t),
933                         Err(sync::Empty) => return Err(TryRecvError::Empty),
934                         Err(sync::Disconnected) => {
935                             return Err(TryRecvError::Disconnected)
936                         }
937                     }
938                 }
939             };
940             unsafe {
941                 mem::swap(self.inner_mut(),
942                           new_port.inner_mut());
943             }
944         }
945     }
946
947     /// Attempts to wait for a value on this receiver, returning an error if the
948     /// corresponding channel has hung up.
949     ///
950     /// This function will always block the current thread if there is no data
951     /// available and it's possible for more data to be sent. Once a message is
952     /// sent to the corresponding [`Sender`], then this receiver will wake up and
953     /// return that message.
954     ///
955     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
956     /// this call is blocking, this call will wake up and return [`Err`] to
957     /// indicate that no more messages can ever be received on this channel.
958     /// However, since channels are buffered, messages sent before the disconnect
959     /// will still be properly received.
960     ///
961     /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
962     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
963     ///
964     /// # Examples
965     ///
966     /// ```
967     /// use std::sync::mpsc;
968     /// use std::thread;
969     ///
970     /// let (send, recv) = mpsc::channel();
971     /// let handle = thread::spawn(move || {
972     ///     send.send(1u8).unwrap();
973     /// });
974     ///
975     /// handle.join().unwrap();
976     ///
977     /// assert_eq!(Ok(1), recv.recv());
978     /// ```
979     ///
980     /// Buffering behavior:
981     ///
982     /// ```
983     /// use std::sync::mpsc;
984     /// use std::thread;
985     /// use std::sync::mpsc::RecvError;
986     ///
987     /// let (send, recv) = mpsc::channel();
988     /// let handle = thread::spawn(move || {
989     ///     send.send(1u8).unwrap();
990     ///     send.send(2).unwrap();
991     ///     send.send(3).unwrap();
992     ///     drop(send);
993     /// });
994     ///
995     /// // wait for the thread to join so we ensure the sender is dropped
996     /// handle.join().unwrap();
997     ///
998     /// assert_eq!(Ok(1), recv.recv());
999     /// assert_eq!(Ok(2), recv.recv());
1000     /// assert_eq!(Ok(3), recv.recv());
1001     /// assert_eq!(Err(RecvError), recv.recv());
1002     /// ```
1003     #[stable(feature = "rust1", since = "1.0.0")]
1004     pub fn recv(&self) -> Result<T, RecvError> {
1005         loop {
1006             let new_port = match *unsafe { self.inner() } {
1007                 Flavor::Oneshot(ref p) => {
1008                     match p.recv(None) {
1009                         Ok(t) => return Ok(t),
1010                         Err(oneshot::Disconnected) => return Err(RecvError),
1011                         Err(oneshot::Upgraded(rx)) => rx,
1012                         Err(oneshot::Empty) => unreachable!(),
1013                     }
1014                 }
1015                 Flavor::Stream(ref p) => {
1016                     match p.recv(None) {
1017                         Ok(t) => return Ok(t),
1018                         Err(stream::Disconnected) => return Err(RecvError),
1019                         Err(stream::Upgraded(rx)) => rx,
1020                         Err(stream::Empty) => unreachable!(),
1021                     }
1022                 }
1023                 Flavor::Shared(ref p) => {
1024                     match p.recv(None) {
1025                         Ok(t) => return Ok(t),
1026                         Err(shared::Disconnected) => return Err(RecvError),
1027                         Err(shared::Empty) => unreachable!(),
1028                     }
1029                 }
1030                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1031             };
1032             unsafe {
1033                 mem::swap(self.inner_mut(), new_port.inner_mut());
1034             }
1035         }
1036     }
1037
1038     /// Attempts to wait for a value on this receiver, returning an error if the
1039     /// corresponding channel has hung up, or if it waits more than `timeout`.
1040     ///
1041     /// This function will always block the current thread if there is no data
1042     /// available and it's possible for more data to be sent. Once a message is
1043     /// sent to the corresponding [`Sender`], then this receiver will wake up and
1044     /// return that message.
1045     ///
1046     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1047     /// this call is blocking, this call will wake up and return [`Err`] to
1048     /// indicate that no more messages can ever be received on this channel.
1049     /// However, since channels are buffered, messages sent before the disconnect
1050     /// will still be properly received.
1051     ///
1052     /// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
1053     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1054     ///
1055     /// # Examples
1056     ///
1057     /// ```no_run
1058     /// use std::sync::mpsc::{self, RecvTimeoutError};
1059     /// use std::time::Duration;
1060     ///
1061     /// let (send, recv) = mpsc::channel::<()>();
1062     ///
1063     /// let timeout = Duration::from_millis(100);
1064     /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
1065     /// ```
1066     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1067     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1068         // Do an optimistic try_recv to avoid the performance impact of
1069         // Instant::now() in the full-channel case.
1070         match self.try_recv() {
1071             Ok(result)
1072                 => Ok(result),
1073             Err(TryRecvError::Disconnected)
1074                 => Err(RecvTimeoutError::Disconnected),
1075             Err(TryRecvError::Empty)
1076                 => self.recv_max_until(Instant::now() + timeout)
1077         }
1078     }
1079
1080     fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1081         use self::RecvTimeoutError::*;
1082
1083         loop {
1084             let port_or_empty = match *unsafe { self.inner() } {
1085                 Flavor::Oneshot(ref p) => {
1086                     match p.recv(Some(deadline)) {
1087                         Ok(t) => return Ok(t),
1088                         Err(oneshot::Disconnected) => return Err(Disconnected),
1089                         Err(oneshot::Upgraded(rx)) => Some(rx),
1090                         Err(oneshot::Empty) => None,
1091                     }
1092                 }
1093                 Flavor::Stream(ref p) => {
1094                     match p.recv(Some(deadline)) {
1095                         Ok(t) => return Ok(t),
1096                         Err(stream::Disconnected) => return Err(Disconnected),
1097                         Err(stream::Upgraded(rx)) => Some(rx),
1098                         Err(stream::Empty) => None,
1099                     }
1100                 }
1101                 Flavor::Shared(ref p) => {
1102                     match p.recv(Some(deadline)) {
1103                         Ok(t) => return Ok(t),
1104                         Err(shared::Disconnected) => return Err(Disconnected),
1105                         Err(shared::Empty) => None,
1106                     }
1107                 }
1108                 Flavor::Sync(ref p) => {
1109                     match p.recv(Some(deadline)) {
1110                         Ok(t) => return Ok(t),
1111                         Err(sync::Disconnected) => return Err(Disconnected),
1112                         Err(sync::Empty) => None,
1113                     }
1114                 }
1115             };
1116
1117             if let Some(new_port) = port_or_empty {
1118                 unsafe {
1119                     mem::swap(self.inner_mut(), new_port.inner_mut());
1120                 }
1121             }
1122
1123             // If we're already passed the deadline, and we're here without
1124             // data, return a timeout, else try again.
1125             if Instant::now() >= deadline {
1126                 return Err(Timeout);
1127             }
1128         }
1129     }
1130
1131     /// Returns an iterator that will block waiting for messages, but never
1132     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1133     ///
1134     /// [`panic!`]: ../../../std/macro.panic.html
1135     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1136     ///
1137     /// # Examples
1138     ///
1139     /// ```rust
1140     /// use std::sync::mpsc::channel;
1141     /// use std::thread;
1142     ///
1143     /// let (send, recv) = channel();
1144     ///
1145     /// thread::spawn(move || {
1146     ///     send.send(1u8).unwrap();
1147     ///     send.send(2u8).unwrap();
1148     ///     send.send(3u8).unwrap();
1149     /// });
1150     ///
1151     /// for x in recv.iter() {
1152     ///     println!("Got: {}", x);
1153     /// }
1154     /// ```
1155     #[stable(feature = "rust1", since = "1.0.0")]
1156     pub fn iter(&self) -> Iter<T> {
1157         Iter { rx: self }
1158     }
1159
1160     /// Returns an iterator that will attempt to yield all pending values.
1161     /// It will return `None` if there are no more pending values or if the
1162     /// channel has hung up. The iterator will never [`panic!`] or block the
1163     /// user by waiting for values.
1164     ///
1165     /// [`panic!`]: ../../../std/macro.panic.html
1166     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1167     pub fn try_iter(&self) -> TryIter<T> {
1168         TryIter { rx: self }
1169     }
1170
1171 }
1172
1173 impl<T> select::Packet for Receiver<T> {
1174     fn can_recv(&self) -> bool {
1175         loop {
1176             let new_port = match *unsafe { self.inner() } {
1177                 Flavor::Oneshot(ref p) => {
1178                     match p.can_recv() {
1179                         Ok(ret) => return ret,
1180                         Err(upgrade) => upgrade,
1181                     }
1182                 }
1183                 Flavor::Stream(ref p) => {
1184                     match p.can_recv() {
1185                         Ok(ret) => return ret,
1186                         Err(upgrade) => upgrade,
1187                     }
1188                 }
1189                 Flavor::Shared(ref p) => return p.can_recv(),
1190                 Flavor::Sync(ref p) => return p.can_recv(),
1191             };
1192             unsafe {
1193                 mem::swap(self.inner_mut(),
1194                           new_port.inner_mut());
1195             }
1196         }
1197     }
1198
1199     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1200         loop {
1201             let (t, new_port) = match *unsafe { self.inner() } {
1202                 Flavor::Oneshot(ref p) => {
1203                     match p.start_selection(token) {
1204                         oneshot::SelSuccess => return Installed,
1205                         oneshot::SelCanceled => return Abort,
1206                         oneshot::SelUpgraded(t, rx) => (t, rx),
1207                     }
1208                 }
1209                 Flavor::Stream(ref p) => {
1210                     match p.start_selection(token) {
1211                         stream::SelSuccess => return Installed,
1212                         stream::SelCanceled => return Abort,
1213                         stream::SelUpgraded(t, rx) => (t, rx),
1214                     }
1215                 }
1216                 Flavor::Shared(ref p) => return p.start_selection(token),
1217                 Flavor::Sync(ref p) => return p.start_selection(token),
1218             };
1219             token = t;
1220             unsafe {
1221                 mem::swap(self.inner_mut(), new_port.inner_mut());
1222             }
1223         }
1224     }
1225
1226     fn abort_selection(&self) -> bool {
1227         let mut was_upgrade = false;
1228         loop {
1229             let result = match *unsafe { self.inner() } {
1230                 Flavor::Oneshot(ref p) => p.abort_selection(),
1231                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1232                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1233                 Flavor::Sync(ref p) => return p.abort_selection(),
1234             };
1235             let new_port = match result { Ok(b) => return b, Err(p) => p };
1236             was_upgrade = true;
1237             unsafe {
1238                 mem::swap(self.inner_mut(),
1239                           new_port.inner_mut());
1240             }
1241         }
1242     }
1243 }
1244
1245 #[stable(feature = "rust1", since = "1.0.0")]
1246 impl<'a, T> Iterator for Iter<'a, T> {
1247     type Item = T;
1248
1249     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1250 }
1251
1252 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1253 impl<'a, T> Iterator for TryIter<'a, T> {
1254     type Item = T;
1255
1256     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1257 }
1258
1259 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1260 impl<'a, T> IntoIterator for &'a Receiver<T> {
1261     type Item = T;
1262     type IntoIter = Iter<'a, T>;
1263
1264     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1265 }
1266
1267 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1268 impl<T> Iterator for IntoIter<T> {
1269     type Item = T;
1270     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1271 }
1272
1273 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1274 impl <T> IntoIterator for Receiver<T> {
1275     type Item = T;
1276     type IntoIter = IntoIter<T>;
1277
1278     fn into_iter(self) -> IntoIter<T> {
1279         IntoIter { rx: self }
1280     }
1281 }
1282
1283 #[stable(feature = "rust1", since = "1.0.0")]
1284 impl<T> Drop for Receiver<T> {
1285     fn drop(&mut self) {
1286         match *unsafe { self.inner() } {
1287             Flavor::Oneshot(ref p) => p.drop_port(),
1288             Flavor::Stream(ref p) => p.drop_port(),
1289             Flavor::Shared(ref p) => p.drop_port(),
1290             Flavor::Sync(ref p) => p.drop_port(),
1291         }
1292     }
1293 }
1294
1295 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1296 impl<T> fmt::Debug for Receiver<T> {
1297     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1298         write!(f, "Receiver {{ .. }}")
1299     }
1300 }
1301
1302 #[stable(feature = "rust1", since = "1.0.0")]
1303 impl<T> fmt::Debug for SendError<T> {
1304     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1305         "SendError(..)".fmt(f)
1306     }
1307 }
1308
1309 #[stable(feature = "rust1", since = "1.0.0")]
1310 impl<T> fmt::Display for SendError<T> {
1311     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1312         "sending on a closed channel".fmt(f)
1313     }
1314 }
1315
1316 #[stable(feature = "rust1", since = "1.0.0")]
1317 impl<T: Send> error::Error for SendError<T> {
1318     fn description(&self) -> &str {
1319         "sending on a closed channel"
1320     }
1321
1322     fn cause(&self) -> Option<&error::Error> {
1323         None
1324     }
1325 }
1326
1327 #[stable(feature = "rust1", since = "1.0.0")]
1328 impl<T> fmt::Debug for TrySendError<T> {
1329     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1330         match *self {
1331             TrySendError::Full(..) => "Full(..)".fmt(f),
1332             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1333         }
1334     }
1335 }
1336
1337 #[stable(feature = "rust1", since = "1.0.0")]
1338 impl<T> fmt::Display for TrySendError<T> {
1339     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1340         match *self {
1341             TrySendError::Full(..) => {
1342                 "sending on a full channel".fmt(f)
1343             }
1344             TrySendError::Disconnected(..) => {
1345                 "sending on a closed channel".fmt(f)
1346             }
1347         }
1348     }
1349 }
1350
1351 #[stable(feature = "rust1", since = "1.0.0")]
1352 impl<T: Send> error::Error for TrySendError<T> {
1353
1354     fn description(&self) -> &str {
1355         match *self {
1356             TrySendError::Full(..) => {
1357                 "sending on a full channel"
1358             }
1359             TrySendError::Disconnected(..) => {
1360                 "sending on a closed channel"
1361             }
1362         }
1363     }
1364
1365     fn cause(&self) -> Option<&error::Error> {
1366         None
1367     }
1368 }
1369
1370 #[stable(feature = "rust1", since = "1.0.0")]
1371 impl fmt::Display for RecvError {
1372     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1373         "receiving on a closed channel".fmt(f)
1374     }
1375 }
1376
1377 #[stable(feature = "rust1", since = "1.0.0")]
1378 impl error::Error for RecvError {
1379
1380     fn description(&self) -> &str {
1381         "receiving on a closed channel"
1382     }
1383
1384     fn cause(&self) -> Option<&error::Error> {
1385         None
1386     }
1387 }
1388
1389 #[stable(feature = "rust1", since = "1.0.0")]
1390 impl fmt::Display for TryRecvError {
1391     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1392         match *self {
1393             TryRecvError::Empty => {
1394                 "receiving on an empty channel".fmt(f)
1395             }
1396             TryRecvError::Disconnected => {
1397                 "receiving on a closed channel".fmt(f)
1398             }
1399         }
1400     }
1401 }
1402
1403 #[stable(feature = "rust1", since = "1.0.0")]
1404 impl error::Error for TryRecvError {
1405
1406     fn description(&self) -> &str {
1407         match *self {
1408             TryRecvError::Empty => {
1409                 "receiving on an empty channel"
1410             }
1411             TryRecvError::Disconnected => {
1412                 "receiving on a closed channel"
1413             }
1414         }
1415     }
1416
1417     fn cause(&self) -> Option<&error::Error> {
1418         None
1419     }
1420 }
1421
1422 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1423 impl fmt::Display for RecvTimeoutError {
1424     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1425         match *self {
1426             RecvTimeoutError::Timeout => {
1427                 "timed out waiting on channel".fmt(f)
1428             }
1429             RecvTimeoutError::Disconnected => {
1430                 "channel is empty and sending half is closed".fmt(f)
1431             }
1432         }
1433     }
1434 }
1435
1436 #[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
1437 impl error::Error for RecvTimeoutError {
1438     fn description(&self) -> &str {
1439         match *self {
1440             RecvTimeoutError::Timeout => {
1441                 "timed out waiting on channel"
1442             }
1443             RecvTimeoutError::Disconnected => {
1444                 "channel is empty and sending half is closed"
1445             }
1446         }
1447     }
1448
1449     fn cause(&self) -> Option<&error::Error> {
1450         None
1451     }
1452 }
1453
1454 #[cfg(all(test, not(target_os = "emscripten")))]
1455 mod tests {
1456     use env;
1457     use super::*;
1458     use thread;
1459     use time::{Duration, Instant};
1460
1461     pub fn stress_factor() -> usize {
1462         match env::var("RUST_TEST_STRESS") {
1463             Ok(val) => val.parse().unwrap(),
1464             Err(..) => 1,
1465         }
1466     }
1467
1468     #[test]
1469     fn smoke() {
1470         let (tx, rx) = channel::<i32>();
1471         tx.send(1).unwrap();
1472         assert_eq!(rx.recv().unwrap(), 1);
1473     }
1474
1475     #[test]
1476     fn drop_full() {
1477         let (tx, _rx) = channel::<Box<isize>>();
1478         tx.send(box 1).unwrap();
1479     }
1480
1481     #[test]
1482     fn drop_full_shared() {
1483         let (tx, _rx) = channel::<Box<isize>>();
1484         drop(tx.clone());
1485         drop(tx.clone());
1486         tx.send(box 1).unwrap();
1487     }
1488
1489     #[test]
1490     fn smoke_shared() {
1491         let (tx, rx) = channel::<i32>();
1492         tx.send(1).unwrap();
1493         assert_eq!(rx.recv().unwrap(), 1);
1494         let tx = tx.clone();
1495         tx.send(1).unwrap();
1496         assert_eq!(rx.recv().unwrap(), 1);
1497     }
1498
1499     #[test]
1500     fn smoke_threads() {
1501         let (tx, rx) = channel::<i32>();
1502         let _t = thread::spawn(move|| {
1503             tx.send(1).unwrap();
1504         });
1505         assert_eq!(rx.recv().unwrap(), 1);
1506     }
1507
1508     #[test]
1509     fn smoke_port_gone() {
1510         let (tx, rx) = channel::<i32>();
1511         drop(rx);
1512         assert!(tx.send(1).is_err());
1513     }
1514
1515     #[test]
1516     fn smoke_shared_port_gone() {
1517         let (tx, rx) = channel::<i32>();
1518         drop(rx);
1519         assert!(tx.send(1).is_err())
1520     }
1521
1522     #[test]
1523     fn smoke_shared_port_gone2() {
1524         let (tx, rx) = channel::<i32>();
1525         drop(rx);
1526         let tx2 = tx.clone();
1527         drop(tx);
1528         assert!(tx2.send(1).is_err());
1529     }
1530
1531     #[test]
1532     fn port_gone_concurrent() {
1533         let (tx, rx) = channel::<i32>();
1534         let _t = thread::spawn(move|| {
1535             rx.recv().unwrap();
1536         });
1537         while tx.send(1).is_ok() {}
1538     }
1539
1540     #[test]
1541     fn port_gone_concurrent_shared() {
1542         let (tx, rx) = channel::<i32>();
1543         let tx2 = tx.clone();
1544         let _t = thread::spawn(move|| {
1545             rx.recv().unwrap();
1546         });
1547         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1548     }
1549
1550     #[test]
1551     fn smoke_chan_gone() {
1552         let (tx, rx) = channel::<i32>();
1553         drop(tx);
1554         assert!(rx.recv().is_err());
1555     }
1556
1557     #[test]
1558     fn smoke_chan_gone_shared() {
1559         let (tx, rx) = channel::<()>();
1560         let tx2 = tx.clone();
1561         drop(tx);
1562         drop(tx2);
1563         assert!(rx.recv().is_err());
1564     }
1565
1566     #[test]
1567     fn chan_gone_concurrent() {
1568         let (tx, rx) = channel::<i32>();
1569         let _t = thread::spawn(move|| {
1570             tx.send(1).unwrap();
1571             tx.send(1).unwrap();
1572         });
1573         while rx.recv().is_ok() {}
1574     }
1575
1576     #[test]
1577     fn stress() {
1578         let (tx, rx) = channel::<i32>();
1579         let t = thread::spawn(move|| {
1580             for _ in 0..10000 { tx.send(1).unwrap(); }
1581         });
1582         for _ in 0..10000 {
1583             assert_eq!(rx.recv().unwrap(), 1);
1584         }
1585         t.join().ok().unwrap();
1586     }
1587
1588     #[test]
1589     fn stress_shared() {
1590         const AMT: u32 = 10000;
1591         const NTHREADS: u32 = 8;
1592         let (tx, rx) = channel::<i32>();
1593
1594         let t = thread::spawn(move|| {
1595             for _ in 0..AMT * NTHREADS {
1596                 assert_eq!(rx.recv().unwrap(), 1);
1597             }
1598             match rx.try_recv() {
1599                 Ok(..) => panic!(),
1600                 _ => {}
1601             }
1602         });
1603
1604         for _ in 0..NTHREADS {
1605             let tx = tx.clone();
1606             thread::spawn(move|| {
1607                 for _ in 0..AMT { tx.send(1).unwrap(); }
1608             });
1609         }
1610         drop(tx);
1611         t.join().ok().unwrap();
1612     }
1613
1614     #[test]
1615     fn send_from_outside_runtime() {
1616         let (tx1, rx1) = channel::<()>();
1617         let (tx2, rx2) = channel::<i32>();
1618         let t1 = thread::spawn(move|| {
1619             tx1.send(()).unwrap();
1620             for _ in 0..40 {
1621                 assert_eq!(rx2.recv().unwrap(), 1);
1622             }
1623         });
1624         rx1.recv().unwrap();
1625         let t2 = thread::spawn(move|| {
1626             for _ in 0..40 {
1627                 tx2.send(1).unwrap();
1628             }
1629         });
1630         t1.join().ok().unwrap();
1631         t2.join().ok().unwrap();
1632     }
1633
1634     #[test]
1635     fn recv_from_outside_runtime() {
1636         let (tx, rx) = channel::<i32>();
1637         let t = thread::spawn(move|| {
1638             for _ in 0..40 {
1639                 assert_eq!(rx.recv().unwrap(), 1);
1640             }
1641         });
1642         for _ in 0..40 {
1643             tx.send(1).unwrap();
1644         }
1645         t.join().ok().unwrap();
1646     }
1647
1648     #[test]
1649     fn no_runtime() {
1650         let (tx1, rx1) = channel::<i32>();
1651         let (tx2, rx2) = channel::<i32>();
1652         let t1 = thread::spawn(move|| {
1653             assert_eq!(rx1.recv().unwrap(), 1);
1654             tx2.send(2).unwrap();
1655         });
1656         let t2 = thread::spawn(move|| {
1657             tx1.send(1).unwrap();
1658             assert_eq!(rx2.recv().unwrap(), 2);
1659         });
1660         t1.join().ok().unwrap();
1661         t2.join().ok().unwrap();
1662     }
1663
1664     #[test]
1665     fn oneshot_single_thread_close_port_first() {
1666         // Simple test of closing without sending
1667         let (_tx, rx) = channel::<i32>();
1668         drop(rx);
1669     }
1670
1671     #[test]
1672     fn oneshot_single_thread_close_chan_first() {
1673         // Simple test of closing without sending
1674         let (tx, _rx) = channel::<i32>();
1675         drop(tx);
1676     }
1677
1678     #[test]
1679     fn oneshot_single_thread_send_port_close() {
1680         // Testing that the sender cleans up the payload if receiver is closed
1681         let (tx, rx) = channel::<Box<i32>>();
1682         drop(rx);
1683         assert!(tx.send(box 0).is_err());
1684     }
1685
1686     #[test]
1687     fn oneshot_single_thread_recv_chan_close() {
1688         // Receiving on a closed chan will panic
1689         let res = thread::spawn(move|| {
1690             let (tx, rx) = channel::<i32>();
1691             drop(tx);
1692             rx.recv().unwrap();
1693         }).join();
1694         // What is our res?
1695         assert!(res.is_err());
1696     }
1697
1698     #[test]
1699     fn oneshot_single_thread_send_then_recv() {
1700         let (tx, rx) = channel::<Box<i32>>();
1701         tx.send(box 10).unwrap();
1702         assert!(rx.recv().unwrap() == box 10);
1703     }
1704
1705     #[test]
1706     fn oneshot_single_thread_try_send_open() {
1707         let (tx, rx) = channel::<i32>();
1708         assert!(tx.send(10).is_ok());
1709         assert!(rx.recv().unwrap() == 10);
1710     }
1711
1712     #[test]
1713     fn oneshot_single_thread_try_send_closed() {
1714         let (tx, rx) = channel::<i32>();
1715         drop(rx);
1716         assert!(tx.send(10).is_err());
1717     }
1718
1719     #[test]
1720     fn oneshot_single_thread_try_recv_open() {
1721         let (tx, rx) = channel::<i32>();
1722         tx.send(10).unwrap();
1723         assert!(rx.recv() == Ok(10));
1724     }
1725
1726     #[test]
1727     fn oneshot_single_thread_try_recv_closed() {
1728         let (tx, rx) = channel::<i32>();
1729         drop(tx);
1730         assert!(rx.recv().is_err());
1731     }
1732
1733     #[test]
1734     fn oneshot_single_thread_peek_data() {
1735         let (tx, rx) = channel::<i32>();
1736         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1737         tx.send(10).unwrap();
1738         assert_eq!(rx.try_recv(), Ok(10));
1739     }
1740
1741     #[test]
1742     fn oneshot_single_thread_peek_close() {
1743         let (tx, rx) = channel::<i32>();
1744         drop(tx);
1745         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1746         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1747     }
1748
1749     #[test]
1750     fn oneshot_single_thread_peek_open() {
1751         let (_tx, rx) = channel::<i32>();
1752         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1753     }
1754
1755     #[test]
1756     fn oneshot_multi_task_recv_then_send() {
1757         let (tx, rx) = channel::<Box<i32>>();
1758         let _t = thread::spawn(move|| {
1759             assert!(rx.recv().unwrap() == box 10);
1760         });
1761
1762         tx.send(box 10).unwrap();
1763     }
1764
1765     #[test]
1766     fn oneshot_multi_task_recv_then_close() {
1767         let (tx, rx) = channel::<Box<i32>>();
1768         let _t = thread::spawn(move|| {
1769             drop(tx);
1770         });
1771         let res = thread::spawn(move|| {
1772             assert!(rx.recv().unwrap() == box 10);
1773         }).join();
1774         assert!(res.is_err());
1775     }
1776
1777     #[test]
1778     fn oneshot_multi_thread_close_stress() {
1779         for _ in 0..stress_factor() {
1780             let (tx, rx) = channel::<i32>();
1781             let _t = thread::spawn(move|| {
1782                 drop(rx);
1783             });
1784             drop(tx);
1785         }
1786     }
1787
1788     #[test]
1789     fn oneshot_multi_thread_send_close_stress() {
1790         for _ in 0..stress_factor() {
1791             let (tx, rx) = channel::<i32>();
1792             let _t = thread::spawn(move|| {
1793                 drop(rx);
1794             });
1795             let _ = thread::spawn(move|| {
1796                 tx.send(1).unwrap();
1797             }).join();
1798         }
1799     }
1800
1801     #[test]
1802     fn oneshot_multi_thread_recv_close_stress() {
1803         for _ in 0..stress_factor() {
1804             let (tx, rx) = channel::<i32>();
1805             thread::spawn(move|| {
1806                 let res = thread::spawn(move|| {
1807                     rx.recv().unwrap();
1808                 }).join();
1809                 assert!(res.is_err());
1810             });
1811             let _t = thread::spawn(move|| {
1812                 thread::spawn(move|| {
1813                     drop(tx);
1814                 });
1815             });
1816         }
1817     }
1818
1819     #[test]
1820     fn oneshot_multi_thread_send_recv_stress() {
1821         for _ in 0..stress_factor() {
1822             let (tx, rx) = channel::<Box<isize>>();
1823             let _t = thread::spawn(move|| {
1824                 tx.send(box 10).unwrap();
1825             });
1826             assert!(rx.recv().unwrap() == box 10);
1827         }
1828     }
1829
1830     #[test]
1831     fn stream_send_recv_stress() {
1832         for _ in 0..stress_factor() {
1833             let (tx, rx) = channel();
1834
1835             send(tx, 0);
1836             recv(rx, 0);
1837
1838             fn send(tx: Sender<Box<i32>>, i: i32) {
1839                 if i == 10 { return }
1840
1841                 thread::spawn(move|| {
1842                     tx.send(box i).unwrap();
1843                     send(tx, i + 1);
1844                 });
1845             }
1846
1847             fn recv(rx: Receiver<Box<i32>>, i: i32) {
1848                 if i == 10 { return }
1849
1850                 thread::spawn(move|| {
1851                     assert!(rx.recv().unwrap() == box i);
1852                     recv(rx, i + 1);
1853                 });
1854             }
1855         }
1856     }
1857
1858     #[test]
1859     fn oneshot_single_thread_recv_timeout() {
1860         let (tx, rx) = channel();
1861         tx.send(()).unwrap();
1862         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1863         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1864         tx.send(()).unwrap();
1865         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1866     }
1867
1868     #[test]
1869     fn stress_recv_timeout_two_threads() {
1870         let (tx, rx) = channel();
1871         let stress = stress_factor() + 100;
1872         let timeout = Duration::from_millis(100);
1873
1874         thread::spawn(move || {
1875             for i in 0..stress {
1876                 if i % 2 == 0 {
1877                     thread::sleep(timeout * 2);
1878                 }
1879                 tx.send(1usize).unwrap();
1880             }
1881         });
1882
1883         let mut recv_count = 0;
1884         loop {
1885             match rx.recv_timeout(timeout) {
1886                 Ok(n) => {
1887                     assert_eq!(n, 1usize);
1888                     recv_count += 1;
1889                 }
1890                 Err(RecvTimeoutError::Timeout) => continue,
1891                 Err(RecvTimeoutError::Disconnected) => break,
1892             }
1893         }
1894
1895         assert_eq!(recv_count, stress);
1896     }
1897
1898     #[test]
1899     fn recv_timeout_upgrade() {
1900         let (tx, rx) = channel::<()>();
1901         let timeout = Duration::from_millis(1);
1902         let _tx_clone = tx.clone();
1903
1904         let start = Instant::now();
1905         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1906         assert!(Instant::now() >= start + timeout);
1907     }
1908
1909     #[test]
1910     fn stress_recv_timeout_shared() {
1911         let (tx, rx) = channel();
1912         let stress = stress_factor() + 100;
1913
1914         for i in 0..stress {
1915             let tx = tx.clone();
1916             thread::spawn(move || {
1917                 thread::sleep(Duration::from_millis(i as u64 * 10));
1918                 tx.send(1usize).unwrap();
1919             });
1920         }
1921
1922         drop(tx);
1923
1924         let mut recv_count = 0;
1925         loop {
1926             match rx.recv_timeout(Duration::from_millis(10)) {
1927                 Ok(n) => {
1928                     assert_eq!(n, 1usize);
1929                     recv_count += 1;
1930                 }
1931                 Err(RecvTimeoutError::Timeout) => continue,
1932                 Err(RecvTimeoutError::Disconnected) => break,
1933             }
1934         }
1935
1936         assert_eq!(recv_count, stress);
1937     }
1938
1939     #[test]
1940     fn recv_a_lot() {
1941         // Regression test that we don't run out of stack in scheduler context
1942         let (tx, rx) = channel();
1943         for _ in 0..10000 { tx.send(()).unwrap(); }
1944         for _ in 0..10000 { rx.recv().unwrap(); }
1945     }
1946
1947     #[test]
1948     fn shared_recv_timeout() {
1949         let (tx, rx) = channel();
1950         let total = 5;
1951         for _ in 0..total {
1952             let tx = tx.clone();
1953             thread::spawn(move|| {
1954                 tx.send(()).unwrap();
1955             });
1956         }
1957
1958         for _ in 0..total { rx.recv().unwrap(); }
1959
1960         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1961         tx.send(()).unwrap();
1962         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1963     }
1964
1965     #[test]
1966     fn shared_chan_stress() {
1967         let (tx, rx) = channel();
1968         let total = stress_factor() + 100;
1969         for _ in 0..total {
1970             let tx = tx.clone();
1971             thread::spawn(move|| {
1972                 tx.send(()).unwrap();
1973             });
1974         }
1975
1976         for _ in 0..total {
1977             rx.recv().unwrap();
1978         }
1979     }
1980
1981     #[test]
1982     fn test_nested_recv_iter() {
1983         let (tx, rx) = channel::<i32>();
1984         let (total_tx, total_rx) = channel::<i32>();
1985
1986         let _t = thread::spawn(move|| {
1987             let mut acc = 0;
1988             for x in rx.iter() {
1989                 acc += x;
1990             }
1991             total_tx.send(acc).unwrap();
1992         });
1993
1994         tx.send(3).unwrap();
1995         tx.send(1).unwrap();
1996         tx.send(2).unwrap();
1997         drop(tx);
1998         assert_eq!(total_rx.recv().unwrap(), 6);
1999     }
2000
2001     #[test]
2002     fn test_recv_iter_break() {
2003         let (tx, rx) = channel::<i32>();
2004         let (count_tx, count_rx) = channel();
2005
2006         let _t = thread::spawn(move|| {
2007             let mut count = 0;
2008             for x in rx.iter() {
2009                 if count >= 3 {
2010                     break;
2011                 } else {
2012                     count += x;
2013                 }
2014             }
2015             count_tx.send(count).unwrap();
2016         });
2017
2018         tx.send(2).unwrap();
2019         tx.send(2).unwrap();
2020         tx.send(2).unwrap();
2021         let _ = tx.send(2);
2022         drop(tx);
2023         assert_eq!(count_rx.recv().unwrap(), 4);
2024     }
2025
2026     #[test]
2027     fn test_recv_try_iter() {
2028         let (request_tx, request_rx) = channel();
2029         let (response_tx, response_rx) = channel();
2030
2031         // Request `x`s until we have `6`.
2032         let t = thread::spawn(move|| {
2033             let mut count = 0;
2034             loop {
2035                 for x in response_rx.try_iter() {
2036                     count += x;
2037                     if count == 6 {
2038                         return count;
2039                     }
2040                 }
2041                 request_tx.send(()).unwrap();
2042             }
2043         });
2044
2045         for _ in request_rx.iter() {
2046             if response_tx.send(2).is_err() {
2047                 break;
2048             }
2049         }
2050
2051         assert_eq!(t.join().unwrap(), 6);
2052     }
2053
2054     #[test]
2055     fn test_recv_into_iter_owned() {
2056         let mut iter = {
2057           let (tx, rx) = channel::<i32>();
2058           tx.send(1).unwrap();
2059           tx.send(2).unwrap();
2060
2061           rx.into_iter()
2062         };
2063         assert_eq!(iter.next().unwrap(), 1);
2064         assert_eq!(iter.next().unwrap(), 2);
2065         assert_eq!(iter.next().is_none(), true);
2066     }
2067
2068     #[test]
2069     fn test_recv_into_iter_borrowed() {
2070         let (tx, rx) = channel::<i32>();
2071         tx.send(1).unwrap();
2072         tx.send(2).unwrap();
2073         drop(tx);
2074         let mut iter = (&rx).into_iter();
2075         assert_eq!(iter.next().unwrap(), 1);
2076         assert_eq!(iter.next().unwrap(), 2);
2077         assert_eq!(iter.next().is_none(), true);
2078     }
2079
2080     #[test]
2081     fn try_recv_states() {
2082         let (tx1, rx1) = channel::<i32>();
2083         let (tx2, rx2) = channel::<()>();
2084         let (tx3, rx3) = channel::<()>();
2085         let _t = thread::spawn(move|| {
2086             rx2.recv().unwrap();
2087             tx1.send(1).unwrap();
2088             tx3.send(()).unwrap();
2089             rx2.recv().unwrap();
2090             drop(tx1);
2091             tx3.send(()).unwrap();
2092         });
2093
2094         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2095         tx2.send(()).unwrap();
2096         rx3.recv().unwrap();
2097         assert_eq!(rx1.try_recv(), Ok(1));
2098         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2099         tx2.send(()).unwrap();
2100         rx3.recv().unwrap();
2101         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2102     }
2103
2104     // This bug used to end up in a livelock inside of the Receiver destructor
2105     // because the internal state of the Shared packet was corrupted
2106     #[test]
2107     fn destroy_upgraded_shared_port_when_sender_still_active() {
2108         let (tx, rx) = channel();
2109         let (tx2, rx2) = channel();
2110         let _t = thread::spawn(move|| {
2111             rx.recv().unwrap(); // wait on a oneshot
2112             drop(rx);  // destroy a shared
2113             tx2.send(()).unwrap();
2114         });
2115         // make sure the other thread has gone to sleep
2116         for _ in 0..5000 { thread::yield_now(); }
2117
2118         // upgrade to a shared chan and send a message
2119         let t = tx.clone();
2120         drop(tx);
2121         t.send(()).unwrap();
2122
2123         // wait for the child thread to exit before we exit
2124         rx2.recv().unwrap();
2125     }
2126
2127     #[test]
2128     fn issue_32114() {
2129         let (tx, _) = channel();
2130         let _ = tx.send(123);
2131         assert_eq!(tx.send(123), Err(SendError(123)));
2132     }
2133 }
2134
2135 #[cfg(all(test, not(target_os = "emscripten")))]
2136 mod sync_tests {
2137     use env;
2138     use thread;
2139     use super::*;
2140     use time::Duration;
2141
2142     pub fn stress_factor() -> usize {
2143         match env::var("RUST_TEST_STRESS") {
2144             Ok(val) => val.parse().unwrap(),
2145             Err(..) => 1,
2146         }
2147     }
2148
2149     #[test]
2150     fn smoke() {
2151         let (tx, rx) = sync_channel::<i32>(1);
2152         tx.send(1).unwrap();
2153         assert_eq!(rx.recv().unwrap(), 1);
2154     }
2155
2156     #[test]
2157     fn drop_full() {
2158         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2159         tx.send(box 1).unwrap();
2160     }
2161
2162     #[test]
2163     fn smoke_shared() {
2164         let (tx, rx) = sync_channel::<i32>(1);
2165         tx.send(1).unwrap();
2166         assert_eq!(rx.recv().unwrap(), 1);
2167         let tx = tx.clone();
2168         tx.send(1).unwrap();
2169         assert_eq!(rx.recv().unwrap(), 1);
2170     }
2171
2172     #[test]
2173     fn recv_timeout() {
2174         let (tx, rx) = sync_channel::<i32>(1);
2175         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2176         tx.send(1).unwrap();
2177         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2178     }
2179
2180     #[test]
2181     fn smoke_threads() {
2182         let (tx, rx) = sync_channel::<i32>(0);
2183         let _t = thread::spawn(move|| {
2184             tx.send(1).unwrap();
2185         });
2186         assert_eq!(rx.recv().unwrap(), 1);
2187     }
2188
2189     #[test]
2190     fn smoke_port_gone() {
2191         let (tx, rx) = sync_channel::<i32>(0);
2192         drop(rx);
2193         assert!(tx.send(1).is_err());
2194     }
2195
2196     #[test]
2197     fn smoke_shared_port_gone2() {
2198         let (tx, rx) = sync_channel::<i32>(0);
2199         drop(rx);
2200         let tx2 = tx.clone();
2201         drop(tx);
2202         assert!(tx2.send(1).is_err());
2203     }
2204
2205     #[test]
2206     fn port_gone_concurrent() {
2207         let (tx, rx) = sync_channel::<i32>(0);
2208         let _t = thread::spawn(move|| {
2209             rx.recv().unwrap();
2210         });
2211         while tx.send(1).is_ok() {}
2212     }
2213
2214     #[test]
2215     fn port_gone_concurrent_shared() {
2216         let (tx, rx) = sync_channel::<i32>(0);
2217         let tx2 = tx.clone();
2218         let _t = thread::spawn(move|| {
2219             rx.recv().unwrap();
2220         });
2221         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2222     }
2223
2224     #[test]
2225     fn smoke_chan_gone() {
2226         let (tx, rx) = sync_channel::<i32>(0);
2227         drop(tx);
2228         assert!(rx.recv().is_err());
2229     }
2230
2231     #[test]
2232     fn smoke_chan_gone_shared() {
2233         let (tx, rx) = sync_channel::<()>(0);
2234         let tx2 = tx.clone();
2235         drop(tx);
2236         drop(tx2);
2237         assert!(rx.recv().is_err());
2238     }
2239
2240     #[test]
2241     fn chan_gone_concurrent() {
2242         let (tx, rx) = sync_channel::<i32>(0);
2243         thread::spawn(move|| {
2244             tx.send(1).unwrap();
2245             tx.send(1).unwrap();
2246         });
2247         while rx.recv().is_ok() {}
2248     }
2249
2250     #[test]
2251     fn stress() {
2252         let (tx, rx) = sync_channel::<i32>(0);
2253         thread::spawn(move|| {
2254             for _ in 0..10000 { tx.send(1).unwrap(); }
2255         });
2256         for _ in 0..10000 {
2257             assert_eq!(rx.recv().unwrap(), 1);
2258         }
2259     }
2260
2261     #[test]
2262     fn stress_recv_timeout_two_threads() {
2263         let (tx, rx) = sync_channel::<i32>(0);
2264
2265         thread::spawn(move|| {
2266             for _ in 0..10000 { tx.send(1).unwrap(); }
2267         });
2268
2269         let mut recv_count = 0;
2270         loop {
2271             match rx.recv_timeout(Duration::from_millis(1)) {
2272                 Ok(v) => {
2273                     assert_eq!(v, 1);
2274                     recv_count += 1;
2275                 },
2276                 Err(RecvTimeoutError::Timeout) => continue,
2277                 Err(RecvTimeoutError::Disconnected) => break,
2278             }
2279         }
2280
2281         assert_eq!(recv_count, 10000);
2282     }
2283
2284     #[test]
2285     fn stress_recv_timeout_shared() {
2286         const AMT: u32 = 1000;
2287         const NTHREADS: u32 = 8;
2288         let (tx, rx) = sync_channel::<i32>(0);
2289         let (dtx, drx) = sync_channel::<()>(0);
2290
2291         thread::spawn(move|| {
2292             let mut recv_count = 0;
2293             loop {
2294                 match rx.recv_timeout(Duration::from_millis(10)) {
2295                     Ok(v) => {
2296                         assert_eq!(v, 1);
2297                         recv_count += 1;
2298                     },
2299                     Err(RecvTimeoutError::Timeout) => continue,
2300                     Err(RecvTimeoutError::Disconnected) => break,
2301                 }
2302             }
2303
2304             assert_eq!(recv_count, AMT * NTHREADS);
2305             assert!(rx.try_recv().is_err());
2306
2307             dtx.send(()).unwrap();
2308         });
2309
2310         for _ in 0..NTHREADS {
2311             let tx = tx.clone();
2312             thread::spawn(move|| {
2313                 for _ in 0..AMT { tx.send(1).unwrap(); }
2314             });
2315         }
2316
2317         drop(tx);
2318
2319         drx.recv().unwrap();
2320     }
2321
2322     #[test]
2323     fn stress_shared() {
2324         const AMT: u32 = 1000;
2325         const NTHREADS: u32 = 8;
2326         let (tx, rx) = sync_channel::<i32>(0);
2327         let (dtx, drx) = sync_channel::<()>(0);
2328
2329         thread::spawn(move|| {
2330             for _ in 0..AMT * NTHREADS {
2331                 assert_eq!(rx.recv().unwrap(), 1);
2332             }
2333             match rx.try_recv() {
2334                 Ok(..) => panic!(),
2335                 _ => {}
2336             }
2337             dtx.send(()).unwrap();
2338         });
2339
2340         for _ in 0..NTHREADS {
2341             let tx = tx.clone();
2342             thread::spawn(move|| {
2343                 for _ in 0..AMT { tx.send(1).unwrap(); }
2344             });
2345         }
2346         drop(tx);
2347         drx.recv().unwrap();
2348     }
2349
2350     #[test]
2351     fn oneshot_single_thread_close_port_first() {
2352         // Simple test of closing without sending
2353         let (_tx, rx) = sync_channel::<i32>(0);
2354         drop(rx);
2355     }
2356
2357     #[test]
2358     fn oneshot_single_thread_close_chan_first() {
2359         // Simple test of closing without sending
2360         let (tx, _rx) = sync_channel::<i32>(0);
2361         drop(tx);
2362     }
2363
2364     #[test]
2365     fn oneshot_single_thread_send_port_close() {
2366         // Testing that the sender cleans up the payload if receiver is closed
2367         let (tx, rx) = sync_channel::<Box<i32>>(0);
2368         drop(rx);
2369         assert!(tx.send(box 0).is_err());
2370     }
2371
2372     #[test]
2373     fn oneshot_single_thread_recv_chan_close() {
2374         // Receiving on a closed chan will panic
2375         let res = thread::spawn(move|| {
2376             let (tx, rx) = sync_channel::<i32>(0);
2377             drop(tx);
2378             rx.recv().unwrap();
2379         }).join();
2380         // What is our res?
2381         assert!(res.is_err());
2382     }
2383
2384     #[test]
2385     fn oneshot_single_thread_send_then_recv() {
2386         let (tx, rx) = sync_channel::<Box<i32>>(1);
2387         tx.send(box 10).unwrap();
2388         assert!(rx.recv().unwrap() == box 10);
2389     }
2390
2391     #[test]
2392     fn oneshot_single_thread_try_send_open() {
2393         let (tx, rx) = sync_channel::<i32>(1);
2394         assert_eq!(tx.try_send(10), Ok(()));
2395         assert!(rx.recv().unwrap() == 10);
2396     }
2397
2398     #[test]
2399     fn oneshot_single_thread_try_send_closed() {
2400         let (tx, rx) = sync_channel::<i32>(0);
2401         drop(rx);
2402         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2403     }
2404
2405     #[test]
2406     fn oneshot_single_thread_try_send_closed2() {
2407         let (tx, _rx) = sync_channel::<i32>(0);
2408         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2409     }
2410
2411     #[test]
2412     fn oneshot_single_thread_try_recv_open() {
2413         let (tx, rx) = sync_channel::<i32>(1);
2414         tx.send(10).unwrap();
2415         assert!(rx.recv() == Ok(10));
2416     }
2417
2418     #[test]
2419     fn oneshot_single_thread_try_recv_closed() {
2420         let (tx, rx) = sync_channel::<i32>(0);
2421         drop(tx);
2422         assert!(rx.recv().is_err());
2423     }
2424
2425     #[test]
2426     fn oneshot_single_thread_try_recv_closed_with_data() {
2427         let (tx, rx) = sync_channel::<i32>(1);
2428         tx.send(10).unwrap();
2429         drop(tx);
2430         assert_eq!(rx.try_recv(), Ok(10));
2431         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2432     }
2433
2434     #[test]
2435     fn oneshot_single_thread_peek_data() {
2436         let (tx, rx) = sync_channel::<i32>(1);
2437         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2438         tx.send(10).unwrap();
2439         assert_eq!(rx.try_recv(), Ok(10));
2440     }
2441
2442     #[test]
2443     fn oneshot_single_thread_peek_close() {
2444         let (tx, rx) = sync_channel::<i32>(0);
2445         drop(tx);
2446         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2447         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2448     }
2449
2450     #[test]
2451     fn oneshot_single_thread_peek_open() {
2452         let (_tx, rx) = sync_channel::<i32>(0);
2453         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2454     }
2455
2456     #[test]
2457     fn oneshot_multi_task_recv_then_send() {
2458         let (tx, rx) = sync_channel::<Box<i32>>(0);
2459         let _t = thread::spawn(move|| {
2460             assert!(rx.recv().unwrap() == box 10);
2461         });
2462
2463         tx.send(box 10).unwrap();
2464     }
2465
2466     #[test]
2467     fn oneshot_multi_task_recv_then_close() {
2468         let (tx, rx) = sync_channel::<Box<i32>>(0);
2469         let _t = thread::spawn(move|| {
2470             drop(tx);
2471         });
2472         let res = thread::spawn(move|| {
2473             assert!(rx.recv().unwrap() == box 10);
2474         }).join();
2475         assert!(res.is_err());
2476     }
2477
2478     #[test]
2479     fn oneshot_multi_thread_close_stress() {
2480         for _ in 0..stress_factor() {
2481             let (tx, rx) = sync_channel::<i32>(0);
2482             let _t = thread::spawn(move|| {
2483                 drop(rx);
2484             });
2485             drop(tx);
2486         }
2487     }
2488
2489     #[test]
2490     fn oneshot_multi_thread_send_close_stress() {
2491         for _ in 0..stress_factor() {
2492             let (tx, rx) = sync_channel::<i32>(0);
2493             let _t = thread::spawn(move|| {
2494                 drop(rx);
2495             });
2496             let _ = thread::spawn(move || {
2497                 tx.send(1).unwrap();
2498             }).join();
2499         }
2500     }
2501
2502     #[test]
2503     fn oneshot_multi_thread_recv_close_stress() {
2504         for _ in 0..stress_factor() {
2505             let (tx, rx) = sync_channel::<i32>(0);
2506             let _t = thread::spawn(move|| {
2507                 let res = thread::spawn(move|| {
2508                     rx.recv().unwrap();
2509                 }).join();
2510                 assert!(res.is_err());
2511             });
2512             let _t = thread::spawn(move|| {
2513                 thread::spawn(move|| {
2514                     drop(tx);
2515                 });
2516             });
2517         }
2518     }
2519
2520     #[test]
2521     fn oneshot_multi_thread_send_recv_stress() {
2522         for _ in 0..stress_factor() {
2523             let (tx, rx) = sync_channel::<Box<i32>>(0);
2524             let _t = thread::spawn(move|| {
2525                 tx.send(box 10).unwrap();
2526             });
2527             assert!(rx.recv().unwrap() == box 10);
2528         }
2529     }
2530
2531     #[test]
2532     fn stream_send_recv_stress() {
2533         for _ in 0..stress_factor() {
2534             let (tx, rx) = sync_channel::<Box<i32>>(0);
2535
2536             send(tx, 0);
2537             recv(rx, 0);
2538
2539             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2540                 if i == 10 { return }
2541
2542                 thread::spawn(move|| {
2543                     tx.send(box i).unwrap();
2544                     send(tx, i + 1);
2545                 });
2546             }
2547
2548             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2549                 if i == 10 { return }
2550
2551                 thread::spawn(move|| {
2552                     assert!(rx.recv().unwrap() == box i);
2553                     recv(rx, i + 1);
2554                 });
2555             }
2556         }
2557     }
2558
2559     #[test]
2560     fn recv_a_lot() {
2561         // Regression test that we don't run out of stack in scheduler context
2562         let (tx, rx) = sync_channel(10000);
2563         for _ in 0..10000 { tx.send(()).unwrap(); }
2564         for _ in 0..10000 { rx.recv().unwrap(); }
2565     }
2566
2567     #[test]
2568     fn shared_chan_stress() {
2569         let (tx, rx) = sync_channel(0);
2570         let total = stress_factor() + 100;
2571         for _ in 0..total {
2572             let tx = tx.clone();
2573             thread::spawn(move|| {
2574                 tx.send(()).unwrap();
2575             });
2576         }
2577
2578         for _ in 0..total {
2579             rx.recv().unwrap();
2580         }
2581     }
2582
2583     #[test]
2584     fn test_nested_recv_iter() {
2585         let (tx, rx) = sync_channel::<i32>(0);
2586         let (total_tx, total_rx) = sync_channel::<i32>(0);
2587
2588         let _t = thread::spawn(move|| {
2589             let mut acc = 0;
2590             for x in rx.iter() {
2591                 acc += x;
2592             }
2593             total_tx.send(acc).unwrap();
2594         });
2595
2596         tx.send(3).unwrap();
2597         tx.send(1).unwrap();
2598         tx.send(2).unwrap();
2599         drop(tx);
2600         assert_eq!(total_rx.recv().unwrap(), 6);
2601     }
2602
2603     #[test]
2604     fn test_recv_iter_break() {
2605         let (tx, rx) = sync_channel::<i32>(0);
2606         let (count_tx, count_rx) = sync_channel(0);
2607
2608         let _t = thread::spawn(move|| {
2609             let mut count = 0;
2610             for x in rx.iter() {
2611                 if count >= 3 {
2612                     break;
2613                 } else {
2614                     count += x;
2615                 }
2616             }
2617             count_tx.send(count).unwrap();
2618         });
2619
2620         tx.send(2).unwrap();
2621         tx.send(2).unwrap();
2622         tx.send(2).unwrap();
2623         let _ = tx.try_send(2);
2624         drop(tx);
2625         assert_eq!(count_rx.recv().unwrap(), 4);
2626     }
2627
2628     #[test]
2629     fn try_recv_states() {
2630         let (tx1, rx1) = sync_channel::<i32>(1);
2631         let (tx2, rx2) = sync_channel::<()>(1);
2632         let (tx3, rx3) = sync_channel::<()>(1);
2633         let _t = thread::spawn(move|| {
2634             rx2.recv().unwrap();
2635             tx1.send(1).unwrap();
2636             tx3.send(()).unwrap();
2637             rx2.recv().unwrap();
2638             drop(tx1);
2639             tx3.send(()).unwrap();
2640         });
2641
2642         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2643         tx2.send(()).unwrap();
2644         rx3.recv().unwrap();
2645         assert_eq!(rx1.try_recv(), Ok(1));
2646         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2647         tx2.send(()).unwrap();
2648         rx3.recv().unwrap();
2649         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2650     }
2651
2652     // This bug used to end up in a livelock inside of the Receiver destructor
2653     // because the internal state of the Shared packet was corrupted
2654     #[test]
2655     fn destroy_upgraded_shared_port_when_sender_still_active() {
2656         let (tx, rx) = sync_channel::<()>(0);
2657         let (tx2, rx2) = sync_channel::<()>(0);
2658         let _t = thread::spawn(move|| {
2659             rx.recv().unwrap(); // wait on a oneshot
2660             drop(rx);  // destroy a shared
2661             tx2.send(()).unwrap();
2662         });
2663         // make sure the other thread has gone to sleep
2664         for _ in 0..5000 { thread::yield_now(); }
2665
2666         // upgrade to a shared chan and send a message
2667         let t = tx.clone();
2668         drop(tx);
2669         t.send(()).unwrap();
2670
2671         // wait for the child thread to exit before we exit
2672         rx2.recv().unwrap();
2673     }
2674
2675     #[test]
2676     fn send1() {
2677         let (tx, rx) = sync_channel::<i32>(0);
2678         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2679         assert_eq!(tx.send(1), Ok(()));
2680     }
2681
2682     #[test]
2683     fn send2() {
2684         let (tx, rx) = sync_channel::<i32>(0);
2685         let _t = thread::spawn(move|| { drop(rx); });
2686         assert!(tx.send(1).is_err());
2687     }
2688
2689     #[test]
2690     fn send3() {
2691         let (tx, rx) = sync_channel::<i32>(1);
2692         assert_eq!(tx.send(1), Ok(()));
2693         let _t =thread::spawn(move|| { drop(rx); });
2694         assert!(tx.send(1).is_err());
2695     }
2696
2697     #[test]
2698     fn send4() {
2699         let (tx, rx) = sync_channel::<i32>(0);
2700         let tx2 = tx.clone();
2701         let (done, donerx) = channel();
2702         let done2 = done.clone();
2703         let _t = thread::spawn(move|| {
2704             assert!(tx.send(1).is_err());
2705             done.send(()).unwrap();
2706         });
2707         let _t = thread::spawn(move|| {
2708             assert!(tx2.send(2).is_err());
2709             done2.send(()).unwrap();
2710         });
2711         drop(rx);
2712         donerx.recv().unwrap();
2713         donerx.recv().unwrap();
2714     }
2715
2716     #[test]
2717     fn try_send1() {
2718         let (tx, _rx) = sync_channel::<i32>(0);
2719         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2720     }
2721
2722     #[test]
2723     fn try_send2() {
2724         let (tx, _rx) = sync_channel::<i32>(1);
2725         assert_eq!(tx.try_send(1), Ok(()));
2726         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2727     }
2728
2729     #[test]
2730     fn try_send3() {
2731         let (tx, rx) = sync_channel::<i32>(1);
2732         assert_eq!(tx.try_send(1), Ok(()));
2733         drop(rx);
2734         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2735     }
2736
2737     #[test]
2738     fn issue_15761() {
2739         fn repro() {
2740             let (tx1, rx1) = sync_channel::<()>(3);
2741             let (tx2, rx2) = sync_channel::<()>(3);
2742
2743             let _t = thread::spawn(move|| {
2744                 rx1.recv().unwrap();
2745                 tx2.try_send(()).unwrap();
2746             });
2747
2748             tx1.try_send(()).unwrap();
2749             rx2.recv().unwrap();
2750         }
2751
2752         for _ in 0..100 {
2753             repro()
2754         }
2755     }
2756
2757     #[test]
2758     fn fmt_debug_sender() {
2759         let (tx, _) = channel::<i32>();
2760         assert_eq!(format!("{:?}", tx), "Sender { .. }");
2761     }
2762
2763     #[test]
2764     fn fmt_debug_recv() {
2765         let (_, rx) = channel::<i32>();
2766         assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2767     }
2768
2769     #[test]
2770     fn fmt_debug_sync_sender() {
2771         let (tx, _) = sync_channel::<i32>(1);
2772         assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
2773     }
2774 }