]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Uncapitalize "If"
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * [`Sender`]
17 //! * [`SyncSender`]
18 //! * [`Receiver`]
19 //!
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //!    return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //!    messages is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //!    channel where each sender atomically hands off a message to a receiver.
37 //!
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
44 //!
45 //! ## Disconnection
46 //!
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
51 //!
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
56 //!
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
60 //!
61 //! # Examples
62 //!
63 //! Simple usage:
64 //!
65 //! ```
66 //! use std::thread;
67 //! use std::sync::mpsc::channel;
68 //!
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //!     tx.send(10).unwrap();
73 //! });
74 //! assert_eq!(rx.recv().unwrap(), 10);
75 //! ```
76 //!
77 //! Shared usage:
78 //!
79 //! ```
80 //! use std::thread;
81 //! use std::sync::mpsc::channel;
82 //!
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
87 //! for i in 0..10 {
88 //!     let tx = tx.clone();
89 //!     thread::spawn(move|| {
90 //!         tx.send(i).unwrap();
91 //!     });
92 //! }
93 //!
94 //! for _ in 0..10 {
95 //!     let j = rx.recv().unwrap();
96 //!     assert!(0 <= j && j < 10);
97 //! }
98 //! ```
99 //!
100 //! Propagating panics:
101 //!
102 //! ```
103 //! use std::sync::mpsc::channel;
104 //!
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
108 //! drop(tx);
109 //! assert!(rx.recv().is_err());
110 //! ```
111 //!
112 //! Synchronous channels:
113 //!
114 //! ```
115 //! use std::thread;
116 //! use std::sync::mpsc::sync_channel;
117 //!
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //!     // This will wait for the parent thread to start receiving
121 //!     tx.send(53).unwrap();
122 //! });
123 //! rx.recv().unwrap();
124 //! ```
125
126 #![stable(feature = "rust1", since = "1.0.0")]
127
128 // A description of how Rust's channel implementation works
129 //
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
133 //
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
139 //
140 // ## Flavors of channels
141 //
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
145 // channels in play.
146 //
147 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
148 //                      case. They contain as few atomics as possible and
149 //                      involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 //             use a different concurrent queue that is more tailored for this
152 //             use case. The initial allocation of this flavor of channel is not
153 //             optimized.
154 // * Shared - this is the most general form of channel that this module offers,
155 //            a channel with multiple senders. This type is as optimized as it
156 //            can be, but the previous two types mentioned are much faster for
157 //            their use-cases.
158 //
159 // ## Concurrent queues
160 //
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
164 //
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
169 // shared channels.
170 //
171 // ### SPSC optimizations
172 //
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
177 // pop().
178 //
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
184 //
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
188 // coexist at once.
189 //
190 // ### MPSC optimizations
191 //
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
195 //
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
199 //
200 // ## Overview of the Implementation
201 //
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
205 //
206 //
207 //      send(t)                             recv()
208 //        queue.push(t)                       return if queue.pop()
209 //        if increment() == -1                deschedule {
210 //          wakeup()                            if decrement() > 0
211 //                                                cancel_deschedule()
212 //                                            }
213 //                                            queue.pop()
214 //
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
217 //
218 // ### The internal atomic counter
219 //
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
223 //
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
228 //
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
235 //
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
239 //
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
242 //
243 // ## Native Implementation
244 //
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
248 //
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
254 //
255 // ## Select
256 //
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
260 //
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as its woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
267 //
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
272 //
273 // # Conclusion
274 //
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
277
278 use sync::Arc;
279 use error;
280 use fmt;
281 use mem;
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
284
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
290
291 mod blocking;
292 mod oneshot;
293 mod select;
294 mod shared;
295 mod stream;
296 mod sync;
297 mod mpsc_queue;
298 mod spsc_queue;
299
300 mod cache_aligned;
301
302 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
303 /// This half can only be owned by one thread.
304 ///
305 /// Messages sent to the channel can be retrieved using [`recv`].
306 ///
307 /// [`channel`]: fn.channel.html
308 /// [`sync_channel`]: fn.sync_channel.html
309 /// [`recv`]: struct.Receiver.html#method.recv
310 ///
311 /// # Examples
312 ///
313 /// ```rust
314 /// use std::sync::mpsc::channel;
315 /// use std::thread;
316 /// use std::time::Duration;
317 ///
318 /// let (send, recv) = channel();
319 ///
320 /// thread::spawn(move || {
321 ///     send.send("Hello world!").unwrap();
322 ///     thread::sleep(Duration::from_secs(2)); // block for two seconds
323 ///     send.send("Delayed for 2 seconds").unwrap();
324 /// });
325 ///
326 /// println!("{}", recv.recv().unwrap()); // Received immediately
327 /// println!("Waiting...");
328 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
329 /// ```
330 #[stable(feature = "rust1", since = "1.0.0")]
331 pub struct Receiver<T> {
332     inner: UnsafeCell<Flavor<T>>,
333 }
334
335 // The receiver port can be sent from place to place, so long as it
336 // is not used to receive non-sendable things.
337 #[stable(feature = "rust1", since = "1.0.0")]
338 unsafe impl<T: Send> Send for Receiver<T> { }
339
340 #[stable(feature = "rust1", since = "1.0.0")]
341 impl<T> !Sync for Receiver<T> { }
342
343 /// An iterator over messages on a [`Receiver`], created by [`iter`].
344 ///
345 /// This iterator will block whenever [`next`] is called,
346 /// waiting for a new message, and [`None`] will be returned
347 /// when the corresponding channel has hung up.
348 ///
349 /// [`iter`]: struct.Receiver.html#method.iter
350 /// [`Receiver`]: struct.Receiver.html
351 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
352 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
353 ///
354 /// # Examples
355 ///
356 /// ```rust
357 /// use std::sync::mpsc::channel;
358 /// use std::thread;
359 ///
360 /// let (send, recv) = channel();
361 ///
362 /// thread::spawn(move || {
363 ///     send.send(1u8).unwrap();
364 ///     send.send(2u8).unwrap();
365 ///     send.send(3u8).unwrap();
366 /// });
367 ///
368 /// for x in recv.iter() {
369 ///     println!("Got: {}", x);
370 /// }
371 /// ```
372 #[stable(feature = "rust1", since = "1.0.0")]
373 #[derive(Debug)]
374 pub struct Iter<'a, T: 'a> {
375     rx: &'a Receiver<T>
376 }
377
378 /// An iterator that attempts to yield all pending values for a [`Receiver`],
379 /// created by [`try_iter`].
380 ///
381 /// [`None`] will be returned when there are no pending values remaining or
382 /// if the corresponding channel has hung up.
383 ///
384 /// This iterator will never block the caller in order to wait for data to
385 /// become available. Instead, it will return [`None`].
386 ///
387 /// [`Receiver`]: struct.Receiver.html
388 /// [`try_iter`]: struct.Receiver.html#method.try_iter
389 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
390 ///
391 /// # Examples
392 ///
393 /// ```rust
394 /// use std::sync::mpsc::channel;
395 /// use std::thread;
396 /// use std::time::Duration;
397 ///
398 /// let (sender, receiver) = channel();
399 ///
400 /// // Nothing is in the buffer yet
401 /// assert!(receiver.try_iter().next().is_none());
402 /// println!("Nothing in the buffer...");
403 ///
404 /// thread::spawn(move || {
405 ///     sender.send(1).unwrap();
406 ///     sender.send(2).unwrap();
407 ///     sender.send(3).unwrap();
408 /// });
409 ///
410 /// println!("Going to sleep...");
411 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
412 ///
413 /// for x in receiver.try_iter() {
414 ///     println!("Got: {}", x);
415 /// }
416 /// ```
417 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
418 #[derive(Debug)]
419 pub struct TryIter<'a, T: 'a> {
420     rx: &'a Receiver<T>
421 }
422
423 /// An owning iterator over messages on a [`Receiver`],
424 /// created by **Receiver::into_iter**.
425 ///
426 /// This iterator will block whenever [`next`]
427 /// is called, waiting for a new message, and [`None`] will be
428 /// returned if the corresponding channel has hung up.
429 ///
430 /// [`Receiver`]: struct.Receiver.html
431 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
432 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
433 ///
434 /// # Examples
435 ///
436 /// ```rust
437 /// use std::sync::mpsc::channel;
438 /// use std::thread;
439 ///
440 /// let (send, recv) = channel();
441 ///
442 /// thread::spawn(move || {
443 ///     send.send(1u8).unwrap();
444 ///     send.send(2u8).unwrap();
445 ///     send.send(3u8).unwrap();
446 /// });
447 ///
448 /// for x in recv.into_iter() {
449 ///     println!("Got: {}", x);
450 /// }
451 /// ```
452 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
453 #[derive(Debug)]
454 pub struct IntoIter<T> {
455     rx: Receiver<T>
456 }
457
458 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
459 /// owned by one thread, but it can be cloned to send to other threads.
460 ///
461 /// Messages can be sent through this channel with [`send`].
462 ///
463 /// [`channel`]: fn.channel.html
464 /// [`send`]: struct.Sender.html#method.send
465 ///
466 /// # Examples
467 ///
468 /// ```rust
469 /// use std::sync::mpsc::channel;
470 /// use std::thread;
471 ///
472 /// let (sender, receiver) = channel();
473 /// let sender2 = sender.clone();
474 ///
475 /// // First thread owns sender
476 /// thread::spawn(move || {
477 ///     sender.send(1).unwrap();
478 /// });
479 ///
480 /// // Second thread owns sender2
481 /// thread::spawn(move || {
482 ///     sender2.send(2).unwrap();
483 /// });
484 ///
485 /// let msg = receiver.recv().unwrap();
486 /// let msg2 = receiver.recv().unwrap();
487 ///
488 /// assert_eq!(3, msg + msg2);
489 /// ```
490 #[stable(feature = "rust1", since = "1.0.0")]
491 pub struct Sender<T> {
492     inner: UnsafeCell<Flavor<T>>,
493 }
494
495 // The send port can be sent from place to place, so long as it
496 // is not used to send non-sendable things.
497 #[stable(feature = "rust1", since = "1.0.0")]
498 unsafe impl<T: Send> Send for Sender<T> { }
499
500 #[stable(feature = "rust1", since = "1.0.0")]
501 impl<T> !Sync for Sender<T> { }
502
503 /// The sending-half of Rust's synchronous [`sync_channel`] type.
504 ///
505 /// Messages can be sent through this channel with [`send`] or [`try_send`].
506 ///
507 /// [`send`] will block if there is no space in the internal buffer.
508 ///
509 /// [`sync_channel`]: fn.sync_channel.html
510 /// [`send`]: struct.SyncSender.html#method.send
511 /// [`try_send`]: struct.SyncSender.html#method.try_send
512 ///
513 /// # Examples
514 ///
515 /// ```rust
516 /// use std::sync::mpsc::sync_channel;
517 /// use std::thread;
518 ///
519 /// // Create a sync_channel with buffer size 2
520 /// let (sync_sender, receiver) = sync_channel(2);
521 /// let sync_sender2 = sync_sender.clone();
522 ///
523 /// // First thread owns sync_sender
524 /// thread::spawn(move || {
525 ///     sync_sender.send(1).unwrap();
526 ///     sync_sender.send(2).unwrap();
527 /// });
528 ///
529 /// // Second thread owns sync_sender2
530 /// thread::spawn(move || {
531 ///     sync_sender2.send(3).unwrap();
532 ///     // thread will now block since the buffer is full
533 ///     println!("Thread unblocked!");
534 /// });
535 ///
536 /// let mut msg;
537 ///
538 /// msg = receiver.recv().unwrap();
539 /// println!("message {} received", msg);
540 ///
541 /// // "Thread unblocked!" will be printed now
542 ///
543 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
545 ///
546 /// msg = receiver.recv().unwrap();
547 ///
548 /// println!("message {} received", msg);
549 /// ```
550 #[stable(feature = "rust1", since = "1.0.0")]
551 pub struct SyncSender<T> {
552     inner: Arc<sync::Packet<T>>,
553 }
554
555 #[stable(feature = "rust1", since = "1.0.0")]
556 unsafe impl<T: Send> Send for SyncSender<T> {}
557
558 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
559 /// function on **channel**s.
560 ///
561 /// A **send** operation can only fail if the receiving end of a channel is
562 /// disconnected, implying that the data could never be received. The error
563 /// contains the data being sent as a payload so it can be recovered.
564 ///
565 /// [`Sender::send`]: struct.Sender.html#method.send
566 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
567 #[stable(feature = "rust1", since = "1.0.0")]
568 #[derive(PartialEq, Eq, Clone, Copy)]
569 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
570
571 /// An error returned from the [`recv`] function on a [`Receiver`].
572 ///
573 /// The [`recv`] operation can only fail if the sending half of a
574 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
575 /// messages will ever be received.
576 ///
577 /// [`recv`]: struct.Receiver.html#method.recv
578 /// [`Receiver`]: struct.Receiver.html
579 /// [`channel`]: fn.channel.html
580 /// [`sync_channel`]: fn.sync_channel.html
581 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
582 #[stable(feature = "rust1", since = "1.0.0")]
583 pub struct RecvError;
584
585 /// This enumeration is the list of the possible reasons that [`try_recv`] could
586 /// not return data when called. This can occur with both a [`channel`] and
587 /// a [`sync_channel`].
588 ///
589 /// [`try_recv`]: struct.Receiver.html#method.try_recv
590 /// [`channel`]: fn.channel.html
591 /// [`sync_channel`]: fn.sync_channel.html
592 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
593 #[stable(feature = "rust1", since = "1.0.0")]
594 pub enum TryRecvError {
595     /// This **channel** is currently empty, but the **Sender**(s) have not yet
596     /// disconnected, so data may yet become available.
597     #[stable(feature = "rust1", since = "1.0.0")]
598     Empty,
599
600     /// The **channel**'s sending half has become disconnected, and there will
601     /// never be any more data received on it.
602     #[stable(feature = "rust1", since = "1.0.0")]
603     Disconnected,
604 }
605
606 /// This enumeration is the list of possible errors that made [`recv_timeout`]
607 /// unable to return data when called. This can occur with both a [`channel`] and
608 /// a [`sync_channel`].
609 ///
610 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
611 /// [`channel`]: fn.channel.html
612 /// [`sync_channel`]: fn.sync_channel.html
613 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
614 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
615 pub enum RecvTimeoutError {
616     /// This **channel** is currently empty, but the **Sender**(s) have not yet
617     /// disconnected, so data may yet become available.
618     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
619     Timeout,
620     /// The **channel**'s sending half has become disconnected, and there will
621     /// never be any more data received on it.
622     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
623     Disconnected,
624 }
625
626 /// This enumeration is the list of the possible error outcomes for the
627 /// [`try_send`] method.
628 ///
629 /// [`try_send`]: struct.SyncSender.html#method.try_send
630 #[stable(feature = "rust1", since = "1.0.0")]
631 #[derive(PartialEq, Eq, Clone, Copy)]
632 pub enum TrySendError<T> {
633     /// The data could not be sent on the [`sync_channel`] because it would require that
634     /// the callee block to send the data.
635     ///
636     /// If this is a buffered channel, then the buffer is full at this time. If
637     /// this is not a buffered channel, then there is no [`Receiver`] available to
638     /// acquire the data.
639     ///
640     /// [`sync_channel`]: fn.sync_channel.html
641     /// [`Receiver`]: struct.Receiver.html
642     #[stable(feature = "rust1", since = "1.0.0")]
643     Full(#[stable(feature = "rust1", since = "1.0.0")] T),
644
645     /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
646     /// sent. The data is returned back to the callee in this case.
647     ///
648     /// [`sync_channel`]: fn.sync_channel.html
649     #[stable(feature = "rust1", since = "1.0.0")]
650     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
651 }
652
653 enum Flavor<T> {
654     Oneshot(Arc<oneshot::Packet<T>>),
655     Stream(Arc<stream::Packet<T>>),
656     Shared(Arc<shared::Packet<T>>),
657     Sync(Arc<sync::Packet<T>>),
658 }
659
660 #[doc(hidden)]
661 trait UnsafeFlavor<T> {
662     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
663     unsafe fn inner_mut(&self) -> &mut Flavor<T> {
664         &mut *self.inner_unsafe().get()
665     }
666     unsafe fn inner(&self) -> &Flavor<T> {
667         &*self.inner_unsafe().get()
668     }
669 }
670 impl<T> UnsafeFlavor<T> for Sender<T> {
671     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
672         &self.inner
673     }
674 }
675 impl<T> UnsafeFlavor<T> for Receiver<T> {
676     fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
677         &self.inner
678     }
679 }
680
681 /// Creates a new asynchronous channel, returning the sender/receiver halves.
682 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
683 /// the same order as it was sent, and no [`send`] will block the calling thread
684 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
685 /// block after its buffer limit is reached). [`recv`] will block until a message
686 /// is available.
687 ///
688 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
689 /// only one [`Receiver`] is supported.
690 ///
691 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
692 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
693 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
694 /// return a [`RecvError`].
695 ///
696 /// [`send`]: struct.Sender.html#method.send
697 /// [`recv`]: struct.Receiver.html#method.recv
698 /// [`Sender`]: struct.Sender.html
699 /// [`Receiver`]: struct.Receiver.html
700 /// [`sync_channel`]: fn.sync_channel.html
701 /// [`SendError`]: struct.SendError.html
702 /// [`RecvError`]: struct.RecvError.html
703 ///
704 /// # Examples
705 ///
706 /// ```
707 /// use std::sync::mpsc::channel;
708 /// use std::thread;
709 ///
710 /// let (sender, receiver) = channel();
711 ///
712 /// // Spawn off an expensive computation
713 /// thread::spawn(move|| {
714 /// #   fn expensive_computation() {}
715 ///     sender.send(expensive_computation()).unwrap();
716 /// });
717 ///
718 /// // Do some useful work for awhile
719 ///
720 /// // Let's see what that answer was
721 /// println!("{:?}", receiver.recv().unwrap());
722 /// ```
723 #[stable(feature = "rust1", since = "1.0.0")]
724 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
725     let a = Arc::new(oneshot::Packet::new());
726     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
727 }
728
729 /// Creates a new synchronous, bounded channel.
730 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
731 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
732 /// [`Receiver`] will block until a message becomes available. `sync_channel`
733 /// differs greatly in the semantics of the sender, however.
734 ///
735 /// This channel has an internal buffer on which messages will be queued.
736 /// `bound` specifies the buffer size. When the internal buffer becomes full,
737 /// future sends will *block* waiting for the buffer to open up. Note that a
738 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
739 /// where each [`send`] will not return until a [`recv`] is paired with it.
740 ///
741 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
742 /// times, but only one [`Receiver`] is supported.
743 ///
744 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
745 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
746 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
747 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
748 ///
749 /// [`channel`]: fn.channel.html
750 /// [`send`]: struct.SyncSender.html#method.send
751 /// [`recv`]: struct.Receiver.html#method.recv
752 /// [`SyncSender`]: struct.SyncSender.html
753 /// [`Receiver`]: struct.Receiver.html
754 /// [`SendError`]: struct.SendError.html
755 /// [`RecvError`]: struct.RecvError.html
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// use std::sync::mpsc::sync_channel;
761 /// use std::thread;
762 ///
763 /// let (sender, receiver) = sync_channel(1);
764 ///
765 /// // this returns immediately
766 /// sender.send(1).unwrap();
767 ///
768 /// thread::spawn(move|| {
769 ///     // this will block until the previous message has been received
770 ///     sender.send(2).unwrap();
771 /// });
772 ///
773 /// assert_eq!(receiver.recv().unwrap(), 1);
774 /// assert_eq!(receiver.recv().unwrap(), 2);
775 /// ```
776 #[stable(feature = "rust1", since = "1.0.0")]
777 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
778     let a = Arc::new(sync::Packet::new(bound));
779     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
780 }
781
782 ////////////////////////////////////////////////////////////////////////////////
783 // Sender
784 ////////////////////////////////////////////////////////////////////////////////
785
786 impl<T> Sender<T> {
787     fn new(inner: Flavor<T>) -> Sender<T> {
788         Sender {
789             inner: UnsafeCell::new(inner),
790         }
791     }
792
793     /// Attempts to send a value on this channel, returning it back if it could
794     /// not be sent.
795     ///
796     /// A successful send occurs when it is determined that the other end of
797     /// the channel has not hung up already. An unsuccessful send would be one
798     /// where the corresponding receiver has already been deallocated. Note
799     /// that a return value of [`Err`] means that the data will never be
800     /// received, but a return value of [`Ok`] does *not* mean that the data
801     /// will be received.  It is possible for the corresponding receiver to
802     /// hang up immediately after this function returns [`Ok`].
803     ///
804     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
805     /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
806     ///
807     /// This method will never block the current thread.
808     ///
809     /// # Examples
810     ///
811     /// ```
812     /// use std::sync::mpsc::channel;
813     ///
814     /// let (tx, rx) = channel();
815     ///
816     /// // This send is always successful
817     /// tx.send(1).unwrap();
818     ///
819     /// // This send will fail because the receiver is gone
820     /// drop(rx);
821     /// assert_eq!(tx.send(1).unwrap_err().0, 1);
822     /// ```
823     #[stable(feature = "rust1", since = "1.0.0")]
824     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
825         let (new_inner, ret) = match *unsafe { self.inner() } {
826             Flavor::Oneshot(ref p) => {
827                 if !p.sent() {
828                     return p.send(t).map_err(SendError);
829                 } else {
830                     let a = Arc::new(stream::Packet::new());
831                     let rx = Receiver::new(Flavor::Stream(a.clone()));
832                     match p.upgrade(rx) {
833                         oneshot::UpSuccess => {
834                             let ret = a.send(t);
835                             (a, ret)
836                         }
837                         oneshot::UpDisconnected => (a, Err(t)),
838                         oneshot::UpWoke(token) => {
839                             // This send cannot panic because the thread is
840                             // asleep (we're looking at it), so the receiver
841                             // can't go away.
842                             a.send(t).ok().unwrap();
843                             token.signal();
844                             (a, Ok(()))
845                         }
846                     }
847                 }
848             }
849             Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
850             Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
851             Flavor::Sync(..) => unreachable!(),
852         };
853
854         unsafe {
855             let tmp = Sender::new(Flavor::Stream(new_inner));
856             mem::swap(self.inner_mut(), tmp.inner_mut());
857         }
858         ret.map_err(SendError)
859     }
860 }
861
862 #[stable(feature = "rust1", since = "1.0.0")]
863 impl<T> Clone for Sender<T> {
864     fn clone(&self) -> Sender<T> {
865         let packet = match *unsafe { self.inner() } {
866             Flavor::Oneshot(ref p) => {
867                 let a = Arc::new(shared::Packet::new());
868                 {
869                     let guard = a.postinit_lock();
870                     let rx = Receiver::new(Flavor::Shared(a.clone()));
871                     let sleeper = match p.upgrade(rx) {
872                         oneshot::UpSuccess |
873                         oneshot::UpDisconnected => None,
874                         oneshot::UpWoke(task) => Some(task),
875                     };
876                     a.inherit_blocker(sleeper, guard);
877                 }
878                 a
879             }
880             Flavor::Stream(ref p) => {
881                 let a = Arc::new(shared::Packet::new());
882                 {
883                     let guard = a.postinit_lock();
884                     let rx = Receiver::new(Flavor::Shared(a.clone()));
885                     let sleeper = match p.upgrade(rx) {
886                         stream::UpSuccess |
887                         stream::UpDisconnected => None,
888                         stream::UpWoke(task) => Some(task),
889                     };
890                     a.inherit_blocker(sleeper, guard);
891                 }
892                 a
893             }
894             Flavor::Shared(ref p) => {
895                 p.clone_chan();
896                 return Sender::new(Flavor::Shared(p.clone()));
897             }
898             Flavor::Sync(..) => unreachable!(),
899         };
900
901         unsafe {
902             let tmp = Sender::new(Flavor::Shared(packet.clone()));
903             mem::swap(self.inner_mut(), tmp.inner_mut());
904         }
905         Sender::new(Flavor::Shared(packet))
906     }
907 }
908
909 #[stable(feature = "rust1", since = "1.0.0")]
910 impl<T> Drop for Sender<T> {
911     fn drop(&mut self) {
912         match *unsafe { self.inner() } {
913             Flavor::Oneshot(ref p) => p.drop_chan(),
914             Flavor::Stream(ref p) => p.drop_chan(),
915             Flavor::Shared(ref p) => p.drop_chan(),
916             Flavor::Sync(..) => unreachable!(),
917         }
918     }
919 }
920
921 #[stable(feature = "mpsc_debug", since = "1.8.0")]
922 impl<T> fmt::Debug for Sender<T> {
923     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
924         f.debug_struct("Sender").finish()
925     }
926 }
927
928 ////////////////////////////////////////////////////////////////////////////////
929 // SyncSender
930 ////////////////////////////////////////////////////////////////////////////////
931
932 impl<T> SyncSender<T> {
933     fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
934         SyncSender { inner: inner }
935     }
936
937     /// Sends a value on this synchronous channel.
938     ///
939     /// This function will *block* until space in the internal buffer becomes
940     /// available or a receiver is available to hand off the message to.
941     ///
942     /// Note that a successful send does *not* guarantee that the receiver will
943     /// ever see the data if there is a buffer on this channel. Items may be
944     /// enqueued in the internal buffer for the receiver to receive at a later
945     /// time. If the buffer size is 0, however, the channel becomes a rendezvous
946     /// channel and it guarantees that the receiver has indeed received
947     /// the data if this function returns success.
948     ///
949     /// This function will never panic, but it may return [`Err`] if the
950     /// [`Receiver`] has disconnected and is no longer able to receive
951     /// information.
952     ///
953     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
954     /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
955     ///
956     /// # Examples
957     ///
958     /// ```rust
959     /// use std::sync::mpsc::sync_channel;
960     /// use std::thread;
961     ///
962     /// // Create a rendezvous sync_channel with buffer size 0
963     /// let (sync_sender, receiver) = sync_channel(0);
964     ///
965     /// thread::spawn(move || {
966     ///    println!("sending message...");
967     ///    sync_sender.send(1).unwrap();
968     ///    // Thread is now blocked until the message is received
969     ///
970     ///    println!("...message received!");
971     /// });
972     ///
973     /// let msg = receiver.recv().unwrap();
974     /// assert_eq!(1, msg);
975     /// ```
976     #[stable(feature = "rust1", since = "1.0.0")]
977     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
978         self.inner.send(t).map_err(SendError)
979     }
980
981     /// Attempts to send a value on this channel without blocking.
982     ///
983     /// This method differs from [`send`] by returning immediately if the
984     /// channel's buffer is full or no receiver is waiting to acquire some
985     /// data. Compared with [`send`], this function has two failure cases
986     /// instead of one (one for disconnection, one for a full buffer).
987     ///
988     /// See [`send`] for notes about guarantees of whether the
989     /// receiver has received the data or not if this function is successful.
990     ///
991     /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
992     ///
993     /// # Examples
994     ///
995     /// ```rust
996     /// use std::sync::mpsc::sync_channel;
997     /// use std::thread;
998     ///
999     /// // Create a sync_channel with buffer size 1
1000     /// let (sync_sender, receiver) = sync_channel(1);
1001     /// let sync_sender2 = sync_sender.clone();
1002     ///
1003     /// // First thread owns sync_sender
1004     /// thread::spawn(move || {
1005     ///     sync_sender.send(1).unwrap();
1006     ///     sync_sender.send(2).unwrap();
1007     ///     // Thread blocked
1008     /// });
1009     ///
1010     /// // Second thread owns sync_sender2
1011     /// thread::spawn(move || {
1012     ///     // This will return an error and send
1013     ///     // no message if the buffer is full
1014     ///     sync_sender2.try_send(3).is_err();
1015     /// });
1016     ///
1017     /// let mut msg;
1018     /// msg = receiver.recv().unwrap();
1019     /// println!("message {} received", msg);
1020     ///
1021     /// msg = receiver.recv().unwrap();
1022     /// println!("message {} received", msg);
1023     ///
1024     /// // Third message may have never been sent
1025     /// match receiver.try_recv() {
1026     ///     Ok(msg) => println!("message {} received", msg),
1027     ///     Err(_) => println!("the third message was never sent"),
1028     /// }
1029     /// ```
1030     #[stable(feature = "rust1", since = "1.0.0")]
1031     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1032         self.inner.try_send(t)
1033     }
1034 }
1035
1036 #[stable(feature = "rust1", since = "1.0.0")]
1037 impl<T> Clone for SyncSender<T> {
1038     fn clone(&self) -> SyncSender<T> {
1039         self.inner.clone_chan();
1040         SyncSender::new(self.inner.clone())
1041     }
1042 }
1043
1044 #[stable(feature = "rust1", since = "1.0.0")]
1045 impl<T> Drop for SyncSender<T> {
1046     fn drop(&mut self) {
1047         self.inner.drop_chan();
1048     }
1049 }
1050
1051 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1052 impl<T> fmt::Debug for SyncSender<T> {
1053     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1054         f.debug_struct("SyncSender").finish()
1055     }
1056 }
1057
1058 ////////////////////////////////////////////////////////////////////////////////
1059 // Receiver
1060 ////////////////////////////////////////////////////////////////////////////////
1061
1062 impl<T> Receiver<T> {
1063     fn new(inner: Flavor<T>) -> Receiver<T> {
1064         Receiver { inner: UnsafeCell::new(inner) }
1065     }
1066
1067     /// Attempts to return a pending value on this receiver without blocking.
1068     ///
1069     /// This method will never block the caller in order to wait for data to
1070     /// become available. Instead, this will always return immediately with a
1071     /// possible option of pending data on the channel.
1072     ///
1073     /// This is useful for a flavor of "optimistic check" before deciding to
1074     /// block on a receiver.
1075     ///
1076     /// Compared with [`recv`], this function has two failure cases instead of one
1077     /// (one for disconnection, one for an empty buffer).
1078     ///
1079     /// [`recv`]: struct.Receiver.html#method.recv
1080     ///
1081     /// # Examples
1082     ///
1083     /// ```rust
1084     /// use std::sync::mpsc::{Receiver, channel};
1085     ///
1086     /// let (_, receiver): (_, Receiver<i32>) = channel();
1087     ///
1088     /// assert!(receiver.try_recv().is_err());
1089     /// ```
1090     #[stable(feature = "rust1", since = "1.0.0")]
1091     pub fn try_recv(&self) -> Result<T, TryRecvError> {
1092         loop {
1093             let new_port = match *unsafe { self.inner() } {
1094                 Flavor::Oneshot(ref p) => {
1095                     match p.try_recv() {
1096                         Ok(t) => return Ok(t),
1097                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1098                         Err(oneshot::Disconnected) => {
1099                             return Err(TryRecvError::Disconnected)
1100                         }
1101                         Err(oneshot::Upgraded(rx)) => rx,
1102                     }
1103                 }
1104                 Flavor::Stream(ref p) => {
1105                     match p.try_recv() {
1106                         Ok(t) => return Ok(t),
1107                         Err(stream::Empty) => return Err(TryRecvError::Empty),
1108                         Err(stream::Disconnected) => {
1109                             return Err(TryRecvError::Disconnected)
1110                         }
1111                         Err(stream::Upgraded(rx)) => rx,
1112                     }
1113                 }
1114                 Flavor::Shared(ref p) => {
1115                     match p.try_recv() {
1116                         Ok(t) => return Ok(t),
1117                         Err(shared::Empty) => return Err(TryRecvError::Empty),
1118                         Err(shared::Disconnected) => {
1119                             return Err(TryRecvError::Disconnected)
1120                         }
1121                     }
1122                 }
1123                 Flavor::Sync(ref p) => {
1124                     match p.try_recv() {
1125                         Ok(t) => return Ok(t),
1126                         Err(sync::Empty) => return Err(TryRecvError::Empty),
1127                         Err(sync::Disconnected) => {
1128                             return Err(TryRecvError::Disconnected)
1129                         }
1130                     }
1131                 }
1132             };
1133             unsafe {
1134                 mem::swap(self.inner_mut(),
1135                           new_port.inner_mut());
1136             }
1137         }
1138     }
1139
1140     /// Attempts to wait for a value on this receiver, returning an error if the
1141     /// corresponding channel has hung up.
1142     ///
1143     /// This function will always block the current thread if there is no data
1144     /// available and it's possible for more data to be sent. Once a message is
1145     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1146     /// receiver will wake up and return that message.
1147     ///
1148     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1149     /// this call is blocking, this call will wake up and return [`Err`] to
1150     /// indicate that no more messages can ever be received on this channel.
1151     /// However, since channels are buffered, messages sent before the disconnect
1152     /// will still be properly received.
1153     ///
1154     /// [`Sender`]: struct.Sender.html
1155     /// [`SyncSender`]: struct.SyncSender.html
1156     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1157     ///
1158     /// # Examples
1159     ///
1160     /// ```
1161     /// use std::sync::mpsc;
1162     /// use std::thread;
1163     ///
1164     /// let (send, recv) = mpsc::channel();
1165     /// let handle = thread::spawn(move || {
1166     ///     send.send(1u8).unwrap();
1167     /// });
1168     ///
1169     /// handle.join().unwrap();
1170     ///
1171     /// assert_eq!(Ok(1), recv.recv());
1172     /// ```
1173     ///
1174     /// Buffering behavior:
1175     ///
1176     /// ```
1177     /// use std::sync::mpsc;
1178     /// use std::thread;
1179     /// use std::sync::mpsc::RecvError;
1180     ///
1181     /// let (send, recv) = mpsc::channel();
1182     /// let handle = thread::spawn(move || {
1183     ///     send.send(1u8).unwrap();
1184     ///     send.send(2).unwrap();
1185     ///     send.send(3).unwrap();
1186     ///     drop(send);
1187     /// });
1188     ///
1189     /// // wait for the thread to join so we ensure the sender is dropped
1190     /// handle.join().unwrap();
1191     ///
1192     /// assert_eq!(Ok(1), recv.recv());
1193     /// assert_eq!(Ok(2), recv.recv());
1194     /// assert_eq!(Ok(3), recv.recv());
1195     /// assert_eq!(Err(RecvError), recv.recv());
1196     /// ```
1197     #[stable(feature = "rust1", since = "1.0.0")]
1198     pub fn recv(&self) -> Result<T, RecvError> {
1199         loop {
1200             let new_port = match *unsafe { self.inner() } {
1201                 Flavor::Oneshot(ref p) => {
1202                     match p.recv(None) {
1203                         Ok(t) => return Ok(t),
1204                         Err(oneshot::Disconnected) => return Err(RecvError),
1205                         Err(oneshot::Upgraded(rx)) => rx,
1206                         Err(oneshot::Empty) => unreachable!(),
1207                     }
1208                 }
1209                 Flavor::Stream(ref p) => {
1210                     match p.recv(None) {
1211                         Ok(t) => return Ok(t),
1212                         Err(stream::Disconnected) => return Err(RecvError),
1213                         Err(stream::Upgraded(rx)) => rx,
1214                         Err(stream::Empty) => unreachable!(),
1215                     }
1216                 }
1217                 Flavor::Shared(ref p) => {
1218                     match p.recv(None) {
1219                         Ok(t) => return Ok(t),
1220                         Err(shared::Disconnected) => return Err(RecvError),
1221                         Err(shared::Empty) => unreachable!(),
1222                     }
1223                 }
1224                 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1225             };
1226             unsafe {
1227                 mem::swap(self.inner_mut(), new_port.inner_mut());
1228             }
1229         }
1230     }
1231
1232     /// Attempts to wait for a value on this receiver, returning an error if the
1233     /// corresponding channel has hung up, or if it waits more than `timeout`.
1234     ///
1235     /// This function will always block the current thread if there is no data
1236     /// available and it's possible for more data to be sent. Once a message is
1237     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1238     /// receiver will wake up and return that message.
1239     ///
1240     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1241     /// this call is blocking, this call will wake up and return [`Err`] to
1242     /// indicate that no more messages can ever be received on this channel.
1243     /// However, since channels are buffered, messages sent before the disconnect
1244     /// will still be properly received.
1245     ///
1246     /// [`Sender`]: struct.Sender.html
1247     /// [`SyncSender`]: struct.SyncSender.html
1248     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1249     ///
1250     /// # Examples
1251     ///
1252     /// Successfully receiving value before encountering timeout:
1253     ///
1254     /// ```no_run
1255     /// use std::thread;
1256     /// use std::time::Duration;
1257     /// use std::sync::mpsc;
1258     ///
1259     /// let (send, recv) = mpsc::channel();
1260     ///
1261     /// thread::spawn(move || {
1262     ///     send.send('a').unwrap();
1263     /// });
1264     ///
1265     /// assert_eq!(
1266     ///     recv.recv_timeout(Duration::from_millis(400)),
1267     ///     Ok('a')
1268     /// );
1269     /// ```
1270     ///
1271     /// Receiving an error upon reaching timeout:
1272     ///
1273     /// ```no_run
1274     /// use std::thread;
1275     /// use std::time::Duration;
1276     /// use std::sync::mpsc;
1277     ///
1278     /// let (send, recv) = mpsc::channel();
1279     ///
1280     /// thread::spawn(move || {
1281     ///     thread::sleep(Duration::from_millis(800));
1282     ///     send.send('a').unwrap();
1283     /// });
1284     ///
1285     /// assert_eq!(
1286     ///     recv.recv_timeout(Duration::from_millis(400)),
1287     ///     Err(mpsc::RecvTimeoutError::Timeout)
1288     /// );
1289     /// ```
1290     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1291     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1292         // Do an optimistic try_recv to avoid the performance impact of
1293         // Instant::now() in the full-channel case.
1294         match self.try_recv() {
1295             Ok(result)
1296                 => Ok(result),
1297             Err(TryRecvError::Disconnected)
1298                 => Err(RecvTimeoutError::Disconnected),
1299             Err(TryRecvError::Empty)
1300                 => self.recv_deadline(Instant::now() + timeout)
1301         }
1302     }
1303
1304     /// Attempts to wait for a value on this receiver, returning an error if the
1305     /// corresponding channel has hung up, or if `deadline` is reached.
1306     ///
1307     /// This function will always block the current thread if there is no data
1308     /// available and it's possible for more data to be sent. Once a message is
1309     /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1310     /// receiver will wake up and return that message.
1311     ///
1312     /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1313     /// this call is blocking, this call will wake up and return [`Err`] to
1314     /// indicate that no more messages can ever be received on this channel.
1315     /// However, since channels are buffered, messages sent before the disconnect
1316     /// will still be properly received.
1317     ///
1318     /// [`Sender`]: struct.Sender.html
1319     /// [`SyncSender`]: struct.SyncSender.html
1320     /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1321     ///
1322     /// # Examples
1323     ///
1324     /// Successfully receiving value before reaching deadline:
1325     ///
1326     /// ```no_run
1327     /// #![feature(deadline_api)]
1328     /// use std::thread;
1329     /// use std::time::{Duration, Instant};
1330     /// use std::sync::mpsc;
1331     ///
1332     /// let (send, recv) = mpsc::channel();
1333     ///
1334     /// thread::spawn(move || {
1335     ///     send.send('a').unwrap();
1336     /// });
1337     ///
1338     /// assert_eq!(
1339     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1340     ///     Ok('a')
1341     /// );
1342     /// ```
1343     ///
1344     /// Receiving an error upon reaching deadline:
1345     ///
1346     /// ```no_run
1347     /// #![feature(deadline_api)]
1348     /// use std::thread;
1349     /// use std::time::{Duration, Instant};
1350     /// use std::sync::mpsc;
1351     ///
1352     /// let (send, recv) = mpsc::channel();
1353     ///
1354     /// thread::spawn(move || {
1355     ///     thread::sleep(Duration::from_millis(800));
1356     ///     send.send('a').unwrap();
1357     /// });
1358     ///
1359     /// assert_eq!(
1360     ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1361     ///     Err(mpsc::RecvTimeoutError::Timeout)
1362     /// );
1363     /// ```
1364     #[unstable(feature = "deadline_api", issue = "46316")]
1365     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1366         use self::RecvTimeoutError::*;
1367
1368         loop {
1369             let port_or_empty = match *unsafe { self.inner() } {
1370                 Flavor::Oneshot(ref p) => {
1371                     match p.recv(Some(deadline)) {
1372                         Ok(t) => return Ok(t),
1373                         Err(oneshot::Disconnected) => return Err(Disconnected),
1374                         Err(oneshot::Upgraded(rx)) => Some(rx),
1375                         Err(oneshot::Empty) => None,
1376                     }
1377                 }
1378                 Flavor::Stream(ref p) => {
1379                     match p.recv(Some(deadline)) {
1380                         Ok(t) => return Ok(t),
1381                         Err(stream::Disconnected) => return Err(Disconnected),
1382                         Err(stream::Upgraded(rx)) => Some(rx),
1383                         Err(stream::Empty) => None,
1384                     }
1385                 }
1386                 Flavor::Shared(ref p) => {
1387                     match p.recv(Some(deadline)) {
1388                         Ok(t) => return Ok(t),
1389                         Err(shared::Disconnected) => return Err(Disconnected),
1390                         Err(shared::Empty) => None,
1391                     }
1392                 }
1393                 Flavor::Sync(ref p) => {
1394                     match p.recv(Some(deadline)) {
1395                         Ok(t) => return Ok(t),
1396                         Err(sync::Disconnected) => return Err(Disconnected),
1397                         Err(sync::Empty) => None,
1398                     }
1399                 }
1400             };
1401
1402             if let Some(new_port) = port_or_empty {
1403                 unsafe {
1404                     mem::swap(self.inner_mut(), new_port.inner_mut());
1405                 }
1406             }
1407
1408             // If we're already passed the deadline, and we're here without
1409             // data, return a timeout, else try again.
1410             if Instant::now() >= deadline {
1411                 return Err(Timeout);
1412             }
1413         }
1414     }
1415
1416     /// Returns an iterator that will block waiting for messages, but never
1417     /// [`panic!`]. It will return [`None`] when the channel has hung up.
1418     ///
1419     /// [`panic!`]: ../../../std/macro.panic.html
1420     /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1421     ///
1422     /// # Examples
1423     ///
1424     /// ```rust
1425     /// use std::sync::mpsc::channel;
1426     /// use std::thread;
1427     ///
1428     /// let (send, recv) = channel();
1429     ///
1430     /// thread::spawn(move || {
1431     ///     send.send(1).unwrap();
1432     ///     send.send(2).unwrap();
1433     ///     send.send(3).unwrap();
1434     /// });
1435     ///
1436     /// let mut iter = recv.iter();
1437     /// assert_eq!(iter.next(), Some(1));
1438     /// assert_eq!(iter.next(), Some(2));
1439     /// assert_eq!(iter.next(), Some(3));
1440     /// assert_eq!(iter.next(), None);
1441     /// ```
1442     #[stable(feature = "rust1", since = "1.0.0")]
1443     pub fn iter(&self) -> Iter<T> {
1444         Iter { rx: self }
1445     }
1446
1447     /// Returns an iterator that will attempt to yield all pending values.
1448     /// It will return `None` if there are no more pending values or if the
1449     /// channel has hung up. The iterator will never [`panic!`] or block the
1450     /// user by waiting for values.
1451     ///
1452     /// [`panic!`]: ../../../std/macro.panic.html
1453     ///
1454     /// # Examples
1455     ///
1456     /// ```no_run
1457     /// use std::sync::mpsc::channel;
1458     /// use std::thread;
1459     /// use std::time::Duration;
1460     ///
1461     /// let (sender, receiver) = channel();
1462     ///
1463     /// // nothing is in the buffer yet
1464     /// assert!(receiver.try_iter().next().is_none());
1465     ///
1466     /// thread::spawn(move || {
1467     ///     thread::sleep(Duration::from_secs(1));
1468     ///     sender.send(1).unwrap();
1469     ///     sender.send(2).unwrap();
1470     ///     sender.send(3).unwrap();
1471     /// });
1472     ///
1473     /// // nothing is in the buffer yet
1474     /// assert!(receiver.try_iter().next().is_none());
1475     ///
1476     /// // block for two seconds
1477     /// thread::sleep(Duration::from_secs(2));
1478     ///
1479     /// let mut iter = receiver.try_iter();
1480     /// assert_eq!(iter.next(), Some(1));
1481     /// assert_eq!(iter.next(), Some(2));
1482     /// assert_eq!(iter.next(), Some(3));
1483     /// assert_eq!(iter.next(), None);
1484     /// ```
1485     #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1486     pub fn try_iter(&self) -> TryIter<T> {
1487         TryIter { rx: self }
1488     }
1489
1490 }
1491
1492 impl<T> select::Packet for Receiver<T> {
1493     fn can_recv(&self) -> bool {
1494         loop {
1495             let new_port = match *unsafe { self.inner() } {
1496                 Flavor::Oneshot(ref p) => {
1497                     match p.can_recv() {
1498                         Ok(ret) => return ret,
1499                         Err(upgrade) => upgrade,
1500                     }
1501                 }
1502                 Flavor::Stream(ref p) => {
1503                     match p.can_recv() {
1504                         Ok(ret) => return ret,
1505                         Err(upgrade) => upgrade,
1506                     }
1507                 }
1508                 Flavor::Shared(ref p) => return p.can_recv(),
1509                 Flavor::Sync(ref p) => return p.can_recv(),
1510             };
1511             unsafe {
1512                 mem::swap(self.inner_mut(),
1513                           new_port.inner_mut());
1514             }
1515         }
1516     }
1517
1518     fn start_selection(&self, mut token: SignalToken) -> StartResult {
1519         loop {
1520             let (t, new_port) = match *unsafe { self.inner() } {
1521                 Flavor::Oneshot(ref p) => {
1522                     match p.start_selection(token) {
1523                         oneshot::SelSuccess => return Installed,
1524                         oneshot::SelCanceled => return Abort,
1525                         oneshot::SelUpgraded(t, rx) => (t, rx),
1526                     }
1527                 }
1528                 Flavor::Stream(ref p) => {
1529                     match p.start_selection(token) {
1530                         stream::SelSuccess => return Installed,
1531                         stream::SelCanceled => return Abort,
1532                         stream::SelUpgraded(t, rx) => (t, rx),
1533                     }
1534                 }
1535                 Flavor::Shared(ref p) => return p.start_selection(token),
1536                 Flavor::Sync(ref p) => return p.start_selection(token),
1537             };
1538             token = t;
1539             unsafe {
1540                 mem::swap(self.inner_mut(), new_port.inner_mut());
1541             }
1542         }
1543     }
1544
1545     fn abort_selection(&self) -> bool {
1546         let mut was_upgrade = false;
1547         loop {
1548             let result = match *unsafe { self.inner() } {
1549                 Flavor::Oneshot(ref p) => p.abort_selection(),
1550                 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1551                 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1552                 Flavor::Sync(ref p) => return p.abort_selection(),
1553             };
1554             let new_port = match result { Ok(b) => return b, Err(p) => p };
1555             was_upgrade = true;
1556             unsafe {
1557                 mem::swap(self.inner_mut(),
1558                           new_port.inner_mut());
1559             }
1560         }
1561     }
1562 }
1563
1564 #[stable(feature = "rust1", since = "1.0.0")]
1565 impl<'a, T> Iterator for Iter<'a, T> {
1566     type Item = T;
1567
1568     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1569 }
1570
1571 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1572 impl<'a, T> Iterator for TryIter<'a, T> {
1573     type Item = T;
1574
1575     fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1576 }
1577
1578 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1579 impl<'a, T> IntoIterator for &'a Receiver<T> {
1580     type Item = T;
1581     type IntoIter = Iter<'a, T>;
1582
1583     fn into_iter(self) -> Iter<'a, T> { self.iter() }
1584 }
1585
1586 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1587 impl<T> Iterator for IntoIter<T> {
1588     type Item = T;
1589     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1590 }
1591
1592 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1593 impl <T> IntoIterator for Receiver<T> {
1594     type Item = T;
1595     type IntoIter = IntoIter<T>;
1596
1597     fn into_iter(self) -> IntoIter<T> {
1598         IntoIter { rx: self }
1599     }
1600 }
1601
1602 #[stable(feature = "rust1", since = "1.0.0")]
1603 impl<T> Drop for Receiver<T> {
1604     fn drop(&mut self) {
1605         match *unsafe { self.inner() } {
1606             Flavor::Oneshot(ref p) => p.drop_port(),
1607             Flavor::Stream(ref p) => p.drop_port(),
1608             Flavor::Shared(ref p) => p.drop_port(),
1609             Flavor::Sync(ref p) => p.drop_port(),
1610         }
1611     }
1612 }
1613
1614 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1615 impl<T> fmt::Debug for Receiver<T> {
1616     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1617         f.debug_struct("Receiver").finish()
1618     }
1619 }
1620
1621 #[stable(feature = "rust1", since = "1.0.0")]
1622 impl<T> fmt::Debug for SendError<T> {
1623     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1624         "SendError(..)".fmt(f)
1625     }
1626 }
1627
1628 #[stable(feature = "rust1", since = "1.0.0")]
1629 impl<T> fmt::Display for SendError<T> {
1630     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1631         "sending on a closed channel".fmt(f)
1632     }
1633 }
1634
1635 #[stable(feature = "rust1", since = "1.0.0")]
1636 impl<T: Send> error::Error for SendError<T> {
1637     fn description(&self) -> &str {
1638         "sending on a closed channel"
1639     }
1640
1641     fn cause(&self) -> Option<&error::Error> {
1642         None
1643     }
1644 }
1645
1646 #[stable(feature = "rust1", since = "1.0.0")]
1647 impl<T> fmt::Debug for TrySendError<T> {
1648     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1649         match *self {
1650             TrySendError::Full(..) => "Full(..)".fmt(f),
1651             TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1652         }
1653     }
1654 }
1655
1656 #[stable(feature = "rust1", since = "1.0.0")]
1657 impl<T> fmt::Display for TrySendError<T> {
1658     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1659         match *self {
1660             TrySendError::Full(..) => {
1661                 "sending on a full channel".fmt(f)
1662             }
1663             TrySendError::Disconnected(..) => {
1664                 "sending on a closed channel".fmt(f)
1665             }
1666         }
1667     }
1668 }
1669
1670 #[stable(feature = "rust1", since = "1.0.0")]
1671 impl<T: Send> error::Error for TrySendError<T> {
1672
1673     fn description(&self) -> &str {
1674         match *self {
1675             TrySendError::Full(..) => {
1676                 "sending on a full channel"
1677             }
1678             TrySendError::Disconnected(..) => {
1679                 "sending on a closed channel"
1680             }
1681         }
1682     }
1683
1684     fn cause(&self) -> Option<&error::Error> {
1685         None
1686     }
1687 }
1688
1689 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1690 impl<T> From<SendError<T>> for TrySendError<T> {
1691     fn from(err: SendError<T>) -> TrySendError<T> {
1692         match err {
1693             SendError(t) => TrySendError::Disconnected(t),
1694         }
1695     }
1696 }
1697
1698 #[stable(feature = "rust1", since = "1.0.0")]
1699 impl fmt::Display for RecvError {
1700     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1701         "receiving on a closed channel".fmt(f)
1702     }
1703 }
1704
1705 #[stable(feature = "rust1", since = "1.0.0")]
1706 impl error::Error for RecvError {
1707
1708     fn description(&self) -> &str {
1709         "receiving on a closed channel"
1710     }
1711
1712     fn cause(&self) -> Option<&error::Error> {
1713         None
1714     }
1715 }
1716
1717 #[stable(feature = "rust1", since = "1.0.0")]
1718 impl fmt::Display for TryRecvError {
1719     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1720         match *self {
1721             TryRecvError::Empty => {
1722                 "receiving on an empty channel".fmt(f)
1723             }
1724             TryRecvError::Disconnected => {
1725                 "receiving on a closed channel".fmt(f)
1726             }
1727         }
1728     }
1729 }
1730
1731 #[stable(feature = "rust1", since = "1.0.0")]
1732 impl error::Error for TryRecvError {
1733
1734     fn description(&self) -> &str {
1735         match *self {
1736             TryRecvError::Empty => {
1737                 "receiving on an empty channel"
1738             }
1739             TryRecvError::Disconnected => {
1740                 "receiving on a closed channel"
1741             }
1742         }
1743     }
1744
1745     fn cause(&self) -> Option<&error::Error> {
1746         None
1747     }
1748 }
1749
1750 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1751 impl From<RecvError> for TryRecvError {
1752     fn from(err: RecvError) -> TryRecvError {
1753         match err {
1754             RecvError => TryRecvError::Disconnected,
1755         }
1756     }
1757 }
1758
1759 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1760 impl fmt::Display for RecvTimeoutError {
1761     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1762         match *self {
1763             RecvTimeoutError::Timeout => {
1764                 "timed out waiting on channel".fmt(f)
1765             }
1766             RecvTimeoutError::Disconnected => {
1767                 "channel is empty and sending half is closed".fmt(f)
1768             }
1769         }
1770     }
1771 }
1772
1773 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1774 impl error::Error for RecvTimeoutError {
1775     fn description(&self) -> &str {
1776         match *self {
1777             RecvTimeoutError::Timeout => {
1778                 "timed out waiting on channel"
1779             }
1780             RecvTimeoutError::Disconnected => {
1781                 "channel is empty and sending half is closed"
1782             }
1783         }
1784     }
1785
1786     fn cause(&self) -> Option<&error::Error> {
1787         None
1788     }
1789 }
1790
1791 #[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1792 impl From<RecvError> for RecvTimeoutError {
1793     fn from(err: RecvError) -> RecvTimeoutError {
1794         match err {
1795             RecvError => RecvTimeoutError::Disconnected,
1796         }
1797     }
1798 }
1799
1800 #[cfg(all(test, not(target_os = "emscripten")))]
1801 mod tests {
1802     use env;
1803     use super::*;
1804     use thread;
1805     use time::{Duration, Instant};
1806
1807     pub fn stress_factor() -> usize {
1808         match env::var("RUST_TEST_STRESS") {
1809             Ok(val) => val.parse().unwrap(),
1810             Err(..) => 1,
1811         }
1812     }
1813
1814     #[test]
1815     fn smoke() {
1816         let (tx, rx) = channel::<i32>();
1817         tx.send(1).unwrap();
1818         assert_eq!(rx.recv().unwrap(), 1);
1819     }
1820
1821     #[test]
1822     fn drop_full() {
1823         let (tx, _rx) = channel::<Box<isize>>();
1824         tx.send(box 1).unwrap();
1825     }
1826
1827     #[test]
1828     fn drop_full_shared() {
1829         let (tx, _rx) = channel::<Box<isize>>();
1830         drop(tx.clone());
1831         drop(tx.clone());
1832         tx.send(box 1).unwrap();
1833     }
1834
1835     #[test]
1836     fn smoke_shared() {
1837         let (tx, rx) = channel::<i32>();
1838         tx.send(1).unwrap();
1839         assert_eq!(rx.recv().unwrap(), 1);
1840         let tx = tx.clone();
1841         tx.send(1).unwrap();
1842         assert_eq!(rx.recv().unwrap(), 1);
1843     }
1844
1845     #[test]
1846     fn smoke_threads() {
1847         let (tx, rx) = channel::<i32>();
1848         let _t = thread::spawn(move|| {
1849             tx.send(1).unwrap();
1850         });
1851         assert_eq!(rx.recv().unwrap(), 1);
1852     }
1853
1854     #[test]
1855     fn smoke_port_gone() {
1856         let (tx, rx) = channel::<i32>();
1857         drop(rx);
1858         assert!(tx.send(1).is_err());
1859     }
1860
1861     #[test]
1862     fn smoke_shared_port_gone() {
1863         let (tx, rx) = channel::<i32>();
1864         drop(rx);
1865         assert!(tx.send(1).is_err())
1866     }
1867
1868     #[test]
1869     fn smoke_shared_port_gone2() {
1870         let (tx, rx) = channel::<i32>();
1871         drop(rx);
1872         let tx2 = tx.clone();
1873         drop(tx);
1874         assert!(tx2.send(1).is_err());
1875     }
1876
1877     #[test]
1878     fn port_gone_concurrent() {
1879         let (tx, rx) = channel::<i32>();
1880         let _t = thread::spawn(move|| {
1881             rx.recv().unwrap();
1882         });
1883         while tx.send(1).is_ok() {}
1884     }
1885
1886     #[test]
1887     fn port_gone_concurrent_shared() {
1888         let (tx, rx) = channel::<i32>();
1889         let tx2 = tx.clone();
1890         let _t = thread::spawn(move|| {
1891             rx.recv().unwrap();
1892         });
1893         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1894     }
1895
1896     #[test]
1897     fn smoke_chan_gone() {
1898         let (tx, rx) = channel::<i32>();
1899         drop(tx);
1900         assert!(rx.recv().is_err());
1901     }
1902
1903     #[test]
1904     fn smoke_chan_gone_shared() {
1905         let (tx, rx) = channel::<()>();
1906         let tx2 = tx.clone();
1907         drop(tx);
1908         drop(tx2);
1909         assert!(rx.recv().is_err());
1910     }
1911
1912     #[test]
1913     fn chan_gone_concurrent() {
1914         let (tx, rx) = channel::<i32>();
1915         let _t = thread::spawn(move|| {
1916             tx.send(1).unwrap();
1917             tx.send(1).unwrap();
1918         });
1919         while rx.recv().is_ok() {}
1920     }
1921
1922     #[test]
1923     fn stress() {
1924         let (tx, rx) = channel::<i32>();
1925         let t = thread::spawn(move|| {
1926             for _ in 0..10000 { tx.send(1).unwrap(); }
1927         });
1928         for _ in 0..10000 {
1929             assert_eq!(rx.recv().unwrap(), 1);
1930         }
1931         t.join().ok().unwrap();
1932     }
1933
1934     #[test]
1935     fn stress_shared() {
1936         const AMT: u32 = 10000;
1937         const NTHREADS: u32 = 8;
1938         let (tx, rx) = channel::<i32>();
1939
1940         let t = thread::spawn(move|| {
1941             for _ in 0..AMT * NTHREADS {
1942                 assert_eq!(rx.recv().unwrap(), 1);
1943             }
1944             match rx.try_recv() {
1945                 Ok(..) => panic!(),
1946                 _ => {}
1947             }
1948         });
1949
1950         for _ in 0..NTHREADS {
1951             let tx = tx.clone();
1952             thread::spawn(move|| {
1953                 for _ in 0..AMT { tx.send(1).unwrap(); }
1954             });
1955         }
1956         drop(tx);
1957         t.join().ok().unwrap();
1958     }
1959
1960     #[test]
1961     fn send_from_outside_runtime() {
1962         let (tx1, rx1) = channel::<()>();
1963         let (tx2, rx2) = channel::<i32>();
1964         let t1 = thread::spawn(move|| {
1965             tx1.send(()).unwrap();
1966             for _ in 0..40 {
1967                 assert_eq!(rx2.recv().unwrap(), 1);
1968             }
1969         });
1970         rx1.recv().unwrap();
1971         let t2 = thread::spawn(move|| {
1972             for _ in 0..40 {
1973                 tx2.send(1).unwrap();
1974             }
1975         });
1976         t1.join().ok().unwrap();
1977         t2.join().ok().unwrap();
1978     }
1979
1980     #[test]
1981     fn recv_from_outside_runtime() {
1982         let (tx, rx) = channel::<i32>();
1983         let t = thread::spawn(move|| {
1984             for _ in 0..40 {
1985                 assert_eq!(rx.recv().unwrap(), 1);
1986             }
1987         });
1988         for _ in 0..40 {
1989             tx.send(1).unwrap();
1990         }
1991         t.join().ok().unwrap();
1992     }
1993
1994     #[test]
1995     fn no_runtime() {
1996         let (tx1, rx1) = channel::<i32>();
1997         let (tx2, rx2) = channel::<i32>();
1998         let t1 = thread::spawn(move|| {
1999             assert_eq!(rx1.recv().unwrap(), 1);
2000             tx2.send(2).unwrap();
2001         });
2002         let t2 = thread::spawn(move|| {
2003             tx1.send(1).unwrap();
2004             assert_eq!(rx2.recv().unwrap(), 2);
2005         });
2006         t1.join().ok().unwrap();
2007         t2.join().ok().unwrap();
2008     }
2009
2010     #[test]
2011     fn oneshot_single_thread_close_port_first() {
2012         // Simple test of closing without sending
2013         let (_tx, rx) = channel::<i32>();
2014         drop(rx);
2015     }
2016
2017     #[test]
2018     fn oneshot_single_thread_close_chan_first() {
2019         // Simple test of closing without sending
2020         let (tx, _rx) = channel::<i32>();
2021         drop(tx);
2022     }
2023
2024     #[test]
2025     fn oneshot_single_thread_send_port_close() {
2026         // Testing that the sender cleans up the payload if receiver is closed
2027         let (tx, rx) = channel::<Box<i32>>();
2028         drop(rx);
2029         assert!(tx.send(box 0).is_err());
2030     }
2031
2032     #[test]
2033     fn oneshot_single_thread_recv_chan_close() {
2034         // Receiving on a closed chan will panic
2035         let res = thread::spawn(move|| {
2036             let (tx, rx) = channel::<i32>();
2037             drop(tx);
2038             rx.recv().unwrap();
2039         }).join();
2040         // What is our res?
2041         assert!(res.is_err());
2042     }
2043
2044     #[test]
2045     fn oneshot_single_thread_send_then_recv() {
2046         let (tx, rx) = channel::<Box<i32>>();
2047         tx.send(box 10).unwrap();
2048         assert!(*rx.recv().unwrap() == 10);
2049     }
2050
2051     #[test]
2052     fn oneshot_single_thread_try_send_open() {
2053         let (tx, rx) = channel::<i32>();
2054         assert!(tx.send(10).is_ok());
2055         assert!(rx.recv().unwrap() == 10);
2056     }
2057
2058     #[test]
2059     fn oneshot_single_thread_try_send_closed() {
2060         let (tx, rx) = channel::<i32>();
2061         drop(rx);
2062         assert!(tx.send(10).is_err());
2063     }
2064
2065     #[test]
2066     fn oneshot_single_thread_try_recv_open() {
2067         let (tx, rx) = channel::<i32>();
2068         tx.send(10).unwrap();
2069         assert!(rx.recv() == Ok(10));
2070     }
2071
2072     #[test]
2073     fn oneshot_single_thread_try_recv_closed() {
2074         let (tx, rx) = channel::<i32>();
2075         drop(tx);
2076         assert!(rx.recv().is_err());
2077     }
2078
2079     #[test]
2080     fn oneshot_single_thread_peek_data() {
2081         let (tx, rx) = channel::<i32>();
2082         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2083         tx.send(10).unwrap();
2084         assert_eq!(rx.try_recv(), Ok(10));
2085     }
2086
2087     #[test]
2088     fn oneshot_single_thread_peek_close() {
2089         let (tx, rx) = channel::<i32>();
2090         drop(tx);
2091         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2092         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2093     }
2094
2095     #[test]
2096     fn oneshot_single_thread_peek_open() {
2097         let (_tx, rx) = channel::<i32>();
2098         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2099     }
2100
2101     #[test]
2102     fn oneshot_multi_task_recv_then_send() {
2103         let (tx, rx) = channel::<Box<i32>>();
2104         let _t = thread::spawn(move|| {
2105             assert!(*rx.recv().unwrap() == 10);
2106         });
2107
2108         tx.send(box 10).unwrap();
2109     }
2110
2111     #[test]
2112     fn oneshot_multi_task_recv_then_close() {
2113         let (tx, rx) = channel::<Box<i32>>();
2114         let _t = thread::spawn(move|| {
2115             drop(tx);
2116         });
2117         let res = thread::spawn(move|| {
2118             assert!(*rx.recv().unwrap() == 10);
2119         }).join();
2120         assert!(res.is_err());
2121     }
2122
2123     #[test]
2124     fn oneshot_multi_thread_close_stress() {
2125         for _ in 0..stress_factor() {
2126             let (tx, rx) = channel::<i32>();
2127             let _t = thread::spawn(move|| {
2128                 drop(rx);
2129             });
2130             drop(tx);
2131         }
2132     }
2133
2134     #[test]
2135     fn oneshot_multi_thread_send_close_stress() {
2136         for _ in 0..stress_factor() {
2137             let (tx, rx) = channel::<i32>();
2138             let _t = thread::spawn(move|| {
2139                 drop(rx);
2140             });
2141             let _ = thread::spawn(move|| {
2142                 tx.send(1).unwrap();
2143             }).join();
2144         }
2145     }
2146
2147     #[test]
2148     fn oneshot_multi_thread_recv_close_stress() {
2149         for _ in 0..stress_factor() {
2150             let (tx, rx) = channel::<i32>();
2151             thread::spawn(move|| {
2152                 let res = thread::spawn(move|| {
2153                     rx.recv().unwrap();
2154                 }).join();
2155                 assert!(res.is_err());
2156             });
2157             let _t = thread::spawn(move|| {
2158                 thread::spawn(move|| {
2159                     drop(tx);
2160                 });
2161             });
2162         }
2163     }
2164
2165     #[test]
2166     fn oneshot_multi_thread_send_recv_stress() {
2167         for _ in 0..stress_factor() {
2168             let (tx, rx) = channel::<Box<isize>>();
2169             let _t = thread::spawn(move|| {
2170                 tx.send(box 10).unwrap();
2171             });
2172             assert!(*rx.recv().unwrap() == 10);
2173         }
2174     }
2175
2176     #[test]
2177     fn stream_send_recv_stress() {
2178         for _ in 0..stress_factor() {
2179             let (tx, rx) = channel();
2180
2181             send(tx, 0);
2182             recv(rx, 0);
2183
2184             fn send(tx: Sender<Box<i32>>, i: i32) {
2185                 if i == 10 { return }
2186
2187                 thread::spawn(move|| {
2188                     tx.send(box i).unwrap();
2189                     send(tx, i + 1);
2190                 });
2191             }
2192
2193             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2194                 if i == 10 { return }
2195
2196                 thread::spawn(move|| {
2197                     assert!(*rx.recv().unwrap() == i);
2198                     recv(rx, i + 1);
2199                 });
2200             }
2201         }
2202     }
2203
2204     #[test]
2205     fn oneshot_single_thread_recv_timeout() {
2206         let (tx, rx) = channel();
2207         tx.send(()).unwrap();
2208         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2209         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2210         tx.send(()).unwrap();
2211         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2212     }
2213
2214     #[test]
2215     fn stress_recv_timeout_two_threads() {
2216         let (tx, rx) = channel();
2217         let stress = stress_factor() + 100;
2218         let timeout = Duration::from_millis(100);
2219
2220         thread::spawn(move || {
2221             for i in 0..stress {
2222                 if i % 2 == 0 {
2223                     thread::sleep(timeout * 2);
2224                 }
2225                 tx.send(1usize).unwrap();
2226             }
2227         });
2228
2229         let mut recv_count = 0;
2230         loop {
2231             match rx.recv_timeout(timeout) {
2232                 Ok(n) => {
2233                     assert_eq!(n, 1usize);
2234                     recv_count += 1;
2235                 }
2236                 Err(RecvTimeoutError::Timeout) => continue,
2237                 Err(RecvTimeoutError::Disconnected) => break,
2238             }
2239         }
2240
2241         assert_eq!(recv_count, stress);
2242     }
2243
2244     #[test]
2245     fn recv_timeout_upgrade() {
2246         let (tx, rx) = channel::<()>();
2247         let timeout = Duration::from_millis(1);
2248         let _tx_clone = tx.clone();
2249
2250         let start = Instant::now();
2251         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
2252         assert!(Instant::now() >= start + timeout);
2253     }
2254
2255     #[test]
2256     fn stress_recv_timeout_shared() {
2257         let (tx, rx) = channel();
2258         let stress = stress_factor() + 100;
2259
2260         for i in 0..stress {
2261             let tx = tx.clone();
2262             thread::spawn(move || {
2263                 thread::sleep(Duration::from_millis(i as u64 * 10));
2264                 tx.send(1usize).unwrap();
2265             });
2266         }
2267
2268         drop(tx);
2269
2270         let mut recv_count = 0;
2271         loop {
2272             match rx.recv_timeout(Duration::from_millis(10)) {
2273                 Ok(n) => {
2274                     assert_eq!(n, 1usize);
2275                     recv_count += 1;
2276                 }
2277                 Err(RecvTimeoutError::Timeout) => continue,
2278                 Err(RecvTimeoutError::Disconnected) => break,
2279             }
2280         }
2281
2282         assert_eq!(recv_count, stress);
2283     }
2284
2285     #[test]
2286     fn recv_a_lot() {
2287         // Regression test that we don't run out of stack in scheduler context
2288         let (tx, rx) = channel();
2289         for _ in 0..10000 { tx.send(()).unwrap(); }
2290         for _ in 0..10000 { rx.recv().unwrap(); }
2291     }
2292
2293     #[test]
2294     fn shared_recv_timeout() {
2295         let (tx, rx) = channel();
2296         let total = 5;
2297         for _ in 0..total {
2298             let tx = tx.clone();
2299             thread::spawn(move|| {
2300                 tx.send(()).unwrap();
2301             });
2302         }
2303
2304         for _ in 0..total { rx.recv().unwrap(); }
2305
2306         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2307         tx.send(()).unwrap();
2308         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
2309     }
2310
2311     #[test]
2312     fn shared_chan_stress() {
2313         let (tx, rx) = channel();
2314         let total = stress_factor() + 100;
2315         for _ in 0..total {
2316             let tx = tx.clone();
2317             thread::spawn(move|| {
2318                 tx.send(()).unwrap();
2319             });
2320         }
2321
2322         for _ in 0..total {
2323             rx.recv().unwrap();
2324         }
2325     }
2326
2327     #[test]
2328     fn test_nested_recv_iter() {
2329         let (tx, rx) = channel::<i32>();
2330         let (total_tx, total_rx) = channel::<i32>();
2331
2332         let _t = thread::spawn(move|| {
2333             let mut acc = 0;
2334             for x in rx.iter() {
2335                 acc += x;
2336             }
2337             total_tx.send(acc).unwrap();
2338         });
2339
2340         tx.send(3).unwrap();
2341         tx.send(1).unwrap();
2342         tx.send(2).unwrap();
2343         drop(tx);
2344         assert_eq!(total_rx.recv().unwrap(), 6);
2345     }
2346
2347     #[test]
2348     fn test_recv_iter_break() {
2349         let (tx, rx) = channel::<i32>();
2350         let (count_tx, count_rx) = channel();
2351
2352         let _t = thread::spawn(move|| {
2353             let mut count = 0;
2354             for x in rx.iter() {
2355                 if count >= 3 {
2356                     break;
2357                 } else {
2358                     count += x;
2359                 }
2360             }
2361             count_tx.send(count).unwrap();
2362         });
2363
2364         tx.send(2).unwrap();
2365         tx.send(2).unwrap();
2366         tx.send(2).unwrap();
2367         let _ = tx.send(2);
2368         drop(tx);
2369         assert_eq!(count_rx.recv().unwrap(), 4);
2370     }
2371
2372     #[test]
2373     fn test_recv_try_iter() {
2374         let (request_tx, request_rx) = channel();
2375         let (response_tx, response_rx) = channel();
2376
2377         // Request `x`s until we have `6`.
2378         let t = thread::spawn(move|| {
2379             let mut count = 0;
2380             loop {
2381                 for x in response_rx.try_iter() {
2382                     count += x;
2383                     if count == 6 {
2384                         return count;
2385                     }
2386                 }
2387                 request_tx.send(()).unwrap();
2388             }
2389         });
2390
2391         for _ in request_rx.iter() {
2392             if response_tx.send(2).is_err() {
2393                 break;
2394             }
2395         }
2396
2397         assert_eq!(t.join().unwrap(), 6);
2398     }
2399
2400     #[test]
2401     fn test_recv_into_iter_owned() {
2402         let mut iter = {
2403           let (tx, rx) = channel::<i32>();
2404           tx.send(1).unwrap();
2405           tx.send(2).unwrap();
2406
2407           rx.into_iter()
2408         };
2409         assert_eq!(iter.next().unwrap(), 1);
2410         assert_eq!(iter.next().unwrap(), 2);
2411         assert_eq!(iter.next().is_none(), true);
2412     }
2413
2414     #[test]
2415     fn test_recv_into_iter_borrowed() {
2416         let (tx, rx) = channel::<i32>();
2417         tx.send(1).unwrap();
2418         tx.send(2).unwrap();
2419         drop(tx);
2420         let mut iter = (&rx).into_iter();
2421         assert_eq!(iter.next().unwrap(), 1);
2422         assert_eq!(iter.next().unwrap(), 2);
2423         assert_eq!(iter.next().is_none(), true);
2424     }
2425
2426     #[test]
2427     fn try_recv_states() {
2428         let (tx1, rx1) = channel::<i32>();
2429         let (tx2, rx2) = channel::<()>();
2430         let (tx3, rx3) = channel::<()>();
2431         let _t = thread::spawn(move|| {
2432             rx2.recv().unwrap();
2433             tx1.send(1).unwrap();
2434             tx3.send(()).unwrap();
2435             rx2.recv().unwrap();
2436             drop(tx1);
2437             tx3.send(()).unwrap();
2438         });
2439
2440         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2441         tx2.send(()).unwrap();
2442         rx3.recv().unwrap();
2443         assert_eq!(rx1.try_recv(), Ok(1));
2444         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2445         tx2.send(()).unwrap();
2446         rx3.recv().unwrap();
2447         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2448     }
2449
2450     // This bug used to end up in a livelock inside of the Receiver destructor
2451     // because the internal state of the Shared packet was corrupted
2452     #[test]
2453     fn destroy_upgraded_shared_port_when_sender_still_active() {
2454         let (tx, rx) = channel();
2455         let (tx2, rx2) = channel();
2456         let _t = thread::spawn(move|| {
2457             rx.recv().unwrap(); // wait on a oneshot
2458             drop(rx);  // destroy a shared
2459             tx2.send(()).unwrap();
2460         });
2461         // make sure the other thread has gone to sleep
2462         for _ in 0..5000 { thread::yield_now(); }
2463
2464         // upgrade to a shared chan and send a message
2465         let t = tx.clone();
2466         drop(tx);
2467         t.send(()).unwrap();
2468
2469         // wait for the child thread to exit before we exit
2470         rx2.recv().unwrap();
2471     }
2472
2473     #[test]
2474     fn issue_32114() {
2475         let (tx, _) = channel();
2476         let _ = tx.send(123);
2477         assert_eq!(tx.send(123), Err(SendError(123)));
2478     }
2479 }
2480
2481 #[cfg(all(test, not(target_os = "emscripten")))]
2482 mod sync_tests {
2483     use env;
2484     use thread;
2485     use super::*;
2486     use time::Duration;
2487
2488     pub fn stress_factor() -> usize {
2489         match env::var("RUST_TEST_STRESS") {
2490             Ok(val) => val.parse().unwrap(),
2491             Err(..) => 1,
2492         }
2493     }
2494
2495     #[test]
2496     fn smoke() {
2497         let (tx, rx) = sync_channel::<i32>(1);
2498         tx.send(1).unwrap();
2499         assert_eq!(rx.recv().unwrap(), 1);
2500     }
2501
2502     #[test]
2503     fn drop_full() {
2504         let (tx, _rx) = sync_channel::<Box<isize>>(1);
2505         tx.send(box 1).unwrap();
2506     }
2507
2508     #[test]
2509     fn smoke_shared() {
2510         let (tx, rx) = sync_channel::<i32>(1);
2511         tx.send(1).unwrap();
2512         assert_eq!(rx.recv().unwrap(), 1);
2513         let tx = tx.clone();
2514         tx.send(1).unwrap();
2515         assert_eq!(rx.recv().unwrap(), 1);
2516     }
2517
2518     #[test]
2519     fn recv_timeout() {
2520         let (tx, rx) = sync_channel::<i32>(1);
2521         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2522         tx.send(1).unwrap();
2523         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2524     }
2525
2526     #[test]
2527     fn smoke_threads() {
2528         let (tx, rx) = sync_channel::<i32>(0);
2529         let _t = thread::spawn(move|| {
2530             tx.send(1).unwrap();
2531         });
2532         assert_eq!(rx.recv().unwrap(), 1);
2533     }
2534
2535     #[test]
2536     fn smoke_port_gone() {
2537         let (tx, rx) = sync_channel::<i32>(0);
2538         drop(rx);
2539         assert!(tx.send(1).is_err());
2540     }
2541
2542     #[test]
2543     fn smoke_shared_port_gone2() {
2544         let (tx, rx) = sync_channel::<i32>(0);
2545         drop(rx);
2546         let tx2 = tx.clone();
2547         drop(tx);
2548         assert!(tx2.send(1).is_err());
2549     }
2550
2551     #[test]
2552     fn port_gone_concurrent() {
2553         let (tx, rx) = sync_channel::<i32>(0);
2554         let _t = thread::spawn(move|| {
2555             rx.recv().unwrap();
2556         });
2557         while tx.send(1).is_ok() {}
2558     }
2559
2560     #[test]
2561     fn port_gone_concurrent_shared() {
2562         let (tx, rx) = sync_channel::<i32>(0);
2563         let tx2 = tx.clone();
2564         let _t = thread::spawn(move|| {
2565             rx.recv().unwrap();
2566         });
2567         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2568     }
2569
2570     #[test]
2571     fn smoke_chan_gone() {
2572         let (tx, rx) = sync_channel::<i32>(0);
2573         drop(tx);
2574         assert!(rx.recv().is_err());
2575     }
2576
2577     #[test]
2578     fn smoke_chan_gone_shared() {
2579         let (tx, rx) = sync_channel::<()>(0);
2580         let tx2 = tx.clone();
2581         drop(tx);
2582         drop(tx2);
2583         assert!(rx.recv().is_err());
2584     }
2585
2586     #[test]
2587     fn chan_gone_concurrent() {
2588         let (tx, rx) = sync_channel::<i32>(0);
2589         thread::spawn(move|| {
2590             tx.send(1).unwrap();
2591             tx.send(1).unwrap();
2592         });
2593         while rx.recv().is_ok() {}
2594     }
2595
2596     #[test]
2597     fn stress() {
2598         let (tx, rx) = sync_channel::<i32>(0);
2599         thread::spawn(move|| {
2600             for _ in 0..10000 { tx.send(1).unwrap(); }
2601         });
2602         for _ in 0..10000 {
2603             assert_eq!(rx.recv().unwrap(), 1);
2604         }
2605     }
2606
2607     #[test]
2608     fn stress_recv_timeout_two_threads() {
2609         let (tx, rx) = sync_channel::<i32>(0);
2610
2611         thread::spawn(move|| {
2612             for _ in 0..10000 { tx.send(1).unwrap(); }
2613         });
2614
2615         let mut recv_count = 0;
2616         loop {
2617             match rx.recv_timeout(Duration::from_millis(1)) {
2618                 Ok(v) => {
2619                     assert_eq!(v, 1);
2620                     recv_count += 1;
2621                 },
2622                 Err(RecvTimeoutError::Timeout) => continue,
2623                 Err(RecvTimeoutError::Disconnected) => break,
2624             }
2625         }
2626
2627         assert_eq!(recv_count, 10000);
2628     }
2629
2630     #[test]
2631     fn stress_recv_timeout_shared() {
2632         const AMT: u32 = 1000;
2633         const NTHREADS: u32 = 8;
2634         let (tx, rx) = sync_channel::<i32>(0);
2635         let (dtx, drx) = sync_channel::<()>(0);
2636
2637         thread::spawn(move|| {
2638             let mut recv_count = 0;
2639             loop {
2640                 match rx.recv_timeout(Duration::from_millis(10)) {
2641                     Ok(v) => {
2642                         assert_eq!(v, 1);
2643                         recv_count += 1;
2644                     },
2645                     Err(RecvTimeoutError::Timeout) => continue,
2646                     Err(RecvTimeoutError::Disconnected) => break,
2647                 }
2648             }
2649
2650             assert_eq!(recv_count, AMT * NTHREADS);
2651             assert!(rx.try_recv().is_err());
2652
2653             dtx.send(()).unwrap();
2654         });
2655
2656         for _ in 0..NTHREADS {
2657             let tx = tx.clone();
2658             thread::spawn(move|| {
2659                 for _ in 0..AMT { tx.send(1).unwrap(); }
2660             });
2661         }
2662
2663         drop(tx);
2664
2665         drx.recv().unwrap();
2666     }
2667
2668     #[test]
2669     fn stress_shared() {
2670         const AMT: u32 = 1000;
2671         const NTHREADS: u32 = 8;
2672         let (tx, rx) = sync_channel::<i32>(0);
2673         let (dtx, drx) = sync_channel::<()>(0);
2674
2675         thread::spawn(move|| {
2676             for _ in 0..AMT * NTHREADS {
2677                 assert_eq!(rx.recv().unwrap(), 1);
2678             }
2679             match rx.try_recv() {
2680                 Ok(..) => panic!(),
2681                 _ => {}
2682             }
2683             dtx.send(()).unwrap();
2684         });
2685
2686         for _ in 0..NTHREADS {
2687             let tx = tx.clone();
2688             thread::spawn(move|| {
2689                 for _ in 0..AMT { tx.send(1).unwrap(); }
2690             });
2691         }
2692         drop(tx);
2693         drx.recv().unwrap();
2694     }
2695
2696     #[test]
2697     fn oneshot_single_thread_close_port_first() {
2698         // Simple test of closing without sending
2699         let (_tx, rx) = sync_channel::<i32>(0);
2700         drop(rx);
2701     }
2702
2703     #[test]
2704     fn oneshot_single_thread_close_chan_first() {
2705         // Simple test of closing without sending
2706         let (tx, _rx) = sync_channel::<i32>(0);
2707         drop(tx);
2708     }
2709
2710     #[test]
2711     fn oneshot_single_thread_send_port_close() {
2712         // Testing that the sender cleans up the payload if receiver is closed
2713         let (tx, rx) = sync_channel::<Box<i32>>(0);
2714         drop(rx);
2715         assert!(tx.send(box 0).is_err());
2716     }
2717
2718     #[test]
2719     fn oneshot_single_thread_recv_chan_close() {
2720         // Receiving on a closed chan will panic
2721         let res = thread::spawn(move|| {
2722             let (tx, rx) = sync_channel::<i32>(0);
2723             drop(tx);
2724             rx.recv().unwrap();
2725         }).join();
2726         // What is our res?
2727         assert!(res.is_err());
2728     }
2729
2730     #[test]
2731     fn oneshot_single_thread_send_then_recv() {
2732         let (tx, rx) = sync_channel::<Box<i32>>(1);
2733         tx.send(box 10).unwrap();
2734         assert!(*rx.recv().unwrap() == 10);
2735     }
2736
2737     #[test]
2738     fn oneshot_single_thread_try_send_open() {
2739         let (tx, rx) = sync_channel::<i32>(1);
2740         assert_eq!(tx.try_send(10), Ok(()));
2741         assert!(rx.recv().unwrap() == 10);
2742     }
2743
2744     #[test]
2745     fn oneshot_single_thread_try_send_closed() {
2746         let (tx, rx) = sync_channel::<i32>(0);
2747         drop(rx);
2748         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2749     }
2750
2751     #[test]
2752     fn oneshot_single_thread_try_send_closed2() {
2753         let (tx, _rx) = sync_channel::<i32>(0);
2754         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2755     }
2756
2757     #[test]
2758     fn oneshot_single_thread_try_recv_open() {
2759         let (tx, rx) = sync_channel::<i32>(1);
2760         tx.send(10).unwrap();
2761         assert!(rx.recv() == Ok(10));
2762     }
2763
2764     #[test]
2765     fn oneshot_single_thread_try_recv_closed() {
2766         let (tx, rx) = sync_channel::<i32>(0);
2767         drop(tx);
2768         assert!(rx.recv().is_err());
2769     }
2770
2771     #[test]
2772     fn oneshot_single_thread_try_recv_closed_with_data() {
2773         let (tx, rx) = sync_channel::<i32>(1);
2774         tx.send(10).unwrap();
2775         drop(tx);
2776         assert_eq!(rx.try_recv(), Ok(10));
2777         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2778     }
2779
2780     #[test]
2781     fn oneshot_single_thread_peek_data() {
2782         let (tx, rx) = sync_channel::<i32>(1);
2783         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2784         tx.send(10).unwrap();
2785         assert_eq!(rx.try_recv(), Ok(10));
2786     }
2787
2788     #[test]
2789     fn oneshot_single_thread_peek_close() {
2790         let (tx, rx) = sync_channel::<i32>(0);
2791         drop(tx);
2792         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2793         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2794     }
2795
2796     #[test]
2797     fn oneshot_single_thread_peek_open() {
2798         let (_tx, rx) = sync_channel::<i32>(0);
2799         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2800     }
2801
2802     #[test]
2803     fn oneshot_multi_task_recv_then_send() {
2804         let (tx, rx) = sync_channel::<Box<i32>>(0);
2805         let _t = thread::spawn(move|| {
2806             assert!(*rx.recv().unwrap() == 10);
2807         });
2808
2809         tx.send(box 10).unwrap();
2810     }
2811
2812     #[test]
2813     fn oneshot_multi_task_recv_then_close() {
2814         let (tx, rx) = sync_channel::<Box<i32>>(0);
2815         let _t = thread::spawn(move|| {
2816             drop(tx);
2817         });
2818         let res = thread::spawn(move|| {
2819             assert!(*rx.recv().unwrap() == 10);
2820         }).join();
2821         assert!(res.is_err());
2822     }
2823
2824     #[test]
2825     fn oneshot_multi_thread_close_stress() {
2826         for _ in 0..stress_factor() {
2827             let (tx, rx) = sync_channel::<i32>(0);
2828             let _t = thread::spawn(move|| {
2829                 drop(rx);
2830             });
2831             drop(tx);
2832         }
2833     }
2834
2835     #[test]
2836     fn oneshot_multi_thread_send_close_stress() {
2837         for _ in 0..stress_factor() {
2838             let (tx, rx) = sync_channel::<i32>(0);
2839             let _t = thread::spawn(move|| {
2840                 drop(rx);
2841             });
2842             let _ = thread::spawn(move || {
2843                 tx.send(1).unwrap();
2844             }).join();
2845         }
2846     }
2847
2848     #[test]
2849     fn oneshot_multi_thread_recv_close_stress() {
2850         for _ in 0..stress_factor() {
2851             let (tx, rx) = sync_channel::<i32>(0);
2852             let _t = thread::spawn(move|| {
2853                 let res = thread::spawn(move|| {
2854                     rx.recv().unwrap();
2855                 }).join();
2856                 assert!(res.is_err());
2857             });
2858             let _t = thread::spawn(move|| {
2859                 thread::spawn(move|| {
2860                     drop(tx);
2861                 });
2862             });
2863         }
2864     }
2865
2866     #[test]
2867     fn oneshot_multi_thread_send_recv_stress() {
2868         for _ in 0..stress_factor() {
2869             let (tx, rx) = sync_channel::<Box<i32>>(0);
2870             let _t = thread::spawn(move|| {
2871                 tx.send(box 10).unwrap();
2872             });
2873             assert!(*rx.recv().unwrap() == 10);
2874         }
2875     }
2876
2877     #[test]
2878     fn stream_send_recv_stress() {
2879         for _ in 0..stress_factor() {
2880             let (tx, rx) = sync_channel::<Box<i32>>(0);
2881
2882             send(tx, 0);
2883             recv(rx, 0);
2884
2885             fn send(tx: SyncSender<Box<i32>>, i: i32) {
2886                 if i == 10 { return }
2887
2888                 thread::spawn(move|| {
2889                     tx.send(box i).unwrap();
2890                     send(tx, i + 1);
2891                 });
2892             }
2893
2894             fn recv(rx: Receiver<Box<i32>>, i: i32) {
2895                 if i == 10 { return }
2896
2897                 thread::spawn(move|| {
2898                     assert!(*rx.recv().unwrap() == i);
2899                     recv(rx, i + 1);
2900                 });
2901             }
2902         }
2903     }
2904
2905     #[test]
2906     fn recv_a_lot() {
2907         // Regression test that we don't run out of stack in scheduler context
2908         let (tx, rx) = sync_channel(10000);
2909         for _ in 0..10000 { tx.send(()).unwrap(); }
2910         for _ in 0..10000 { rx.recv().unwrap(); }
2911     }
2912
2913     #[test]
2914     fn shared_chan_stress() {
2915         let (tx, rx) = sync_channel(0);
2916         let total = stress_factor() + 100;
2917         for _ in 0..total {
2918             let tx = tx.clone();
2919             thread::spawn(move|| {
2920                 tx.send(()).unwrap();
2921             });
2922         }
2923
2924         for _ in 0..total {
2925             rx.recv().unwrap();
2926         }
2927     }
2928
2929     #[test]
2930     fn test_nested_recv_iter() {
2931         let (tx, rx) = sync_channel::<i32>(0);
2932         let (total_tx, total_rx) = sync_channel::<i32>(0);
2933
2934         let _t = thread::spawn(move|| {
2935             let mut acc = 0;
2936             for x in rx.iter() {
2937                 acc += x;
2938             }
2939             total_tx.send(acc).unwrap();
2940         });
2941
2942         tx.send(3).unwrap();
2943         tx.send(1).unwrap();
2944         tx.send(2).unwrap();
2945         drop(tx);
2946         assert_eq!(total_rx.recv().unwrap(), 6);
2947     }
2948
2949     #[test]
2950     fn test_recv_iter_break() {
2951         let (tx, rx) = sync_channel::<i32>(0);
2952         let (count_tx, count_rx) = sync_channel(0);
2953
2954         let _t = thread::spawn(move|| {
2955             let mut count = 0;
2956             for x in rx.iter() {
2957                 if count >= 3 {
2958                     break;
2959                 } else {
2960                     count += x;
2961                 }
2962             }
2963             count_tx.send(count).unwrap();
2964         });
2965
2966         tx.send(2).unwrap();
2967         tx.send(2).unwrap();
2968         tx.send(2).unwrap();
2969         let _ = tx.try_send(2);
2970         drop(tx);
2971         assert_eq!(count_rx.recv().unwrap(), 4);
2972     }
2973
2974     #[test]
2975     fn try_recv_states() {
2976         let (tx1, rx1) = sync_channel::<i32>(1);
2977         let (tx2, rx2) = sync_channel::<()>(1);
2978         let (tx3, rx3) = sync_channel::<()>(1);
2979         let _t = thread::spawn(move|| {
2980             rx2.recv().unwrap();
2981             tx1.send(1).unwrap();
2982             tx3.send(()).unwrap();
2983             rx2.recv().unwrap();
2984             drop(tx1);
2985             tx3.send(()).unwrap();
2986         });
2987
2988         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2989         tx2.send(()).unwrap();
2990         rx3.recv().unwrap();
2991         assert_eq!(rx1.try_recv(), Ok(1));
2992         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2993         tx2.send(()).unwrap();
2994         rx3.recv().unwrap();
2995         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2996     }
2997
2998     // This bug used to end up in a livelock inside of the Receiver destructor
2999     // because the internal state of the Shared packet was corrupted
3000     #[test]
3001     fn destroy_upgraded_shared_port_when_sender_still_active() {
3002         let (tx, rx) = sync_channel::<()>(0);
3003         let (tx2, rx2) = sync_channel::<()>(0);
3004         let _t = thread::spawn(move|| {
3005             rx.recv().unwrap(); // wait on a oneshot
3006             drop(rx);  // destroy a shared
3007             tx2.send(()).unwrap();
3008         });
3009         // make sure the other thread has gone to sleep
3010         for _ in 0..5000 { thread::yield_now(); }
3011
3012         // upgrade to a shared chan and send a message
3013         let t = tx.clone();
3014         drop(tx);
3015         t.send(()).unwrap();
3016
3017         // wait for the child thread to exit before we exit
3018         rx2.recv().unwrap();
3019     }
3020
3021     #[test]
3022     fn send1() {
3023         let (tx, rx) = sync_channel::<i32>(0);
3024         let _t = thread::spawn(move|| { rx.recv().unwrap(); });
3025         assert_eq!(tx.send(1), Ok(()));
3026     }
3027
3028     #[test]
3029     fn send2() {
3030         let (tx, rx) = sync_channel::<i32>(0);
3031         let _t = thread::spawn(move|| { drop(rx); });
3032         assert!(tx.send(1).is_err());
3033     }
3034
3035     #[test]
3036     fn send3() {
3037         let (tx, rx) = sync_channel::<i32>(1);
3038         assert_eq!(tx.send(1), Ok(()));
3039         let _t =thread::spawn(move|| { drop(rx); });
3040         assert!(tx.send(1).is_err());
3041     }
3042
3043     #[test]
3044     fn send4() {
3045         let (tx, rx) = sync_channel::<i32>(0);
3046         let tx2 = tx.clone();
3047         let (done, donerx) = channel();
3048         let done2 = done.clone();
3049         let _t = thread::spawn(move|| {
3050             assert!(tx.send(1).is_err());
3051             done.send(()).unwrap();
3052         });
3053         let _t = thread::spawn(move|| {
3054             assert!(tx2.send(2).is_err());
3055             done2.send(()).unwrap();
3056         });
3057         drop(rx);
3058         donerx.recv().unwrap();
3059         donerx.recv().unwrap();
3060     }
3061
3062     #[test]
3063     fn try_send1() {
3064         let (tx, _rx) = sync_channel::<i32>(0);
3065         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3066     }
3067
3068     #[test]
3069     fn try_send2() {
3070         let (tx, _rx) = sync_channel::<i32>(1);
3071         assert_eq!(tx.try_send(1), Ok(()));
3072         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
3073     }
3074
3075     #[test]
3076     fn try_send3() {
3077         let (tx, rx) = sync_channel::<i32>(1);
3078         assert_eq!(tx.try_send(1), Ok(()));
3079         drop(rx);
3080         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
3081     }
3082
3083     #[test]
3084     fn issue_15761() {
3085         fn repro() {
3086             let (tx1, rx1) = sync_channel::<()>(3);
3087             let (tx2, rx2) = sync_channel::<()>(3);
3088
3089             let _t = thread::spawn(move|| {
3090                 rx1.recv().unwrap();
3091                 tx2.try_send(()).unwrap();
3092             });
3093
3094             tx1.try_send(()).unwrap();
3095             rx2.recv().unwrap();
3096         }
3097
3098         for _ in 0..100 {
3099             repro()
3100         }
3101     }
3102 }