]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
rollup merge of #20642: michaelwoerister/sane-source-locations-pt1
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer communication primitives threads
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10i).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10i);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in range(0i, 10i) {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 #![stable]
167
168 // A description of how Rust's channel implementation works
169 //
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
173 //
174 // The choice of implementation of all channels is to be built on lock-free data
175 // structures. The channels themselves are then consequently also lock-free data
176 // structures. As always with lock-free code, this is a very "here be dragons"
177 // territory, especially because I'm unaware of any academic papers that have
178 // gone into great length about channels of these flavors.
179 //
180 // ## Flavors of channels
181 //
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
185 // channels in play.
186 //
187 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
188 //              They contain as few atomics as possible and involve one and
189 //              exactly one allocation.
190 // * Streams - these channels are optimized for the non-shared use case. They
191 //             use a different concurrent queue that is more tailored for this
192 //             use case. The initial allocation of this flavor of channel is not
193 //             optimized.
194 // * Shared - this is the most general form of channel that this module offers,
195 //            a channel with multiple senders. This type is as optimized as it
196 //            can be, but the previous two types mentioned are much faster for
197 //            their use-cases.
198 //
199 // ## Concurrent queues
200 //
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
204 //
205 // With two flavors of channels, two flavors of queues are also used. We have
206 // chosen to use queues from a well-known author that are abbreviated as SPSC
207 // and MPSC (single producer, single consumer and multiple producer, single
208 // consumer). SPSC queues are used for streams while MPSC queues are used for
209 // shared channels.
210 //
211 // ### SPSC optimizations
212 //
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
217 // pop().
218 //
219 // As found online, however, the cache of nodes is of an infinite size. This
220 // means that if a channel at one point in its life had 50k items in the queue,
221 // then the queue will always have the capacity for 50k items. I believed that
222 // this was an unnecessary limitation of the implementation, so I have altered
223 // the queue to optionally have a bound on the cache size.
224 //
225 // By default, streams will have an unbounded SPSC queue with a small-ish cache
226 // size. The hope is that the cache is still large enough to have very fast
227 // send() operations while not too large such that millions of channels can
228 // coexist at once.
229 //
230 // ### MPSC optimizations
231 //
232 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
233 // a linked list under the hood to earn its unboundedness, but I have not put
234 // forth much effort into having a cache of nodes similar to the SPSC queue.
235 //
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
239 //
240 // ## Overview of the Implementation
241 //
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
245 //
246 //
247 //      send(t)                             recv()
248 //        queue.push(t)                       return if queue.pop()
249 //        if increment() == -1                deschedule {
250 //          wakeup()                            if decrement() > 0
251 //                                                cancel_deschedule()
252 //                                            }
253 //                                            queue.pop()
254 //
255 // As mentioned before, there are no locks in this implementation, only atomic
256 // instructions are used.
257 //
258 // ### The internal atomic counter
259 //
260 // Every channel has a shared counter with each half to keep track of the size
261 // of the queue. This counter is used to abort descheduling by the receiver and
262 // to know when to wake up on the sending side.
263 //
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
268 //
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
275 //
276 // Instead, the "steal count" is kept track of separately (not atomically
277 // because it's only used by receivers), and then the decrement() call when
278 // descheduling will lump in all of the recent steals into one large decrement.
279 //
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
282 //
283 // ## Native Implementation
284 //
285 // A major goal of these channels is to work seamlessly on and off the runtime.
286 // All of the previous race conditions have been worded in terms of
287 // scheduler-isms (which is obviously not available without the runtime).
288 //
289 // For now, native usage of channels (off the runtime) will fall back onto
290 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
291 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
292 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
293 // condition variable.
294 //
295 // ## Select
296 //
297 // Being able to support selection over channels has greatly influenced this
298 // design, and not only does selection need to work inside the runtime, but also
299 // outside the runtime.
300 //
301 // The implementation is fairly straightforward. The goal of select() is not to
302 // return some data, but only to return which channel can receive data without
303 // blocking. The implementation is essentially the entire blocking procedure
304 // followed by an increment as soon as its woken up. The cancellation procedure
305 // involves an increment and swapping out of to_wake to acquire ownership of the
306 // task to unblock.
307 //
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
312 //
313 // # Conclusion
314 //
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
317
318 use prelude::v1::*;
319
320 use sync::Arc;
321 use fmt;
322 use mem;
323 use cell::UnsafeCell;
324
325 pub use self::select::{Select, Handle};
326 use self::select::StartResult;
327 use self::select::StartResult::*;
328 use self::blocking::SignalToken;
329
330 mod blocking;
331 mod oneshot;
332 mod select;
333 mod shared;
334 mod stream;
335 mod sync;
336 mod mpsc_queue;
337 mod spsc_queue;
338
339 /// The receiving-half of Rust's channel type. This half can only be owned by
340 /// one task
341 #[stable]
342 pub struct Receiver<T> {
343     inner: UnsafeCell<Flavor<T>>,
344 }
345
346 // The receiver port can be sent from place to place, so long as it
347 // is not used to receive non-sendable things.
348 unsafe impl<T:Send> Send for Receiver<T> { }
349
350 /// An iterator over messages on a receiver, this iterator will block
351 /// whenever `next` is called, waiting for a new message, and `None` will be
352 /// returned when the corresponding channel has hung up.
353 #[stable]
354 pub struct Iter<'a, T:'a> {
355     rx: &'a Receiver<T>
356 }
357
358 /// The sending-half of Rust's asynchronous channel type. This half can only be
359 /// owned by one task, but it can be cloned to send to other tasks.
360 #[stable]
361 pub struct Sender<T> {
362     inner: UnsafeCell<Flavor<T>>,
363 }
364
365 // The send port can be sent from place to place, so long as it
366 // is not used to send non-sendable things.
367 unsafe impl<T:Send> Send for Sender<T> { }
368
369 /// The sending-half of Rust's synchronous channel type. This half can only be
370 /// owned by one task, but it can be cloned to send to other tasks.
371 #[stable]
372 pub struct SyncSender<T> {
373     inner: Arc<UnsafeCell<sync::Packet<T>>>,
374 }
375
376 unsafe impl<T:Send> Send for SyncSender<T> {}
377
378 impl<T> !Sync for SyncSender<T> {}
379
380 /// An error returned from the `send` function on channels.
381 ///
382 /// A `send` operation can only fail if the receiving end of a channel is
383 /// disconnected, implying that the data could never be received. The error
384 /// contains the data being sent as a payload so it can be recovered.
385 #[derive(PartialEq, Eq, Show)]
386 #[stable]
387 pub struct SendError<T>(pub T);
388
389 /// An error returned from the `recv` function on a `Receiver`.
390 ///
391 /// The `recv` operation can only fail if the sending half of a channel is
392 /// disconnected, implying that no further messages will ever be received.
393 #[derive(PartialEq, Eq, Clone, Copy, Show)]
394 #[stable]
395 pub struct RecvError;
396
397 /// This enumeration is the list of the possible reasons that try_recv could not
398 /// return data when called.
399 #[derive(PartialEq, Clone, Copy, Show)]
400 #[stable]
401 pub enum TryRecvError {
402     /// This channel is currently empty, but the sender(s) have not yet
403     /// disconnected, so data may yet become available.
404     #[stable]
405     Empty,
406
407     /// This channel's sending half has become disconnected, and there will
408     /// never be any more data received on this channel
409     #[stable]
410     Disconnected,
411 }
412
413 /// This enumeration is the list of the possible error outcomes for the
414 /// `SyncSender::try_send` method.
415 #[derive(PartialEq, Clone, Show)]
416 #[stable]
417 pub enum TrySendError<T> {
418     /// The data could not be sent on the channel because it would require that
419     /// the callee block to send the data.
420     ///
421     /// If this is a buffered channel, then the buffer is full at this time. If
422     /// this is not a buffered channel, then there is no receiver available to
423     /// acquire the data.
424     #[stable]
425     Full(T),
426
427     /// This channel's receiving half has disconnected, so the data could not be
428     /// sent. The data is returned back to the callee in this case.
429     #[stable]
430     Disconnected(T),
431 }
432
433 enum Flavor<T> {
434     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
435     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
436     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
437     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
438 }
439
440 #[doc(hidden)]
441 trait UnsafeFlavor<T> {
442     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
443     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
444         &mut *self.inner_unsafe().get()
445     }
446     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
447         &*self.inner_unsafe().get()
448     }
449 }
450 impl<T> UnsafeFlavor<T> for Sender<T> {
451     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
452         &self.inner
453     }
454 }
455 impl<T> UnsafeFlavor<T> for Receiver<T> {
456     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
457         &self.inner
458     }
459 }
460
461 /// Creates a new asynchronous channel, returning the sender/receiver halves.
462 ///
463 /// All data sent on the sender will become available on the receiver, and no
464 /// send will block the calling task (this channel has an "infinite buffer").
465 ///
466 /// # Example
467 ///
468 /// ```
469 /// use std::sync::mpsc::channel;
470 /// use std::thread::Thread;
471 ///
472 /// // tx is is the sending half (tx for transmission), and rx is the receiving
473 /// // half (rx for receiving).
474 /// let (tx, rx) = channel();
475 ///
476 /// // Spawn off an expensive computation
477 /// Thread::spawn(move|| {
478 /// #   fn expensive_computation() {}
479 ///     tx.send(expensive_computation()).unwrap();
480 /// });
481 ///
482 /// // Do some useful work for awhile
483 ///
484 /// // Let's see what that answer was
485 /// println!("{:?}", rx.recv().unwrap());
486 /// ```
487 #[stable]
488 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
489     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
490     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
491 }
492
493 /// Creates a new synchronous, bounded channel.
494 ///
495 /// Like asynchronous channels, the `Receiver` will block until a message
496 /// becomes available. These channels differ greatly in the semantics of the
497 /// sender from asynchronous channels, however.
498 ///
499 /// This channel has an internal buffer on which messages will be queued. When
500 /// the internal buffer becomes full, future sends will *block* waiting for the
501 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
502 /// becomes  "rendezvous channel" where each send will not return until a recv
503 /// is paired with it.
504 ///
505 /// As with asynchronous channels, all senders will panic in `send` if the
506 /// `Receiver` has been destroyed.
507 ///
508 /// # Example
509 ///
510 /// ```
511 /// use std::sync::mpsc::sync_channel;
512 /// use std::thread::Thread;
513 ///
514 /// let (tx, rx) = sync_channel(1);
515 ///
516 /// // this returns immediately
517 /// tx.send(1i).unwrap();
518 ///
519 /// Thread::spawn(move|| {
520 ///     // this will block until the previous message has been received
521 ///     tx.send(2i).unwrap();
522 /// });
523 ///
524 /// assert_eq!(rx.recv().unwrap(), 1i);
525 /// assert_eq!(rx.recv().unwrap(), 2i);
526 /// ```
527 #[stable]
528 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
529     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
530     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
531 }
532
533 ////////////////////////////////////////////////////////////////////////////////
534 // Sender
535 ////////////////////////////////////////////////////////////////////////////////
536
537 impl<T: Send> Sender<T> {
538     fn new(inner: Flavor<T>) -> Sender<T> {
539         Sender {
540             inner: UnsafeCell::new(inner),
541         }
542     }
543
544     /// Attempts to send a value on this channel, returning it back if it could
545     /// not be sent.
546     ///
547     /// A successful send occurs when it is determined that the other end of
548     /// the channel has not hung up already. An unsuccessful send would be one
549     /// where the corresponding receiver has already been deallocated. Note
550     /// that a return value of `Err` means that the data will never be
551     /// received, but a return value of `Ok` does *not* mean that the data
552     /// will be received.  It is possible for the corresponding receiver to
553     /// hang up immediately after this function returns `Ok`.
554     ///
555     /// This method will never block the current thread.
556     ///
557     /// # Example
558     ///
559     /// ```
560     /// use std::sync::mpsc::channel;
561     ///
562     /// let (tx, rx) = channel();
563     ///
564     /// // This send is always successful
565     /// tx.send(1i).unwrap();
566     ///
567     /// // This send will fail because the receiver is gone
568     /// drop(rx);
569     /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
570     /// ```
571     #[stable]
572     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
573         let (new_inner, ret) = match *unsafe { self.inner() } {
574             Flavor::Oneshot(ref p) => {
575                 unsafe {
576                     let p = p.get();
577                     if !(*p).sent() {
578                         return (*p).send(t).map_err(SendError);
579                     } else {
580                         let a =
581                             Arc::new(UnsafeCell::new(stream::Packet::new()));
582                         let rx = Receiver::new(Flavor::Stream(a.clone()));
583                         match (*p).upgrade(rx) {
584                             oneshot::UpSuccess => {
585                                 let ret = (*a.get()).send(t);
586                                 (a, ret)
587                             }
588                             oneshot::UpDisconnected => (a, Err(t)),
589                             oneshot::UpWoke(token) => {
590                                 // This send cannot panic because the thread is
591                                 // asleep (we're looking at it), so the receiver
592                                 // can't go away.
593                                 (*a.get()).send(t).ok().unwrap();
594                         token.signal();
595                                 (a, Ok(()))
596                             }
597                         }
598                     }
599                 }
600             }
601             Flavor::Stream(ref p) => return unsafe {
602                 (*p.get()).send(t).map_err(SendError)
603             },
604             Flavor::Shared(ref p) => return unsafe {
605                 (*p.get()).send(t).map_err(SendError)
606             },
607             Flavor::Sync(..) => unreachable!(),
608         };
609
610         unsafe {
611             let tmp = Sender::new(Flavor::Stream(new_inner));
612             mem::swap(self.inner_mut(), tmp.inner_mut());
613         }
614         ret.map_err(SendError)
615     }
616 }
617
618 #[stable]
619 impl<T: Send> Clone for Sender<T> {
620     fn clone(&self) -> Sender<T> {
621         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
622             Flavor::Oneshot(ref p) => {
623                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
624                 unsafe {
625                     let guard = (*a.get()).postinit_lock();
626                     let rx = Receiver::new(Flavor::Shared(a.clone()));
627                     match (*p.get()).upgrade(rx) {
628                         oneshot::UpSuccess |
629                         oneshot::UpDisconnected => (a, None, guard),
630                         oneshot::UpWoke(task) => (a, Some(task), guard)
631                     }
632                 }
633             }
634             Flavor::Stream(ref p) => {
635                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
636                 unsafe {
637                     let guard = (*a.get()).postinit_lock();
638                     let rx = Receiver::new(Flavor::Shared(a.clone()));
639                     match (*p.get()).upgrade(rx) {
640                         stream::UpSuccess |
641                         stream::UpDisconnected => (a, None, guard),
642                         stream::UpWoke(task) => (a, Some(task), guard),
643                     }
644                 }
645             }
646             Flavor::Shared(ref p) => {
647                 unsafe { (*p.get()).clone_chan(); }
648                 return Sender::new(Flavor::Shared(p.clone()));
649             }
650             Flavor::Sync(..) => unreachable!(),
651         };
652
653         unsafe {
654             (*packet.get()).inherit_blocker(sleeper, guard);
655
656             let tmp = Sender::new(Flavor::Shared(packet.clone()));
657             mem::swap(self.inner_mut(), tmp.inner_mut());
658         }
659         Sender::new(Flavor::Shared(packet))
660     }
661 }
662
663 #[unsafe_destructor]
664 #[stable]
665 impl<T: Send> Drop for Sender<T> {
666     fn drop(&mut self) {
667         match *unsafe { self.inner_mut() } {
668             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
669             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
670             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
671             Flavor::Sync(..) => unreachable!(),
672         }
673     }
674 }
675
676 ////////////////////////////////////////////////////////////////////////////////
677 // SyncSender
678 ////////////////////////////////////////////////////////////////////////////////
679
680 impl<T: Send> SyncSender<T> {
681     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
682         SyncSender { inner: inner }
683     }
684
685     /// Sends a value on this synchronous channel.
686     ///
687     /// This function will *block* until space in the internal buffer becomes
688     /// available or a receiver is available to hand off the message to.
689     ///
690     /// Note that a successful send does *not* guarantee that the receiver will
691     /// ever see the data if there is a buffer on this channel. Items may be
692     /// enqueued in the internal buffer for the receiver to receive at a later
693     /// time. If the buffer size is 0, however, it can be guaranteed that the
694     /// receiver has indeed received the data if this function returns success.
695     ///
696     /// This function will never panic, but it may return `Err` if the
697     /// `Receiver` has disconnected and is no longer able to receive
698     /// information.
699     #[stable]
700     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
701         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
702     }
703
704     /// Attempts to send a value on this channel without blocking.
705     ///
706     /// This method differs from `send` by returning immediately if the
707     /// channel's buffer is full or no receiver is waiting to acquire some
708     /// data. Compared with `send`, this function has two failure cases
709     /// instead of one (one for disconnection, one for a full buffer).
710     ///
711     /// See `SyncSender::send` for notes about guarantees of whether the
712     /// receiver has received the data or not if this function is successful.
713     #[stable]
714     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
715         unsafe { (*self.inner.get()).try_send(t) }
716     }
717 }
718
719 #[stable]
720 impl<T: Send> Clone for SyncSender<T> {
721     fn clone(&self) -> SyncSender<T> {
722         unsafe { (*self.inner.get()).clone_chan(); }
723         return SyncSender::new(self.inner.clone());
724     }
725 }
726
727 #[unsafe_destructor]
728 #[stable]
729 impl<T: Send> Drop for SyncSender<T> {
730     fn drop(&mut self) {
731         unsafe { (*self.inner.get()).drop_chan(); }
732     }
733 }
734
735 ////////////////////////////////////////////////////////////////////////////////
736 // Receiver
737 ////////////////////////////////////////////////////////////////////////////////
738
739 impl<T: Send> Receiver<T> {
740     fn new(inner: Flavor<T>) -> Receiver<T> {
741         Receiver { inner: UnsafeCell::new(inner) }
742     }
743
744     /// Attempts to return a pending value on this receiver without blocking
745     ///
746     /// This method will never block the caller in order to wait for data to
747     /// become available. Instead, this will always return immediately with a
748     /// possible option of pending data on the channel.
749     ///
750     /// This is useful for a flavor of "optimistic check" before deciding to
751     /// block on a receiver.
752     #[stable]
753     pub fn try_recv(&self) -> Result<T, TryRecvError> {
754         loop {
755             let new_port = match *unsafe { self.inner() } {
756                 Flavor::Oneshot(ref p) => {
757                     match unsafe { (*p.get()).try_recv() } {
758                         Ok(t) => return Ok(t),
759                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
760                         Err(oneshot::Disconnected) => {
761                             return Err(TryRecvError::Disconnected)
762                         }
763                         Err(oneshot::Upgraded(rx)) => rx,
764                     }
765                 }
766                 Flavor::Stream(ref p) => {
767                     match unsafe { (*p.get()).try_recv() } {
768                         Ok(t) => return Ok(t),
769                         Err(stream::Empty) => return Err(TryRecvError::Empty),
770                         Err(stream::Disconnected) => {
771                             return Err(TryRecvError::Disconnected)
772                         }
773                         Err(stream::Upgraded(rx)) => rx,
774                     }
775                 }
776                 Flavor::Shared(ref p) => {
777                     match unsafe { (*p.get()).try_recv() } {
778                         Ok(t) => return Ok(t),
779                         Err(shared::Empty) => return Err(TryRecvError::Empty),
780                         Err(shared::Disconnected) => {
781                             return Err(TryRecvError::Disconnected)
782                         }
783                     }
784                 }
785                 Flavor::Sync(ref p) => {
786                     match unsafe { (*p.get()).try_recv() } {
787                         Ok(t) => return Ok(t),
788                         Err(sync::Empty) => return Err(TryRecvError::Empty),
789                         Err(sync::Disconnected) => {
790                             return Err(TryRecvError::Disconnected)
791                         }
792                     }
793                 }
794             };
795             unsafe {
796                 mem::swap(self.inner_mut(),
797                           new_port.inner_mut());
798             }
799         }
800     }
801
802     /// Attempt to wait for a value on this receiver, returning an error if the
803     /// corresponding channel has hung up.
804     ///
805     /// This function will always block the current thread if there is no data
806     /// available and it's possible for more data to be sent. Once a message is
807     /// sent to the corresponding `Sender`, then this receiver will wake up and
808     /// return that message.
809     ///
810     /// If the corresponding `Sender` has disconnected, or it disconnects while
811     /// this call is blocking, this call will wake up and return `Err` to
812     /// indicate that no more messages can ever be received on this channel.
813     #[stable]
814     pub fn recv(&self) -> Result<T, RecvError> {
815         loop {
816             let new_port = match *unsafe { self.inner() } {
817                 Flavor::Oneshot(ref p) => {
818                     match unsafe { (*p.get()).recv() } {
819                         Ok(t) => return Ok(t),
820                         Err(oneshot::Empty) => return unreachable!(),
821                         Err(oneshot::Disconnected) => return Err(RecvError),
822                         Err(oneshot::Upgraded(rx)) => rx,
823                     }
824                 }
825                 Flavor::Stream(ref p) => {
826                     match unsafe { (*p.get()).recv() } {
827                         Ok(t) => return Ok(t),
828                         Err(stream::Empty) => return unreachable!(),
829                         Err(stream::Disconnected) => return Err(RecvError),
830                         Err(stream::Upgraded(rx)) => rx,
831                     }
832                 }
833                 Flavor::Shared(ref p) => {
834                     match unsafe { (*p.get()).recv() } {
835                         Ok(t) => return Ok(t),
836                         Err(shared::Empty) => return unreachable!(),
837                         Err(shared::Disconnected) => return Err(RecvError),
838                     }
839                 }
840                 Flavor::Sync(ref p) => return unsafe {
841                     (*p.get()).recv().map_err(|()| RecvError)
842                 }
843             };
844             unsafe {
845                 mem::swap(self.inner_mut(), new_port.inner_mut());
846             }
847         }
848     }
849
850     /// Returns an iterator that will block waiting for messages, but never
851     /// `panic!`. It will return `None` when the channel has hung up.
852     #[stable]
853     pub fn iter(&self) -> Iter<T> {
854         Iter { rx: self }
855     }
856 }
857
858 impl<T: Send> select::Packet for Receiver<T> {
859     fn can_recv(&self) -> bool {
860         loop {
861             let new_port = match *unsafe { self.inner() } {
862                 Flavor::Oneshot(ref p) => {
863                     match unsafe { (*p.get()).can_recv() } {
864                         Ok(ret) => return ret,
865                         Err(upgrade) => upgrade,
866                     }
867                 }
868                 Flavor::Stream(ref p) => {
869                     match unsafe { (*p.get()).can_recv() } {
870                         Ok(ret) => return ret,
871                         Err(upgrade) => upgrade,
872                     }
873                 }
874                 Flavor::Shared(ref p) => {
875                     return unsafe { (*p.get()).can_recv() };
876                 }
877                 Flavor::Sync(ref p) => {
878                     return unsafe { (*p.get()).can_recv() };
879                 }
880             };
881             unsafe {
882                 mem::swap(self.inner_mut(),
883                           new_port.inner_mut());
884             }
885         }
886     }
887
888     fn start_selection(&self, mut token: SignalToken) -> StartResult {
889         loop {
890             let (t, new_port) = match *unsafe { self.inner() } {
891                 Flavor::Oneshot(ref p) => {
892                     match unsafe { (*p.get()).start_selection(token) } {
893                         oneshot::SelSuccess => return Installed,
894                         oneshot::SelCanceled => return Abort,
895                         oneshot::SelUpgraded(t, rx) => (t, rx),
896                     }
897                 }
898                 Flavor::Stream(ref p) => {
899                     match unsafe { (*p.get()).start_selection(token) } {
900                         stream::SelSuccess => return Installed,
901                         stream::SelCanceled => return Abort,
902                         stream::SelUpgraded(t, rx) => (t, rx),
903                     }
904                 }
905                 Flavor::Shared(ref p) => {
906                     return unsafe { (*p.get()).start_selection(token) };
907                 }
908                 Flavor::Sync(ref p) => {
909                     return unsafe { (*p.get()).start_selection(token) };
910                 }
911             };
912             token = t;
913             unsafe {
914                 mem::swap(self.inner_mut(), new_port.inner_mut());
915             }
916         }
917     }
918
919     fn abort_selection(&self) -> bool {
920         let mut was_upgrade = false;
921         loop {
922             let result = match *unsafe { self.inner() } {
923                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
924                 Flavor::Stream(ref p) => unsafe {
925                     (*p.get()).abort_selection(was_upgrade)
926                 },
927                 Flavor::Shared(ref p) => return unsafe {
928                     (*p.get()).abort_selection(was_upgrade)
929                 },
930                 Flavor::Sync(ref p) => return unsafe {
931                     (*p.get()).abort_selection()
932                 },
933             };
934             let new_port = match result { Ok(b) => return b, Err(p) => p };
935             was_upgrade = true;
936             unsafe {
937                 mem::swap(self.inner_mut(),
938                           new_port.inner_mut());
939             }
940         }
941     }
942 }
943
944 #[stable]
945 impl<'a, T: Send> Iterator for Iter<'a, T> {
946     type Item = T;
947
948     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
949 }
950
951 #[unsafe_destructor]
952 #[stable]
953 impl<T: Send> Drop for Receiver<T> {
954     fn drop(&mut self) {
955         match *unsafe { self.inner_mut() } {
956             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
957             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
958             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
959             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
960         }
961     }
962 }
963
964 #[stable]
965 impl<T> fmt::Display for SendError<T> {
966     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
967         "sending on a closed channel".fmt(f)
968     }
969 }
970
971 #[stable]
972 impl<T> fmt::Display for TrySendError<T> {
973     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
974         match *self {
975             TrySendError::Full(..) => {
976                 "sending on a full channel".fmt(f)
977             }
978             TrySendError::Disconnected(..) => {
979                 "sending on a closed channel".fmt(f)
980             }
981         }
982     }
983 }
984
985 #[stable]
986 impl fmt::Display for RecvError {
987     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
988         "receiving on a closed channel".fmt(f)
989     }
990 }
991
992 #[stable]
993 impl fmt::Display for TryRecvError {
994     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
995         match *self {
996             TryRecvError::Empty => {
997                 "receiving on an empty channel".fmt(f)
998             }
999             TryRecvError::Disconnected => {
1000                 "receiving on a closed channel".fmt(f)
1001             }
1002         }
1003     }
1004 }
1005
1006 #[cfg(test)]
1007 mod test {
1008     use prelude::v1::*;
1009
1010     use os;
1011     use super::*;
1012     use thread::Thread;
1013
1014     pub fn stress_factor() -> uint {
1015         match os::getenv("RUST_TEST_STRESS") {
1016             Some(val) => val.parse().unwrap(),
1017             None => 1,
1018         }
1019     }
1020
1021     #[test]
1022     fn smoke() {
1023         let (tx, rx) = channel::<int>();
1024         tx.send(1).unwrap();
1025         assert_eq!(rx.recv().unwrap(), 1);
1026     }
1027
1028     #[test]
1029     fn drop_full() {
1030         let (tx, _rx) = channel();
1031         tx.send(box 1i).unwrap();
1032     }
1033
1034     #[test]
1035     fn drop_full_shared() {
1036         let (tx, _rx) = channel();
1037         drop(tx.clone());
1038         drop(tx.clone());
1039         tx.send(box 1i).unwrap();
1040     }
1041
1042     #[test]
1043     fn smoke_shared() {
1044         let (tx, rx) = channel::<int>();
1045         tx.send(1).unwrap();
1046         assert_eq!(rx.recv().unwrap(), 1);
1047         let tx = tx.clone();
1048         tx.send(1).unwrap();
1049         assert_eq!(rx.recv().unwrap(), 1);
1050     }
1051
1052     #[test]
1053     fn smoke_threads() {
1054         let (tx, rx) = channel::<int>();
1055         let _t = Thread::spawn(move|| {
1056             tx.send(1).unwrap();
1057         });
1058         assert_eq!(rx.recv().unwrap(), 1);
1059     }
1060
1061     #[test]
1062     fn smoke_port_gone() {
1063         let (tx, rx) = channel::<int>();
1064         drop(rx);
1065         assert!(tx.send(1).is_err());
1066     }
1067
1068     #[test]
1069     fn smoke_shared_port_gone() {
1070         let (tx, rx) = channel::<int>();
1071         drop(rx);
1072         assert!(tx.send(1).is_err())
1073     }
1074
1075     #[test]
1076     fn smoke_shared_port_gone2() {
1077         let (tx, rx) = channel::<int>();
1078         drop(rx);
1079         let tx2 = tx.clone();
1080         drop(tx);
1081         assert!(tx2.send(1).is_err());
1082     }
1083
1084     #[test]
1085     fn port_gone_concurrent() {
1086         let (tx, rx) = channel::<int>();
1087         let _t = Thread::spawn(move|| {
1088             rx.recv().unwrap();
1089         });
1090         while tx.send(1).is_ok() {}
1091     }
1092
1093     #[test]
1094     fn port_gone_concurrent_shared() {
1095         let (tx, rx) = channel::<int>();
1096         let tx2 = tx.clone();
1097         let _t = Thread::spawn(move|| {
1098             rx.recv().unwrap();
1099         });
1100         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1101     }
1102
1103     #[test]
1104     fn smoke_chan_gone() {
1105         let (tx, rx) = channel::<int>();
1106         drop(tx);
1107         assert!(rx.recv().is_err());
1108     }
1109
1110     #[test]
1111     fn smoke_chan_gone_shared() {
1112         let (tx, rx) = channel::<()>();
1113         let tx2 = tx.clone();
1114         drop(tx);
1115         drop(tx2);
1116         assert!(rx.recv().is_err());
1117     }
1118
1119     #[test]
1120     fn chan_gone_concurrent() {
1121         let (tx, rx) = channel::<int>();
1122         let _t = Thread::spawn(move|| {
1123             tx.send(1).unwrap();
1124             tx.send(1).unwrap();
1125         });
1126         while rx.recv().is_ok() {}
1127     }
1128
1129     #[test]
1130     fn stress() {
1131         let (tx, rx) = channel::<int>();
1132         let t = Thread::scoped(move|| {
1133             for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1134         });
1135         for _ in range(0u, 10000) {
1136             assert_eq!(rx.recv().unwrap(), 1);
1137         }
1138         t.join().ok().unwrap();
1139     }
1140
1141     #[test]
1142     fn stress_shared() {
1143         static AMT: uint = 10000;
1144         static NTHREADS: uint = 8;
1145         let (tx, rx) = channel::<int>();
1146
1147         let t = Thread::scoped(move|| {
1148             for _ in range(0, AMT * NTHREADS) {
1149                 assert_eq!(rx.recv().unwrap(), 1);
1150             }
1151             match rx.try_recv() {
1152                 Ok(..) => panic!(),
1153                 _ => {}
1154             }
1155         });
1156
1157         for _ in range(0, NTHREADS) {
1158             let tx = tx.clone();
1159             Thread::spawn(move|| {
1160                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1161             });
1162         }
1163         drop(tx);
1164         t.join().ok().unwrap();
1165     }
1166
1167     #[test]
1168     fn send_from_outside_runtime() {
1169         let (tx1, rx1) = channel::<()>();
1170         let (tx2, rx2) = channel::<int>();
1171         let t1 = Thread::scoped(move|| {
1172             tx1.send(()).unwrap();
1173             for _ in range(0i, 40) {
1174                 assert_eq!(rx2.recv().unwrap(), 1);
1175             }
1176         });
1177         rx1.recv().unwrap();
1178         let t2 = Thread::scoped(move|| {
1179             for _ in range(0i, 40) {
1180                 tx2.send(1).unwrap();
1181             }
1182         });
1183         t1.join().ok().unwrap();
1184         t2.join().ok().unwrap();
1185     }
1186
1187     #[test]
1188     fn recv_from_outside_runtime() {
1189         let (tx, rx) = channel::<int>();
1190         let t = Thread::scoped(move|| {
1191             for _ in range(0i, 40) {
1192                 assert_eq!(rx.recv().unwrap(), 1);
1193             }
1194         });
1195         for _ in range(0u, 40) {
1196             tx.send(1).unwrap();
1197         }
1198         t.join().ok().unwrap();
1199     }
1200
1201     #[test]
1202     fn no_runtime() {
1203         let (tx1, rx1) = channel::<int>();
1204         let (tx2, rx2) = channel::<int>();
1205         let t1 = Thread::scoped(move|| {
1206             assert_eq!(rx1.recv().unwrap(), 1);
1207             tx2.send(2).unwrap();
1208         });
1209         let t2 = Thread::scoped(move|| {
1210             tx1.send(1).unwrap();
1211             assert_eq!(rx2.recv().unwrap(), 2);
1212         });
1213         t1.join().ok().unwrap();
1214         t2.join().ok().unwrap();
1215     }
1216
1217     #[test]
1218     fn oneshot_single_thread_close_port_first() {
1219         // Simple test of closing without sending
1220         let (_tx, rx) = channel::<int>();
1221         drop(rx);
1222     }
1223
1224     #[test]
1225     fn oneshot_single_thread_close_chan_first() {
1226         // Simple test of closing without sending
1227         let (tx, _rx) = channel::<int>();
1228         drop(tx);
1229     }
1230
1231     #[test]
1232     fn oneshot_single_thread_send_port_close() {
1233         // Testing that the sender cleans up the payload if receiver is closed
1234         let (tx, rx) = channel::<Box<int>>();
1235         drop(rx);
1236         assert!(tx.send(box 0).is_err());
1237     }
1238
1239     #[test]
1240     fn oneshot_single_thread_recv_chan_close() {
1241         // Receiving on a closed chan will panic
1242         let res = Thread::scoped(move|| {
1243             let (tx, rx) = channel::<int>();
1244             drop(tx);
1245             rx.recv().unwrap();
1246         }).join();
1247         // What is our res?
1248         assert!(res.is_err());
1249     }
1250
1251     #[test]
1252     fn oneshot_single_thread_send_then_recv() {
1253         let (tx, rx) = channel::<Box<int>>();
1254         tx.send(box 10).unwrap();
1255         assert!(rx.recv().unwrap() == box 10);
1256     }
1257
1258     #[test]
1259     fn oneshot_single_thread_try_send_open() {
1260         let (tx, rx) = channel::<int>();
1261         assert!(tx.send(10).is_ok());
1262         assert!(rx.recv().unwrap() == 10);
1263     }
1264
1265     #[test]
1266     fn oneshot_single_thread_try_send_closed() {
1267         let (tx, rx) = channel::<int>();
1268         drop(rx);
1269         assert!(tx.send(10).is_err());
1270     }
1271
1272     #[test]
1273     fn oneshot_single_thread_try_recv_open() {
1274         let (tx, rx) = channel::<int>();
1275         tx.send(10).unwrap();
1276         assert!(rx.recv() == Ok(10));
1277     }
1278
1279     #[test]
1280     fn oneshot_single_thread_try_recv_closed() {
1281         let (tx, rx) = channel::<int>();
1282         drop(tx);
1283         assert!(rx.recv().is_err());
1284     }
1285
1286     #[test]
1287     fn oneshot_single_thread_peek_data() {
1288         let (tx, rx) = channel::<int>();
1289         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1290         tx.send(10).unwrap();
1291         assert_eq!(rx.try_recv(), Ok(10));
1292     }
1293
1294     #[test]
1295     fn oneshot_single_thread_peek_close() {
1296         let (tx, rx) = channel::<int>();
1297         drop(tx);
1298         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1299         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1300     }
1301
1302     #[test]
1303     fn oneshot_single_thread_peek_open() {
1304         let (_tx, rx) = channel::<int>();
1305         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1306     }
1307
1308     #[test]
1309     fn oneshot_multi_task_recv_then_send() {
1310         let (tx, rx) = channel::<Box<int>>();
1311         let _t = Thread::spawn(move|| {
1312             assert!(rx.recv().unwrap() == box 10);
1313         });
1314
1315         tx.send(box 10).unwrap();
1316     }
1317
1318     #[test]
1319     fn oneshot_multi_task_recv_then_close() {
1320         let (tx, rx) = channel::<Box<int>>();
1321         let _t = Thread::spawn(move|| {
1322             drop(tx);
1323         });
1324         let res = Thread::scoped(move|| {
1325             assert!(rx.recv().unwrap() == box 10);
1326         }).join();
1327         assert!(res.is_err());
1328     }
1329
1330     #[test]
1331     fn oneshot_multi_thread_close_stress() {
1332         for _ in range(0, stress_factor()) {
1333             let (tx, rx) = channel::<int>();
1334             let _t = Thread::spawn(move|| {
1335                 drop(rx);
1336             });
1337             drop(tx);
1338         }
1339     }
1340
1341     #[test]
1342     fn oneshot_multi_thread_send_close_stress() {
1343         for _ in range(0, stress_factor()) {
1344             let (tx, rx) = channel::<int>();
1345             let _t = Thread::spawn(move|| {
1346                 drop(rx);
1347             });
1348             let _ = Thread::scoped(move|| {
1349                 tx.send(1).unwrap();
1350             }).join();
1351         }
1352     }
1353
1354     #[test]
1355     fn oneshot_multi_thread_recv_close_stress() {
1356         for _ in range(0, stress_factor()) {
1357             let (tx, rx) = channel::<int>();
1358             Thread::spawn(move|| {
1359                 let res = Thread::scoped(move|| {
1360                     rx.recv().unwrap();
1361                 }).join();
1362                 assert!(res.is_err());
1363             });
1364             let _t = Thread::spawn(move|| {
1365                 Thread::spawn(move|| {
1366                     drop(tx);
1367                 });
1368             });
1369         }
1370     }
1371
1372     #[test]
1373     fn oneshot_multi_thread_send_recv_stress() {
1374         for _ in range(0, stress_factor()) {
1375             let (tx, rx) = channel();
1376             let _t = Thread::spawn(move|| {
1377                 tx.send(box 10i).unwrap();
1378             });
1379             assert!(rx.recv().unwrap() == box 10i);
1380         }
1381     }
1382
1383     #[test]
1384     fn stream_send_recv_stress() {
1385         for _ in range(0, stress_factor()) {
1386             let (tx, rx) = channel();
1387
1388             send(tx, 0);
1389             recv(rx, 0);
1390
1391             fn send(tx: Sender<Box<int>>, i: int) {
1392                 if i == 10 { return }
1393
1394                 Thread::spawn(move|| {
1395                     tx.send(box i).unwrap();
1396                     send(tx, i + 1);
1397                 });
1398             }
1399
1400             fn recv(rx: Receiver<Box<int>>, i: int) {
1401                 if i == 10 { return }
1402
1403                 Thread::spawn(move|| {
1404                     assert!(rx.recv().unwrap() == box i);
1405                     recv(rx, i + 1);
1406                 });
1407             }
1408         }
1409     }
1410
1411     #[test]
1412     fn recv_a_lot() {
1413         // Regression test that we don't run out of stack in scheduler context
1414         let (tx, rx) = channel();
1415         for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1416         for _ in range(0i, 10000) { rx.recv().unwrap(); }
1417     }
1418
1419     #[test]
1420     fn shared_chan_stress() {
1421         let (tx, rx) = channel();
1422         let total = stress_factor() + 100;
1423         for _ in range(0, total) {
1424             let tx = tx.clone();
1425             Thread::spawn(move|| {
1426                 tx.send(()).unwrap();
1427             });
1428         }
1429
1430         for _ in range(0, total) {
1431             rx.recv().unwrap();
1432         }
1433     }
1434
1435     #[test]
1436     fn test_nested_recv_iter() {
1437         let (tx, rx) = channel::<int>();
1438         let (total_tx, total_rx) = channel::<int>();
1439
1440         let _t = Thread::spawn(move|| {
1441             let mut acc = 0;
1442             for x in rx.iter() {
1443                 acc += x;
1444             }
1445             total_tx.send(acc).unwrap();
1446         });
1447
1448         tx.send(3).unwrap();
1449         tx.send(1).unwrap();
1450         tx.send(2).unwrap();
1451         drop(tx);
1452         assert_eq!(total_rx.recv().unwrap(), 6);
1453     }
1454
1455     #[test]
1456     fn test_recv_iter_break() {
1457         let (tx, rx) = channel::<int>();
1458         let (count_tx, count_rx) = channel();
1459
1460         let _t = Thread::spawn(move|| {
1461             let mut count = 0;
1462             for x in rx.iter() {
1463                 if count >= 3 {
1464                     break;
1465                 } else {
1466                     count += x;
1467                 }
1468             }
1469             count_tx.send(count).unwrap();
1470         });
1471
1472         tx.send(2).unwrap();
1473         tx.send(2).unwrap();
1474         tx.send(2).unwrap();
1475         let _ = tx.send(2);
1476         drop(tx);
1477         assert_eq!(count_rx.recv().unwrap(), 4);
1478     }
1479
1480     #[test]
1481     fn try_recv_states() {
1482         let (tx1, rx1) = channel::<int>();
1483         let (tx2, rx2) = channel::<()>();
1484         let (tx3, rx3) = channel::<()>();
1485         let _t = Thread::spawn(move|| {
1486             rx2.recv().unwrap();
1487             tx1.send(1).unwrap();
1488             tx3.send(()).unwrap();
1489             rx2.recv().unwrap();
1490             drop(tx1);
1491             tx3.send(()).unwrap();
1492         });
1493
1494         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1495         tx2.send(()).unwrap();
1496         rx3.recv().unwrap();
1497         assert_eq!(rx1.try_recv(), Ok(1));
1498         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1499         tx2.send(()).unwrap();
1500         rx3.recv().unwrap();
1501         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1502     }
1503
1504     // This bug used to end up in a livelock inside of the Receiver destructor
1505     // because the internal state of the Shared packet was corrupted
1506     #[test]
1507     fn destroy_upgraded_shared_port_when_sender_still_active() {
1508         let (tx, rx) = channel();
1509         let (tx2, rx2) = channel();
1510         let _t = Thread::spawn(move|| {
1511             rx.recv().unwrap(); // wait on a oneshot
1512             drop(rx);  // destroy a shared
1513             tx2.send(()).unwrap();
1514         });
1515         // make sure the other task has gone to sleep
1516         for _ in range(0u, 5000) { Thread::yield_now(); }
1517
1518         // upgrade to a shared chan and send a message
1519         let t = tx.clone();
1520         drop(tx);
1521         t.send(()).unwrap();
1522
1523         // wait for the child task to exit before we exit
1524         rx2.recv().unwrap();
1525     }
1526 }
1527
1528 #[cfg(test)]
1529 mod sync_tests {
1530     use prelude::v1::*;
1531
1532     use os;
1533     use thread::Thread;
1534     use super::*;
1535
1536     pub fn stress_factor() -> uint {
1537         match os::getenv("RUST_TEST_STRESS") {
1538             Some(val) => val.parse().unwrap(),
1539             None => 1,
1540         }
1541     }
1542
1543     #[test]
1544     fn smoke() {
1545         let (tx, rx) = sync_channel::<int>(1);
1546         tx.send(1).unwrap();
1547         assert_eq!(rx.recv().unwrap(), 1);
1548     }
1549
1550     #[test]
1551     fn drop_full() {
1552         let (tx, _rx) = sync_channel(1);
1553         tx.send(box 1i).unwrap();
1554     }
1555
1556     #[test]
1557     fn smoke_shared() {
1558         let (tx, rx) = sync_channel::<int>(1);
1559         tx.send(1).unwrap();
1560         assert_eq!(rx.recv().unwrap(), 1);
1561         let tx = tx.clone();
1562         tx.send(1).unwrap();
1563         assert_eq!(rx.recv().unwrap(), 1);
1564     }
1565
1566     #[test]
1567     fn smoke_threads() {
1568         let (tx, rx) = sync_channel::<int>(0);
1569         let _t = Thread::spawn(move|| {
1570             tx.send(1).unwrap();
1571         });
1572         assert_eq!(rx.recv().unwrap(), 1);
1573     }
1574
1575     #[test]
1576     fn smoke_port_gone() {
1577         let (tx, rx) = sync_channel::<int>(0);
1578         drop(rx);
1579         assert!(tx.send(1).is_err());
1580     }
1581
1582     #[test]
1583     fn smoke_shared_port_gone2() {
1584         let (tx, rx) = sync_channel::<int>(0);
1585         drop(rx);
1586         let tx2 = tx.clone();
1587         drop(tx);
1588         assert!(tx2.send(1).is_err());
1589     }
1590
1591     #[test]
1592     fn port_gone_concurrent() {
1593         let (tx, rx) = sync_channel::<int>(0);
1594         let _t = Thread::spawn(move|| {
1595             rx.recv().unwrap();
1596         });
1597         while tx.send(1).is_ok() {}
1598     }
1599
1600     #[test]
1601     fn port_gone_concurrent_shared() {
1602         let (tx, rx) = sync_channel::<int>(0);
1603         let tx2 = tx.clone();
1604         let _t = Thread::spawn(move|| {
1605             rx.recv().unwrap();
1606         });
1607         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1608     }
1609
1610     #[test]
1611     fn smoke_chan_gone() {
1612         let (tx, rx) = sync_channel::<int>(0);
1613         drop(tx);
1614         assert!(rx.recv().is_err());
1615     }
1616
1617     #[test]
1618     fn smoke_chan_gone_shared() {
1619         let (tx, rx) = sync_channel::<()>(0);
1620         let tx2 = tx.clone();
1621         drop(tx);
1622         drop(tx2);
1623         assert!(rx.recv().is_err());
1624     }
1625
1626     #[test]
1627     fn chan_gone_concurrent() {
1628         let (tx, rx) = sync_channel::<int>(0);
1629         Thread::spawn(move|| {
1630             tx.send(1).unwrap();
1631             tx.send(1).unwrap();
1632         });
1633         while rx.recv().is_ok() {}
1634     }
1635
1636     #[test]
1637     fn stress() {
1638         let (tx, rx) = sync_channel::<int>(0);
1639         Thread::spawn(move|| {
1640             for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1641         });
1642         for _ in range(0u, 10000) {
1643             assert_eq!(rx.recv().unwrap(), 1);
1644         }
1645     }
1646
1647     #[test]
1648     fn stress_shared() {
1649         static AMT: uint = 1000;
1650         static NTHREADS: uint = 8;
1651         let (tx, rx) = sync_channel::<int>(0);
1652         let (dtx, drx) = sync_channel::<()>(0);
1653
1654         Thread::spawn(move|| {
1655             for _ in range(0, AMT * NTHREADS) {
1656                 assert_eq!(rx.recv().unwrap(), 1);
1657             }
1658             match rx.try_recv() {
1659                 Ok(..) => panic!(),
1660                 _ => {}
1661             }
1662             dtx.send(()).unwrap();
1663         });
1664
1665         for _ in range(0, NTHREADS) {
1666             let tx = tx.clone();
1667             Thread::spawn(move|| {
1668                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1669             });
1670         }
1671         drop(tx);
1672         drx.recv().unwrap();
1673     }
1674
1675     #[test]
1676     fn oneshot_single_thread_close_port_first() {
1677         // Simple test of closing without sending
1678         let (_tx, rx) = sync_channel::<int>(0);
1679         drop(rx);
1680     }
1681
1682     #[test]
1683     fn oneshot_single_thread_close_chan_first() {
1684         // Simple test of closing without sending
1685         let (tx, _rx) = sync_channel::<int>(0);
1686         drop(tx);
1687     }
1688
1689     #[test]
1690     fn oneshot_single_thread_send_port_close() {
1691         // Testing that the sender cleans up the payload if receiver is closed
1692         let (tx, rx) = sync_channel::<Box<int>>(0);
1693         drop(rx);
1694         assert!(tx.send(box 0).is_err());
1695     }
1696
1697     #[test]
1698     fn oneshot_single_thread_recv_chan_close() {
1699         // Receiving on a closed chan will panic
1700         let res = Thread::scoped(move|| {
1701             let (tx, rx) = sync_channel::<int>(0);
1702             drop(tx);
1703             rx.recv().unwrap();
1704         }).join();
1705         // What is our res?
1706         assert!(res.is_err());
1707     }
1708
1709     #[test]
1710     fn oneshot_single_thread_send_then_recv() {
1711         let (tx, rx) = sync_channel::<Box<int>>(1);
1712         tx.send(box 10).unwrap();
1713         assert!(rx.recv().unwrap() == box 10);
1714     }
1715
1716     #[test]
1717     fn oneshot_single_thread_try_send_open() {
1718         let (tx, rx) = sync_channel::<int>(1);
1719         assert_eq!(tx.try_send(10), Ok(()));
1720         assert!(rx.recv().unwrap() == 10);
1721     }
1722
1723     #[test]
1724     fn oneshot_single_thread_try_send_closed() {
1725         let (tx, rx) = sync_channel::<int>(0);
1726         drop(rx);
1727         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1728     }
1729
1730     #[test]
1731     fn oneshot_single_thread_try_send_closed2() {
1732         let (tx, _rx) = sync_channel::<int>(0);
1733         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1734     }
1735
1736     #[test]
1737     fn oneshot_single_thread_try_recv_open() {
1738         let (tx, rx) = sync_channel::<int>(1);
1739         tx.send(10).unwrap();
1740         assert!(rx.recv() == Ok(10));
1741     }
1742
1743     #[test]
1744     fn oneshot_single_thread_try_recv_closed() {
1745         let (tx, rx) = sync_channel::<int>(0);
1746         drop(tx);
1747         assert!(rx.recv().is_err());
1748     }
1749
1750     #[test]
1751     fn oneshot_single_thread_peek_data() {
1752         let (tx, rx) = sync_channel::<int>(1);
1753         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1754         tx.send(10).unwrap();
1755         assert_eq!(rx.try_recv(), Ok(10));
1756     }
1757
1758     #[test]
1759     fn oneshot_single_thread_peek_close() {
1760         let (tx, rx) = sync_channel::<int>(0);
1761         drop(tx);
1762         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1763         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1764     }
1765
1766     #[test]
1767     fn oneshot_single_thread_peek_open() {
1768         let (_tx, rx) = sync_channel::<int>(0);
1769         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1770     }
1771
1772     #[test]
1773     fn oneshot_multi_task_recv_then_send() {
1774         let (tx, rx) = sync_channel::<Box<int>>(0);
1775         let _t = Thread::spawn(move|| {
1776             assert!(rx.recv().unwrap() == box 10);
1777         });
1778
1779         tx.send(box 10).unwrap();
1780     }
1781
1782     #[test]
1783     fn oneshot_multi_task_recv_then_close() {
1784         let (tx, rx) = sync_channel::<Box<int>>(0);
1785         let _t = Thread::spawn(move|| {
1786             drop(tx);
1787         });
1788         let res = Thread::scoped(move|| {
1789             assert!(rx.recv().unwrap() == box 10);
1790         }).join();
1791         assert!(res.is_err());
1792     }
1793
1794     #[test]
1795     fn oneshot_multi_thread_close_stress() {
1796         for _ in range(0, stress_factor()) {
1797             let (tx, rx) = sync_channel::<int>(0);
1798             let _t = Thread::spawn(move|| {
1799                 drop(rx);
1800             });
1801             drop(tx);
1802         }
1803     }
1804
1805     #[test]
1806     fn oneshot_multi_thread_send_close_stress() {
1807         for _ in range(0, stress_factor()) {
1808             let (tx, rx) = sync_channel::<int>(0);
1809             let _t = Thread::spawn(move|| {
1810                 drop(rx);
1811             });
1812             let _ = Thread::scoped(move || {
1813                 tx.send(1).unwrap();
1814             }).join();
1815         }
1816     }
1817
1818     #[test]
1819     fn oneshot_multi_thread_recv_close_stress() {
1820         for _ in range(0, stress_factor()) {
1821             let (tx, rx) = sync_channel::<int>(0);
1822             let _t = Thread::spawn(move|| {
1823                 let res = Thread::scoped(move|| {
1824                     rx.recv().unwrap();
1825                 }).join();
1826                 assert!(res.is_err());
1827             });
1828             let _t = Thread::spawn(move|| {
1829                 Thread::spawn(move|| {
1830                     drop(tx);
1831                 });
1832             });
1833         }
1834     }
1835
1836     #[test]
1837     fn oneshot_multi_thread_send_recv_stress() {
1838         for _ in range(0, stress_factor()) {
1839             let (tx, rx) = sync_channel::<Box<int>>(0);
1840             let _t = Thread::spawn(move|| {
1841                 tx.send(box 10i).unwrap();
1842             });
1843             assert!(rx.recv().unwrap() == box 10i);
1844         }
1845     }
1846
1847     #[test]
1848     fn stream_send_recv_stress() {
1849         for _ in range(0, stress_factor()) {
1850             let (tx, rx) = sync_channel::<Box<int>>(0);
1851
1852             send(tx, 0);
1853             recv(rx, 0);
1854
1855             fn send(tx: SyncSender<Box<int>>, i: int) {
1856                 if i == 10 { return }
1857
1858                 Thread::spawn(move|| {
1859                     tx.send(box i).unwrap();
1860                     send(tx, i + 1);
1861                 });
1862             }
1863
1864             fn recv(rx: Receiver<Box<int>>, i: int) {
1865                 if i == 10 { return }
1866
1867                 Thread::spawn(move|| {
1868                     assert!(rx.recv().unwrap() == box i);
1869                     recv(rx, i + 1);
1870                 });
1871             }
1872         }
1873     }
1874
1875     #[test]
1876     fn recv_a_lot() {
1877         // Regression test that we don't run out of stack in scheduler context
1878         let (tx, rx) = sync_channel(10000);
1879         for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1880         for _ in range(0u, 10000) { rx.recv().unwrap(); }
1881     }
1882
1883     #[test]
1884     fn shared_chan_stress() {
1885         let (tx, rx) = sync_channel(0);
1886         let total = stress_factor() + 100;
1887         for _ in range(0, total) {
1888             let tx = tx.clone();
1889             Thread::spawn(move|| {
1890                 tx.send(()).unwrap();
1891             });
1892         }
1893
1894         for _ in range(0, total) {
1895             rx.recv().unwrap();
1896         }
1897     }
1898
1899     #[test]
1900     fn test_nested_recv_iter() {
1901         let (tx, rx) = sync_channel::<int>(0);
1902         let (total_tx, total_rx) = sync_channel::<int>(0);
1903
1904         let _t = Thread::spawn(move|| {
1905             let mut acc = 0;
1906             for x in rx.iter() {
1907                 acc += x;
1908             }
1909             total_tx.send(acc).unwrap();
1910         });
1911
1912         tx.send(3).unwrap();
1913         tx.send(1).unwrap();
1914         tx.send(2).unwrap();
1915         drop(tx);
1916         assert_eq!(total_rx.recv().unwrap(), 6);
1917     }
1918
1919     #[test]
1920     fn test_recv_iter_break() {
1921         let (tx, rx) = sync_channel::<int>(0);
1922         let (count_tx, count_rx) = sync_channel(0);
1923
1924         let _t = Thread::spawn(move|| {
1925             let mut count = 0;
1926             for x in rx.iter() {
1927                 if count >= 3 {
1928                     break;
1929                 } else {
1930                     count += x;
1931                 }
1932             }
1933             count_tx.send(count).unwrap();
1934         });
1935
1936         tx.send(2).unwrap();
1937         tx.send(2).unwrap();
1938         tx.send(2).unwrap();
1939         let _ = tx.try_send(2);
1940         drop(tx);
1941         assert_eq!(count_rx.recv().unwrap(), 4);
1942     }
1943
1944     #[test]
1945     fn try_recv_states() {
1946         let (tx1, rx1) = sync_channel::<int>(1);
1947         let (tx2, rx2) = sync_channel::<()>(1);
1948         let (tx3, rx3) = sync_channel::<()>(1);
1949         let _t = Thread::spawn(move|| {
1950             rx2.recv().unwrap();
1951             tx1.send(1).unwrap();
1952             tx3.send(()).unwrap();
1953             rx2.recv().unwrap();
1954             drop(tx1);
1955             tx3.send(()).unwrap();
1956         });
1957
1958         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1959         tx2.send(()).unwrap();
1960         rx3.recv().unwrap();
1961         assert_eq!(rx1.try_recv(), Ok(1));
1962         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1963         tx2.send(()).unwrap();
1964         rx3.recv().unwrap();
1965         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1966     }
1967
1968     // This bug used to end up in a livelock inside of the Receiver destructor
1969     // because the internal state of the Shared packet was corrupted
1970     #[test]
1971     fn destroy_upgraded_shared_port_when_sender_still_active() {
1972         let (tx, rx) = sync_channel::<()>(0);
1973         let (tx2, rx2) = sync_channel::<()>(0);
1974         let _t = Thread::spawn(move|| {
1975             rx.recv().unwrap(); // wait on a oneshot
1976             drop(rx);  // destroy a shared
1977             tx2.send(()).unwrap();
1978         });
1979         // make sure the other task has gone to sleep
1980         for _ in range(0u, 5000) { Thread::yield_now(); }
1981
1982         // upgrade to a shared chan and send a message
1983         let t = tx.clone();
1984         drop(tx);
1985         t.send(()).unwrap();
1986
1987         // wait for the child task to exit before we exit
1988         rx2.recv().unwrap();
1989     }
1990
1991     #[test]
1992     fn send1() {
1993         let (tx, rx) = sync_channel::<int>(0);
1994         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
1995         assert_eq!(tx.send(1), Ok(()));
1996     }
1997
1998     #[test]
1999     fn send2() {
2000         let (tx, rx) = sync_channel::<int>(0);
2001         let _t = Thread::spawn(move|| { drop(rx); });
2002         assert!(tx.send(1).is_err());
2003     }
2004
2005     #[test]
2006     fn send3() {
2007         let (tx, rx) = sync_channel::<int>(1);
2008         assert_eq!(tx.send(1), Ok(()));
2009         let _t =Thread::spawn(move|| { drop(rx); });
2010         assert!(tx.send(1).is_err());
2011     }
2012
2013     #[test]
2014     fn send4() {
2015         let (tx, rx) = sync_channel::<int>(0);
2016         let tx2 = tx.clone();
2017         let (done, donerx) = channel();
2018         let done2 = done.clone();
2019         let _t = Thread::spawn(move|| {
2020             assert!(tx.send(1).is_err());
2021             done.send(()).unwrap();
2022         });
2023         let _t = Thread::spawn(move|| {
2024             assert!(tx2.send(2).is_err());
2025             done2.send(()).unwrap();
2026         });
2027         drop(rx);
2028         donerx.recv().unwrap();
2029         donerx.recv().unwrap();
2030     }
2031
2032     #[test]
2033     fn try_send1() {
2034         let (tx, _rx) = sync_channel::<int>(0);
2035         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2036     }
2037
2038     #[test]
2039     fn try_send2() {
2040         let (tx, _rx) = sync_channel::<int>(1);
2041         assert_eq!(tx.try_send(1), Ok(()));
2042         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2043     }
2044
2045     #[test]
2046     fn try_send3() {
2047         let (tx, rx) = sync_channel::<int>(1);
2048         assert_eq!(tx.try_send(1), Ok(()));
2049         drop(rx);
2050         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2051     }
2052
2053     #[test]
2054     fn issue_15761() {
2055         fn repro() {
2056             let (tx1, rx1) = sync_channel::<()>(3);
2057             let (tx2, rx2) = sync_channel::<()>(3);
2058
2059             let _t = Thread::spawn(move|| {
2060                 rx1.recv().unwrap();
2061                 tx2.try_send(()).unwrap();
2062             });
2063
2064             tx1.try_send(()).unwrap();
2065             rx2.recv().unwrap();
2066         }
2067
2068         for _ in range(0u, 100) {
2069             repro()
2070         }
2071     }
2072 }