]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
Rewrite Condvar::wait_timeout and make it public
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer communication primitives threads
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10i).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10i);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     });
82 //! }
83 //!
84 //! for _ in range(0i, 10i) {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 #![stable]
167
168 // A description of how Rust's channel implementation works
169 //
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
173 //
174 // The choice of implementation of all channels is to be built on lock-free data
175 // structures. The channels themselves are then consequently also lock-free data
176 // structures. As always with lock-free code, this is a very "here be dragons"
177 // territory, especially because I'm unaware of any academic papers that have
178 // gone into great length about channels of these flavors.
179 //
180 // ## Flavors of channels
181 //
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
185 // channels in play.
186 //
187 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
188 //              They contain as few atomics as possible and involve one and
189 //              exactly one allocation.
190 // * Streams - these channels are optimized for the non-shared use case. They
191 //             use a different concurrent queue that is more tailored for this
192 //             use case. The initial allocation of this flavor of channel is not
193 //             optimized.
194 // * Shared - this is the most general form of channel that this module offers,
195 //            a channel with multiple senders. This type is as optimized as it
196 //            can be, but the previous two types mentioned are much faster for
197 //            their use-cases.
198 //
199 // ## Concurrent queues
200 //
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
204 //
205 // With two flavors of channels, two flavors of queues are also used. We have
206 // chosen to use queues from a well-known author that are abbreviated as SPSC
207 // and MPSC (single producer, single consumer and multiple producer, single
208 // consumer). SPSC queues are used for streams while MPSC queues are used for
209 // shared channels.
210 //
211 // ### SPSC optimizations
212 //
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
217 // pop().
218 //
219 // As found online, however, the cache of nodes is of an infinite size. This
220 // means that if a channel at one point in its life had 50k items in the queue,
221 // then the queue will always have the capacity for 50k items. I believed that
222 // this was an unnecessary limitation of the implementation, so I have altered
223 // the queue to optionally have a bound on the cache size.
224 //
225 // By default, streams will have an unbounded SPSC queue with a small-ish cache
226 // size. The hope is that the cache is still large enough to have very fast
227 // send() operations while not too large such that millions of channels can
228 // coexist at once.
229 //
230 // ### MPSC optimizations
231 //
232 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
233 // a linked list under the hood to earn its unboundedness, but I have not put
234 // forth much effort into having a cache of nodes similar to the SPSC queue.
235 //
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
239 //
240 // ## Overview of the Implementation
241 //
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
245 //
246 //
247 //      send(t)                             recv()
248 //        queue.push(t)                       return if queue.pop()
249 //        if increment() == -1                deschedule {
250 //          wakeup()                            if decrement() > 0
251 //                                                cancel_deschedule()
252 //                                            }
253 //                                            queue.pop()
254 //
255 // As mentioned before, there are no locks in this implementation, only atomic
256 // instructions are used.
257 //
258 // ### The internal atomic counter
259 //
260 // Every channel has a shared counter with each half to keep track of the size
261 // of the queue. This counter is used to abort descheduling by the receiver and
262 // to know when to wake up on the sending side.
263 //
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
268 //
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
275 //
276 // Instead, the "steal count" is kept track of separately (not atomically
277 // because it's only used by receivers), and then the decrement() call when
278 // descheduling will lump in all of the recent steals into one large decrement.
279 //
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
282 //
283 // ## Native Implementation
284 //
285 // A major goal of these channels is to work seamlessly on and off the runtime.
286 // All of the previous race conditions have been worded in terms of
287 // scheduler-isms (which is obviously not available without the runtime).
288 //
289 // For now, native usage of channels (off the runtime) will fall back onto
290 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
291 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
292 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
293 // condition variable.
294 //
295 // ## Select
296 //
297 // Being able to support selection over channels has greatly influenced this
298 // design, and not only does selection need to work inside the runtime, but also
299 // outside the runtime.
300 //
301 // The implementation is fairly straightforward. The goal of select() is not to
302 // return some data, but only to return which channel can receive data without
303 // blocking. The implementation is essentially the entire blocking procedure
304 // followed by an increment as soon as its woken up. The cancellation procedure
305 // involves an increment and swapping out of to_wake to acquire ownership of the
306 // task to unblock.
307 //
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
312 //
313 // # Conclusion
314 //
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
317
318 use prelude::v1::*;
319
320 use sync::Arc;
321 use fmt;
322 use marker;
323 use mem;
324 use cell::UnsafeCell;
325
326 pub use self::select::{Select, Handle};
327 use self::select::StartResult;
328 use self::select::StartResult::*;
329 use self::blocking::SignalToken;
330
331 mod blocking;
332 mod oneshot;
333 mod select;
334 mod shared;
335 mod stream;
336 mod sync;
337 mod mpsc_queue;
338 mod spsc_queue;
339
340 /// The receiving-half of Rust's channel type. This half can only be owned by
341 /// one task
342 #[stable]
343 pub struct Receiver<T> {
344     inner: UnsafeCell<Flavor<T>>,
345 }
346
347 // The receiver port can be sent from place to place, so long as it
348 // is not used to receive non-sendable things.
349 unsafe impl<T:Send> Send for Receiver<T> { }
350
351 /// An iterator over messages on a receiver, this iterator will block
352 /// whenever `next` is called, waiting for a new message, and `None` will be
353 /// returned when the corresponding channel has hung up.
354 #[stable]
355 pub struct Iter<'a, T:'a> {
356     rx: &'a Receiver<T>
357 }
358
359 /// The sending-half of Rust's asynchronous channel type. This half can only be
360 /// owned by one task, but it can be cloned to send to other tasks.
361 #[stable]
362 pub struct Sender<T> {
363     inner: UnsafeCell<Flavor<T>>,
364 }
365
366 // The send port can be sent from place to place, so long as it
367 // is not used to send non-sendable things.
368 unsafe impl<T:Send> Send for Sender<T> { }
369
370 /// The sending-half of Rust's synchronous channel type. This half can only be
371 /// owned by one task, but it can be cloned to send to other tasks.
372 #[stable]
373 pub struct SyncSender<T> {
374     inner: Arc<RacyCell<sync::Packet<T>>>,
375     // can't share in an arc
376     _marker: marker::NoSync,
377 }
378
379 /// An error returned from the `send` function on channels.
380 ///
381 /// A `send` operation can only fail if the receiving end of a channel is
382 /// disconnected, implying that the data could never be received. The error
383 /// contains the data being sent as a payload so it can be recovered.
384 #[derive(PartialEq, Eq)]
385 #[stable]
386 pub struct SendError<T>(pub T);
387
388 /// An error returned from the `recv` function on a `Receiver`.
389 ///
390 /// The `recv` operation can only fail if the sending half of a channel is
391 /// disconnected, implying that no further messages will ever be received.
392 #[derive(PartialEq, Eq, Clone, Copy)]
393 #[stable]
394 pub struct RecvError;
395
396 /// This enumeration is the list of the possible reasons that try_recv could not
397 /// return data when called.
398 #[derive(PartialEq, Clone, Copy)]
399 #[stable]
400 pub enum TryRecvError {
401     /// This channel is currently empty, but the sender(s) have not yet
402     /// disconnected, so data may yet become available.
403     #[stable]
404     Empty,
405
406     /// This channel's sending half has become disconnected, and there will
407     /// never be any more data received on this channel
408     #[stable]
409     Disconnected,
410 }
411
412 /// This enumeration is the list of the possible error outcomes for the
413 /// `SyncSender::try_send` method.
414 #[derive(PartialEq, Clone)]
415 #[stable]
416 pub enum TrySendError<T> {
417     /// The data could not be sent on the channel because it would require that
418     /// the callee block to send the data.
419     ///
420     /// If this is a buffered channel, then the buffer is full at this time. If
421     /// this is not a buffered channel, then there is no receiver available to
422     /// acquire the data.
423     #[stable]
424     Full(T),
425
426     /// This channel's receiving half has disconnected, so the data could not be
427     /// sent. The data is returned back to the callee in this case.
428     #[stable]
429     Disconnected(T),
430 }
431
432 enum Flavor<T> {
433     Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
434     Stream(Arc<RacyCell<stream::Packet<T>>>),
435     Shared(Arc<RacyCell<shared::Packet<T>>>),
436     Sync(Arc<RacyCell<sync::Packet<T>>>),
437 }
438
439 #[doc(hidden)]
440 trait UnsafeFlavor<T> {
441     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
442     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
443         &mut *self.inner_unsafe().get()
444     }
445     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
446         &*self.inner_unsafe().get()
447     }
448 }
449 impl<T> UnsafeFlavor<T> for Sender<T> {
450     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
451         &self.inner
452     }
453 }
454 impl<T> UnsafeFlavor<T> for Receiver<T> {
455     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
456         &self.inner
457     }
458 }
459
460 /// Creates a new asynchronous channel, returning the sender/receiver halves.
461 ///
462 /// All data sent on the sender will become available on the receiver, and no
463 /// send will block the calling task (this channel has an "infinite buffer").
464 ///
465 /// # Example
466 ///
467 /// ```
468 /// use std::sync::mpsc::channel;
469 /// use std::thread::Thread;
470 ///
471 /// // tx is is the sending half (tx for transmission), and rx is the receiving
472 /// // half (rx for receiving).
473 /// let (tx, rx) = channel();
474 ///
475 /// // Spawn off an expensive computation
476 /// Thread::spawn(move|| {
477 /// #   fn expensive_computation() {}
478 ///     tx.send(expensive_computation()).unwrap();
479 /// });
480 ///
481 /// // Do some useful work for awhile
482 ///
483 /// // Let's see what that answer was
484 /// println!("{:?}", rx.recv().unwrap());
485 /// ```
486 #[stable]
487 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
488     let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
489     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
490 }
491
492 /// Creates a new synchronous, bounded channel.
493 ///
494 /// Like asynchronous channels, the `Receiver` will block until a message
495 /// becomes available. These channels differ greatly in the semantics of the
496 /// sender from asynchronous channels, however.
497 ///
498 /// This channel has an internal buffer on which messages will be queued. When
499 /// the internal buffer becomes full, future sends will *block* waiting for the
500 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
501 /// becomes  "rendezvous channel" where each send will not return until a recv
502 /// is paired with it.
503 ///
504 /// As with asynchronous channels, all senders will panic in `send` if the
505 /// `Receiver` has been destroyed.
506 ///
507 /// # Example
508 ///
509 /// ```
510 /// use std::sync::mpsc::sync_channel;
511 /// use std::thread::Thread;
512 ///
513 /// let (tx, rx) = sync_channel(1);
514 ///
515 /// // this returns immediately
516 /// tx.send(1i).unwrap();
517 ///
518 /// Thread::spawn(move|| {
519 ///     // this will block until the previous message has been received
520 ///     tx.send(2i).unwrap();
521 /// });
522 ///
523 /// assert_eq!(rx.recv().unwrap(), 1i);
524 /// assert_eq!(rx.recv().unwrap(), 2i);
525 /// ```
526 #[stable]
527 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
528     let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
529     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
530 }
531
532 ////////////////////////////////////////////////////////////////////////////////
533 // Sender
534 ////////////////////////////////////////////////////////////////////////////////
535
536 impl<T: Send> Sender<T> {
537     fn new(inner: Flavor<T>) -> Sender<T> {
538         Sender {
539             inner: UnsafeCell::new(inner),
540         }
541     }
542
543     /// Attempts to send a value on this channel, returning it back if it could
544     /// not be sent.
545     ///
546     /// A successful send occurs when it is determined that the other end of
547     /// the channel has not hung up already. An unsuccessful send would be one
548     /// where the corresponding receiver has already been deallocated. Note
549     /// that a return value of `Err` means that the data will never be
550     /// received, but a return value of `Ok` does *not* mean that the data
551     /// will be received.  It is possible for the corresponding receiver to
552     /// hang up immediately after this function returns `Ok`.
553     ///
554     /// This method will never block the current thread.
555     ///
556     /// # Example
557     ///
558     /// ```
559     /// use std::sync::mpsc::channel;
560     ///
561     /// let (tx, rx) = channel();
562     ///
563     /// // This send is always successful
564     /// tx.send(1i).unwrap();
565     ///
566     /// // This send will fail because the receiver is gone
567     /// drop(rx);
568     /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
569     /// ```
570     #[stable]
571     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
572         let (new_inner, ret) = match *unsafe { self.inner() } {
573             Flavor::Oneshot(ref p) => {
574                 unsafe {
575                     let p = p.get();
576                     if !(*p).sent() {
577                         return (*p).send(t).map_err(SendError);
578                     } else {
579                         let a =
580                             Arc::new(RacyCell::new(stream::Packet::new()));
581                         let rx = Receiver::new(Flavor::Stream(a.clone()));
582                         match (*p).upgrade(rx) {
583                             oneshot::UpSuccess => {
584                                 let ret = (*a.get()).send(t);
585                                 (a, ret)
586                             }
587                             oneshot::UpDisconnected => (a, Err(t)),
588                             oneshot::UpWoke(token) => {
589                                 // This send cannot panic because the thread is
590                                 // asleep (we're looking at it), so the receiver
591                                 // can't go away.
592                                 (*a.get()).send(t).ok().unwrap();
593                         token.signal();
594                                 (a, Ok(()))
595                             }
596                         }
597                     }
598                 }
599             }
600             Flavor::Stream(ref p) => return unsafe {
601                 (*p.get()).send(t).map_err(SendError)
602             },
603             Flavor::Shared(ref p) => return unsafe {
604                 (*p.get()).send(t).map_err(SendError)
605             },
606             Flavor::Sync(..) => unreachable!(),
607         };
608
609         unsafe {
610             let tmp = Sender::new(Flavor::Stream(new_inner));
611             mem::swap(self.inner_mut(), tmp.inner_mut());
612         }
613         ret.map_err(SendError)
614     }
615 }
616
617 #[stable]
618 impl<T: Send> Clone for Sender<T> {
619     fn clone(&self) -> Sender<T> {
620         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
621             Flavor::Oneshot(ref p) => {
622                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
623                 unsafe {
624                     let guard = (*a.get()).postinit_lock();
625                     let rx = Receiver::new(Flavor::Shared(a.clone()));
626                     match (*p.get()).upgrade(rx) {
627                         oneshot::UpSuccess |
628                         oneshot::UpDisconnected => (a, None, guard),
629                         oneshot::UpWoke(task) => (a, Some(task), guard)
630                     }
631                 }
632             }
633             Flavor::Stream(ref p) => {
634                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
635                 unsafe {
636                     let guard = (*a.get()).postinit_lock();
637                     let rx = Receiver::new(Flavor::Shared(a.clone()));
638                     match (*p.get()).upgrade(rx) {
639                         stream::UpSuccess |
640                         stream::UpDisconnected => (a, None, guard),
641                         stream::UpWoke(task) => (a, Some(task), guard),
642                     }
643                 }
644             }
645             Flavor::Shared(ref p) => {
646                 unsafe { (*p.get()).clone_chan(); }
647                 return Sender::new(Flavor::Shared(p.clone()));
648             }
649             Flavor::Sync(..) => unreachable!(),
650         };
651
652         unsafe {
653             (*packet.get()).inherit_blocker(sleeper, guard);
654
655             let tmp = Sender::new(Flavor::Shared(packet.clone()));
656             mem::swap(self.inner_mut(), tmp.inner_mut());
657         }
658         Sender::new(Flavor::Shared(packet))
659     }
660 }
661
662 #[unsafe_destructor]
663 #[stable]
664 impl<T: Send> Drop for Sender<T> {
665     fn drop(&mut self) {
666         match *unsafe { self.inner_mut() } {
667             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
668             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
669             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
670             Flavor::Sync(..) => unreachable!(),
671         }
672     }
673 }
674
675 ////////////////////////////////////////////////////////////////////////////////
676 // SyncSender
677 ////////////////////////////////////////////////////////////////////////////////
678
679 impl<T: Send> SyncSender<T> {
680     fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
681         SyncSender { inner: inner, _marker: marker::NoSync }
682     }
683
684     /// Sends a value on this synchronous channel.
685     ///
686     /// This function will *block* until space in the internal buffer becomes
687     /// available or a receiver is available to hand off the message to.
688     ///
689     /// Note that a successful send does *not* guarantee that the receiver will
690     /// ever see the data if there is a buffer on this channel. Items may be
691     /// enqueued in the internal buffer for the receiver to receive at a later
692     /// time. If the buffer size is 0, however, it can be guaranteed that the
693     /// receiver has indeed received the data if this function returns success.
694     ///
695     /// This function will never panic, but it may return `Err` if the
696     /// `Receiver` has disconnected and is no longer able to receive
697     /// information.
698     #[stable]
699     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
700         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
701     }
702
703     /// Attempts to send a value on this channel without blocking.
704     ///
705     /// This method differs from `send` by returning immediately if the
706     /// channel's buffer is full or no receiver is waiting to acquire some
707     /// data. Compared with `send`, this function has two failure cases
708     /// instead of one (one for disconnection, one for a full buffer).
709     ///
710     /// See `SyncSender::send` for notes about guarantees of whether the
711     /// receiver has received the data or not if this function is successful.
712     #[stable]
713     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
714         unsafe { (*self.inner.get()).try_send(t) }
715     }
716 }
717
718 #[stable]
719 impl<T: Send> Clone for SyncSender<T> {
720     fn clone(&self) -> SyncSender<T> {
721         unsafe { (*self.inner.get()).clone_chan(); }
722         return SyncSender::new(self.inner.clone());
723     }
724 }
725
726 #[unsafe_destructor]
727 #[stable]
728 impl<T: Send> Drop for SyncSender<T> {
729     fn drop(&mut self) {
730         unsafe { (*self.inner.get()).drop_chan(); }
731     }
732 }
733
734 ////////////////////////////////////////////////////////////////////////////////
735 // Receiver
736 ////////////////////////////////////////////////////////////////////////////////
737
738 impl<T: Send> Receiver<T> {
739     fn new(inner: Flavor<T>) -> Receiver<T> {
740         Receiver { inner: UnsafeCell::new(inner) }
741     }
742
743     /// Attempts to return a pending value on this receiver without blocking
744     ///
745     /// This method will never block the caller in order to wait for data to
746     /// become available. Instead, this will always return immediately with a
747     /// possible option of pending data on the channel.
748     ///
749     /// This is useful for a flavor of "optimistic check" before deciding to
750     /// block on a receiver.
751     #[stable]
752     pub fn try_recv(&self) -> Result<T, TryRecvError> {
753         loop {
754             let new_port = match *unsafe { self.inner() } {
755                 Flavor::Oneshot(ref p) => {
756                     match unsafe { (*p.get()).try_recv() } {
757                         Ok(t) => return Ok(t),
758                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
759                         Err(oneshot::Disconnected) => {
760                             return Err(TryRecvError::Disconnected)
761                         }
762                         Err(oneshot::Upgraded(rx)) => rx,
763                     }
764                 }
765                 Flavor::Stream(ref p) => {
766                     match unsafe { (*p.get()).try_recv() } {
767                         Ok(t) => return Ok(t),
768                         Err(stream::Empty) => return Err(TryRecvError::Empty),
769                         Err(stream::Disconnected) => {
770                             return Err(TryRecvError::Disconnected)
771                         }
772                         Err(stream::Upgraded(rx)) => rx,
773                     }
774                 }
775                 Flavor::Shared(ref p) => {
776                     match unsafe { (*p.get()).try_recv() } {
777                         Ok(t) => return Ok(t),
778                         Err(shared::Empty) => return Err(TryRecvError::Empty),
779                         Err(shared::Disconnected) => {
780                             return Err(TryRecvError::Disconnected)
781                         }
782                     }
783                 }
784                 Flavor::Sync(ref p) => {
785                     match unsafe { (*p.get()).try_recv() } {
786                         Ok(t) => return Ok(t),
787                         Err(sync::Empty) => return Err(TryRecvError::Empty),
788                         Err(sync::Disconnected) => {
789                             return Err(TryRecvError::Disconnected)
790                         }
791                     }
792                 }
793             };
794             unsafe {
795                 mem::swap(self.inner_mut(),
796                           new_port.inner_mut());
797             }
798         }
799     }
800
801     /// Attempt to wait for a value on this receiver, returning an error if the
802     /// corresponding channel has hung up.
803     ///
804     /// This function will always block the current thread if there is no data
805     /// available and it's possible for more data to be sent. Once a message is
806     /// sent to the corresponding `Sender`, then this receiver will wake up and
807     /// return that message.
808     ///
809     /// If the corresponding `Sender` has disconnected, or it disconnects while
810     /// this call is blocking, this call will wake up and return `Err` to
811     /// indicate that no more messages can ever be received on this channel.
812     #[stable]
813     pub fn recv(&self) -> Result<T, RecvError> {
814         loop {
815             let new_port = match *unsafe { self.inner() } {
816                 Flavor::Oneshot(ref p) => {
817                     match unsafe { (*p.get()).recv() } {
818                         Ok(t) => return Ok(t),
819                         Err(oneshot::Empty) => return unreachable!(),
820                         Err(oneshot::Disconnected) => return Err(RecvError),
821                         Err(oneshot::Upgraded(rx)) => rx,
822                     }
823                 }
824                 Flavor::Stream(ref p) => {
825                     match unsafe { (*p.get()).recv() } {
826                         Ok(t) => return Ok(t),
827                         Err(stream::Empty) => return unreachable!(),
828                         Err(stream::Disconnected) => return Err(RecvError),
829                         Err(stream::Upgraded(rx)) => rx,
830                     }
831                 }
832                 Flavor::Shared(ref p) => {
833                     match unsafe { (*p.get()).recv() } {
834                         Ok(t) => return Ok(t),
835                         Err(shared::Empty) => return unreachable!(),
836                         Err(shared::Disconnected) => return Err(RecvError),
837                     }
838                 }
839                 Flavor::Sync(ref p) => return unsafe {
840                     (*p.get()).recv().map_err(|()| RecvError)
841                 }
842             };
843             unsafe {
844                 mem::swap(self.inner_mut(), new_port.inner_mut());
845             }
846         }
847     }
848
849     /// Returns an iterator that will block waiting for messages, but never
850     /// `panic!`. It will return `None` when the channel has hung up.
851     #[stable]
852     pub fn iter(&self) -> Iter<T> {
853         Iter { rx: self }
854     }
855 }
856
857 impl<T: Send> select::Packet for Receiver<T> {
858     fn can_recv(&self) -> bool {
859         loop {
860             let new_port = match *unsafe { self.inner() } {
861                 Flavor::Oneshot(ref p) => {
862                     match unsafe { (*p.get()).can_recv() } {
863                         Ok(ret) => return ret,
864                         Err(upgrade) => upgrade,
865                     }
866                 }
867                 Flavor::Stream(ref p) => {
868                     match unsafe { (*p.get()).can_recv() } {
869                         Ok(ret) => return ret,
870                         Err(upgrade) => upgrade,
871                     }
872                 }
873                 Flavor::Shared(ref p) => {
874                     return unsafe { (*p.get()).can_recv() };
875                 }
876                 Flavor::Sync(ref p) => {
877                     return unsafe { (*p.get()).can_recv() };
878                 }
879             };
880             unsafe {
881                 mem::swap(self.inner_mut(),
882                           new_port.inner_mut());
883             }
884         }
885     }
886
887     fn start_selection(&self, mut token: SignalToken) -> StartResult {
888         loop {
889             let (t, new_port) = match *unsafe { self.inner() } {
890                 Flavor::Oneshot(ref p) => {
891                     match unsafe { (*p.get()).start_selection(token) } {
892                         oneshot::SelSuccess => return Installed,
893                         oneshot::SelCanceled => return Abort,
894                         oneshot::SelUpgraded(t, rx) => (t, rx),
895                     }
896                 }
897                 Flavor::Stream(ref p) => {
898                     match unsafe { (*p.get()).start_selection(token) } {
899                         stream::SelSuccess => return Installed,
900                         stream::SelCanceled => return Abort,
901                         stream::SelUpgraded(t, rx) => (t, rx),
902                     }
903                 }
904                 Flavor::Shared(ref p) => {
905                     return unsafe { (*p.get()).start_selection(token) };
906                 }
907                 Flavor::Sync(ref p) => {
908                     return unsafe { (*p.get()).start_selection(token) };
909                 }
910             };
911             token = t;
912             unsafe {
913                 mem::swap(self.inner_mut(), new_port.inner_mut());
914             }
915         }
916     }
917
918     fn abort_selection(&self) -> bool {
919         let mut was_upgrade = false;
920         loop {
921             let result = match *unsafe { self.inner() } {
922                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
923                 Flavor::Stream(ref p) => unsafe {
924                     (*p.get()).abort_selection(was_upgrade)
925                 },
926                 Flavor::Shared(ref p) => return unsafe {
927                     (*p.get()).abort_selection(was_upgrade)
928                 },
929                 Flavor::Sync(ref p) => return unsafe {
930                     (*p.get()).abort_selection()
931                 },
932             };
933             let new_port = match result { Ok(b) => return b, Err(p) => p };
934             was_upgrade = true;
935             unsafe {
936                 mem::swap(self.inner_mut(),
937                           new_port.inner_mut());
938             }
939         }
940     }
941 }
942
943 #[stable]
944 impl<'a, T: Send> Iterator for Iter<'a, T> {
945     type Item = T;
946
947     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
948 }
949
950 #[unsafe_destructor]
951 #[stable]
952 impl<T: Send> Drop for Receiver<T> {
953     fn drop(&mut self) {
954         match *unsafe { self.inner_mut() } {
955             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
956             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
957             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
958             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
959         }
960     }
961 }
962
963 /// A version of `UnsafeCell` intended for use in concurrent data
964 /// structures (for example, you might put it in an `Arc`).
965 struct RacyCell<T>(pub UnsafeCell<T>);
966
967 impl<T> RacyCell<T> {
968
969     fn new(value: T) -> RacyCell<T> {
970         RacyCell(UnsafeCell { value: value })
971     }
972
973     unsafe fn get(&self) -> *mut T {
974         self.0.get()
975     }
976
977 }
978
979 unsafe impl<T:Send> Send for RacyCell<T> { }
980
981 unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
982
983 impl<T> fmt::Show for SendError<T> {
984     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
985         "sending on a closed channel".fmt(f)
986     }
987 }
988
989 impl<T> fmt::Show for TrySendError<T> {
990     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
991         match *self {
992             TrySendError::Full(..) => {
993                 "sending on a full channel".fmt(f)
994             }
995             TrySendError::Disconnected(..) => {
996                 "sending on a closed channel".fmt(f)
997             }
998         }
999     }
1000 }
1001
1002 impl fmt::Show for RecvError {
1003     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1004         "receiving on a closed channel".fmt(f)
1005     }
1006 }
1007
1008 impl fmt::Show for TryRecvError {
1009     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010         match *self {
1011             TryRecvError::Empty => {
1012                 "receiving on an empty channel".fmt(f)
1013             }
1014             TryRecvError::Disconnected => {
1015                 "receiving on a closed channel".fmt(f)
1016             }
1017         }
1018     }
1019 }
1020
1021 #[cfg(test)]
1022 mod test {
1023     use prelude::v1::*;
1024
1025     use os;
1026     use super::*;
1027     use thread::Thread;
1028
1029     pub fn stress_factor() -> uint {
1030         match os::getenv("RUST_TEST_STRESS") {
1031             Some(val) => val.parse().unwrap(),
1032             None => 1,
1033         }
1034     }
1035
1036     #[test]
1037     fn smoke() {
1038         let (tx, rx) = channel::<int>();
1039         tx.send(1).unwrap();
1040         assert_eq!(rx.recv().unwrap(), 1);
1041     }
1042
1043     #[test]
1044     fn drop_full() {
1045         let (tx, _rx) = channel();
1046         tx.send(box 1i).unwrap();
1047     }
1048
1049     #[test]
1050     fn drop_full_shared() {
1051         let (tx, _rx) = channel();
1052         drop(tx.clone());
1053         drop(tx.clone());
1054         tx.send(box 1i).unwrap();
1055     }
1056
1057     #[test]
1058     fn smoke_shared() {
1059         let (tx, rx) = channel::<int>();
1060         tx.send(1).unwrap();
1061         assert_eq!(rx.recv().unwrap(), 1);
1062         let tx = tx.clone();
1063         tx.send(1).unwrap();
1064         assert_eq!(rx.recv().unwrap(), 1);
1065     }
1066
1067     #[test]
1068     fn smoke_threads() {
1069         let (tx, rx) = channel::<int>();
1070         let _t = Thread::spawn(move|| {
1071             tx.send(1).unwrap();
1072         });
1073         assert_eq!(rx.recv().unwrap(), 1);
1074     }
1075
1076     #[test]
1077     fn smoke_port_gone() {
1078         let (tx, rx) = channel::<int>();
1079         drop(rx);
1080         assert!(tx.send(1).is_err());
1081     }
1082
1083     #[test]
1084     fn smoke_shared_port_gone() {
1085         let (tx, rx) = channel::<int>();
1086         drop(rx);
1087         assert!(tx.send(1).is_err())
1088     }
1089
1090     #[test]
1091     fn smoke_shared_port_gone2() {
1092         let (tx, rx) = channel::<int>();
1093         drop(rx);
1094         let tx2 = tx.clone();
1095         drop(tx);
1096         assert!(tx2.send(1).is_err());
1097     }
1098
1099     #[test]
1100     fn port_gone_concurrent() {
1101         let (tx, rx) = channel::<int>();
1102         let _t = Thread::spawn(move|| {
1103             rx.recv().unwrap();
1104         });
1105         while tx.send(1).is_ok() {}
1106     }
1107
1108     #[test]
1109     fn port_gone_concurrent_shared() {
1110         let (tx, rx) = channel::<int>();
1111         let tx2 = tx.clone();
1112         let _t = Thread::spawn(move|| {
1113             rx.recv().unwrap();
1114         });
1115         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1116     }
1117
1118     #[test]
1119     fn smoke_chan_gone() {
1120         let (tx, rx) = channel::<int>();
1121         drop(tx);
1122         assert!(rx.recv().is_err());
1123     }
1124
1125     #[test]
1126     fn smoke_chan_gone_shared() {
1127         let (tx, rx) = channel::<()>();
1128         let tx2 = tx.clone();
1129         drop(tx);
1130         drop(tx2);
1131         assert!(rx.recv().is_err());
1132     }
1133
1134     #[test]
1135     fn chan_gone_concurrent() {
1136         let (tx, rx) = channel::<int>();
1137         let _t = Thread::spawn(move|| {
1138             tx.send(1).unwrap();
1139             tx.send(1).unwrap();
1140         });
1141         while rx.recv().is_ok() {}
1142     }
1143
1144     #[test]
1145     fn stress() {
1146         let (tx, rx) = channel::<int>();
1147         let t = Thread::scoped(move|| {
1148             for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1149         });
1150         for _ in range(0u, 10000) {
1151             assert_eq!(rx.recv().unwrap(), 1);
1152         }
1153         t.join().ok().unwrap();
1154     }
1155
1156     #[test]
1157     fn stress_shared() {
1158         static AMT: uint = 10000;
1159         static NTHREADS: uint = 8;
1160         let (tx, rx) = channel::<int>();
1161
1162         let t = Thread::scoped(move|| {
1163             for _ in range(0, AMT * NTHREADS) {
1164                 assert_eq!(rx.recv().unwrap(), 1);
1165             }
1166             match rx.try_recv() {
1167                 Ok(..) => panic!(),
1168                 _ => {}
1169             }
1170         });
1171
1172         for _ in range(0, NTHREADS) {
1173             let tx = tx.clone();
1174             Thread::spawn(move|| {
1175                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1176             });
1177         }
1178         drop(tx);
1179         t.join().ok().unwrap();
1180     }
1181
1182     #[test]
1183     fn send_from_outside_runtime() {
1184         let (tx1, rx1) = channel::<()>();
1185         let (tx2, rx2) = channel::<int>();
1186         let t1 = Thread::scoped(move|| {
1187             tx1.send(()).unwrap();
1188             for _ in range(0i, 40) {
1189                 assert_eq!(rx2.recv().unwrap(), 1);
1190             }
1191         });
1192         rx1.recv().unwrap();
1193         let t2 = Thread::scoped(move|| {
1194             for _ in range(0i, 40) {
1195                 tx2.send(1).unwrap();
1196             }
1197         });
1198         t1.join().ok().unwrap();
1199         t2.join().ok().unwrap();
1200     }
1201
1202     #[test]
1203     fn recv_from_outside_runtime() {
1204         let (tx, rx) = channel::<int>();
1205         let t = Thread::scoped(move|| {
1206             for _ in range(0i, 40) {
1207                 assert_eq!(rx.recv().unwrap(), 1);
1208             }
1209         });
1210         for _ in range(0u, 40) {
1211             tx.send(1).unwrap();
1212         }
1213         t.join().ok().unwrap();
1214     }
1215
1216     #[test]
1217     fn no_runtime() {
1218         let (tx1, rx1) = channel::<int>();
1219         let (tx2, rx2) = channel::<int>();
1220         let t1 = Thread::scoped(move|| {
1221             assert_eq!(rx1.recv().unwrap(), 1);
1222             tx2.send(2).unwrap();
1223         });
1224         let t2 = Thread::scoped(move|| {
1225             tx1.send(1).unwrap();
1226             assert_eq!(rx2.recv().unwrap(), 2);
1227         });
1228         t1.join().ok().unwrap();
1229         t2.join().ok().unwrap();
1230     }
1231
1232     #[test]
1233     fn oneshot_single_thread_close_port_first() {
1234         // Simple test of closing without sending
1235         let (_tx, rx) = channel::<int>();
1236         drop(rx);
1237     }
1238
1239     #[test]
1240     fn oneshot_single_thread_close_chan_first() {
1241         // Simple test of closing without sending
1242         let (tx, _rx) = channel::<int>();
1243         drop(tx);
1244     }
1245
1246     #[test]
1247     fn oneshot_single_thread_send_port_close() {
1248         // Testing that the sender cleans up the payload if receiver is closed
1249         let (tx, rx) = channel::<Box<int>>();
1250         drop(rx);
1251         assert!(tx.send(box 0).is_err());
1252     }
1253
1254     #[test]
1255     fn oneshot_single_thread_recv_chan_close() {
1256         // Receiving on a closed chan will panic
1257         let res = Thread::scoped(move|| {
1258             let (tx, rx) = channel::<int>();
1259             drop(tx);
1260             rx.recv().unwrap();
1261         }).join();
1262         // What is our res?
1263         assert!(res.is_err());
1264     }
1265
1266     #[test]
1267     fn oneshot_single_thread_send_then_recv() {
1268         let (tx, rx) = channel::<Box<int>>();
1269         tx.send(box 10).unwrap();
1270         assert!(rx.recv().unwrap() == box 10);
1271     }
1272
1273     #[test]
1274     fn oneshot_single_thread_try_send_open() {
1275         let (tx, rx) = channel::<int>();
1276         assert!(tx.send(10).is_ok());
1277         assert!(rx.recv().unwrap() == 10);
1278     }
1279
1280     #[test]
1281     fn oneshot_single_thread_try_send_closed() {
1282         let (tx, rx) = channel::<int>();
1283         drop(rx);
1284         assert!(tx.send(10).is_err());
1285     }
1286
1287     #[test]
1288     fn oneshot_single_thread_try_recv_open() {
1289         let (tx, rx) = channel::<int>();
1290         tx.send(10).unwrap();
1291         assert!(rx.recv() == Ok(10));
1292     }
1293
1294     #[test]
1295     fn oneshot_single_thread_try_recv_closed() {
1296         let (tx, rx) = channel::<int>();
1297         drop(tx);
1298         assert!(rx.recv().is_err());
1299     }
1300
1301     #[test]
1302     fn oneshot_single_thread_peek_data() {
1303         let (tx, rx) = channel::<int>();
1304         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1305         tx.send(10).unwrap();
1306         assert_eq!(rx.try_recv(), Ok(10));
1307     }
1308
1309     #[test]
1310     fn oneshot_single_thread_peek_close() {
1311         let (tx, rx) = channel::<int>();
1312         drop(tx);
1313         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1314         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1315     }
1316
1317     #[test]
1318     fn oneshot_single_thread_peek_open() {
1319         let (_tx, rx) = channel::<int>();
1320         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1321     }
1322
1323     #[test]
1324     fn oneshot_multi_task_recv_then_send() {
1325         let (tx, rx) = channel::<Box<int>>();
1326         let _t = Thread::spawn(move|| {
1327             assert!(rx.recv().unwrap() == box 10);
1328         });
1329
1330         tx.send(box 10).unwrap();
1331     }
1332
1333     #[test]
1334     fn oneshot_multi_task_recv_then_close() {
1335         let (tx, rx) = channel::<Box<int>>();
1336         let _t = Thread::spawn(move|| {
1337             drop(tx);
1338         });
1339         let res = Thread::scoped(move|| {
1340             assert!(rx.recv().unwrap() == box 10);
1341         }).join();
1342         assert!(res.is_err());
1343     }
1344
1345     #[test]
1346     fn oneshot_multi_thread_close_stress() {
1347         for _ in range(0, stress_factor()) {
1348             let (tx, rx) = channel::<int>();
1349             let _t = Thread::spawn(move|| {
1350                 drop(rx);
1351             });
1352             drop(tx);
1353         }
1354     }
1355
1356     #[test]
1357     fn oneshot_multi_thread_send_close_stress() {
1358         for _ in range(0, stress_factor()) {
1359             let (tx, rx) = channel::<int>();
1360             let _t = Thread::spawn(move|| {
1361                 drop(rx);
1362             });
1363             let _ = Thread::scoped(move|| {
1364                 tx.send(1).unwrap();
1365             }).join();
1366         }
1367     }
1368
1369     #[test]
1370     fn oneshot_multi_thread_recv_close_stress() {
1371         for _ in range(0, stress_factor()) {
1372             let (tx, rx) = channel::<int>();
1373             Thread::spawn(move|| {
1374                 let res = Thread::scoped(move|| {
1375                     rx.recv().unwrap();
1376                 }).join();
1377                 assert!(res.is_err());
1378             });
1379             let _t = Thread::spawn(move|| {
1380                 Thread::spawn(move|| {
1381                     drop(tx);
1382                 });
1383             });
1384         }
1385     }
1386
1387     #[test]
1388     fn oneshot_multi_thread_send_recv_stress() {
1389         for _ in range(0, stress_factor()) {
1390             let (tx, rx) = channel();
1391             let _t = Thread::spawn(move|| {
1392                 tx.send(box 10i).unwrap();
1393             });
1394             assert!(rx.recv().unwrap() == box 10i);
1395         }
1396     }
1397
1398     #[test]
1399     fn stream_send_recv_stress() {
1400         for _ in range(0, stress_factor()) {
1401             let (tx, rx) = channel();
1402
1403             send(tx, 0);
1404             recv(rx, 0);
1405
1406             fn send(tx: Sender<Box<int>>, i: int) {
1407                 if i == 10 { return }
1408
1409                 Thread::spawn(move|| {
1410                     tx.send(box i).unwrap();
1411                     send(tx, i + 1);
1412                 });
1413             }
1414
1415             fn recv(rx: Receiver<Box<int>>, i: int) {
1416                 if i == 10 { return }
1417
1418                 Thread::spawn(move|| {
1419                     assert!(rx.recv().unwrap() == box i);
1420                     recv(rx, i + 1);
1421                 });
1422             }
1423         }
1424     }
1425
1426     #[test]
1427     fn recv_a_lot() {
1428         // Regression test that we don't run out of stack in scheduler context
1429         let (tx, rx) = channel();
1430         for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1431         for _ in range(0i, 10000) { rx.recv().unwrap(); }
1432     }
1433
1434     #[test]
1435     fn shared_chan_stress() {
1436         let (tx, rx) = channel();
1437         let total = stress_factor() + 100;
1438         for _ in range(0, total) {
1439             let tx = tx.clone();
1440             Thread::spawn(move|| {
1441                 tx.send(()).unwrap();
1442             });
1443         }
1444
1445         for _ in range(0, total) {
1446             rx.recv().unwrap();
1447         }
1448     }
1449
1450     #[test]
1451     fn test_nested_recv_iter() {
1452         let (tx, rx) = channel::<int>();
1453         let (total_tx, total_rx) = channel::<int>();
1454
1455         let _t = Thread::spawn(move|| {
1456             let mut acc = 0;
1457             for x in rx.iter() {
1458                 acc += x;
1459             }
1460             total_tx.send(acc).unwrap();
1461         });
1462
1463         tx.send(3).unwrap();
1464         tx.send(1).unwrap();
1465         tx.send(2).unwrap();
1466         drop(tx);
1467         assert_eq!(total_rx.recv().unwrap(), 6);
1468     }
1469
1470     #[test]
1471     fn test_recv_iter_break() {
1472         let (tx, rx) = channel::<int>();
1473         let (count_tx, count_rx) = channel();
1474
1475         let _t = Thread::spawn(move|| {
1476             let mut count = 0;
1477             for x in rx.iter() {
1478                 if count >= 3 {
1479                     break;
1480                 } else {
1481                     count += x;
1482                 }
1483             }
1484             count_tx.send(count).unwrap();
1485         });
1486
1487         tx.send(2).unwrap();
1488         tx.send(2).unwrap();
1489         tx.send(2).unwrap();
1490         let _ = tx.send(2);
1491         drop(tx);
1492         assert_eq!(count_rx.recv().unwrap(), 4);
1493     }
1494
1495     #[test]
1496     fn try_recv_states() {
1497         let (tx1, rx1) = channel::<int>();
1498         let (tx2, rx2) = channel::<()>();
1499         let (tx3, rx3) = channel::<()>();
1500         let _t = Thread::spawn(move|| {
1501             rx2.recv().unwrap();
1502             tx1.send(1).unwrap();
1503             tx3.send(()).unwrap();
1504             rx2.recv().unwrap();
1505             drop(tx1);
1506             tx3.send(()).unwrap();
1507         });
1508
1509         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1510         tx2.send(()).unwrap();
1511         rx3.recv().unwrap();
1512         assert_eq!(rx1.try_recv(), Ok(1));
1513         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1514         tx2.send(()).unwrap();
1515         rx3.recv().unwrap();
1516         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1517     }
1518
1519     // This bug used to end up in a livelock inside of the Receiver destructor
1520     // because the internal state of the Shared packet was corrupted
1521     #[test]
1522     fn destroy_upgraded_shared_port_when_sender_still_active() {
1523         let (tx, rx) = channel();
1524         let (tx2, rx2) = channel();
1525         let _t = Thread::spawn(move|| {
1526             rx.recv().unwrap(); // wait on a oneshot
1527             drop(rx);  // destroy a shared
1528             tx2.send(()).unwrap();
1529         });
1530         // make sure the other task has gone to sleep
1531         for _ in range(0u, 5000) { Thread::yield_now(); }
1532
1533         // upgrade to a shared chan and send a message
1534         let t = tx.clone();
1535         drop(tx);
1536         t.send(()).unwrap();
1537
1538         // wait for the child task to exit before we exit
1539         rx2.recv().unwrap();
1540     }
1541 }
1542
1543 #[cfg(test)]
1544 mod sync_tests {
1545     use prelude::v1::*;
1546
1547     use os;
1548     use thread::Thread;
1549     use super::*;
1550
1551     pub fn stress_factor() -> uint {
1552         match os::getenv("RUST_TEST_STRESS") {
1553             Some(val) => val.parse().unwrap(),
1554             None => 1,
1555         }
1556     }
1557
1558     #[test]
1559     fn smoke() {
1560         let (tx, rx) = sync_channel::<int>(1);
1561         tx.send(1).unwrap();
1562         assert_eq!(rx.recv().unwrap(), 1);
1563     }
1564
1565     #[test]
1566     fn drop_full() {
1567         let (tx, _rx) = sync_channel(1);
1568         tx.send(box 1i).unwrap();
1569     }
1570
1571     #[test]
1572     fn smoke_shared() {
1573         let (tx, rx) = sync_channel::<int>(1);
1574         tx.send(1).unwrap();
1575         assert_eq!(rx.recv().unwrap(), 1);
1576         let tx = tx.clone();
1577         tx.send(1).unwrap();
1578         assert_eq!(rx.recv().unwrap(), 1);
1579     }
1580
1581     #[test]
1582     fn smoke_threads() {
1583         let (tx, rx) = sync_channel::<int>(0);
1584         let _t = Thread::spawn(move|| {
1585             tx.send(1).unwrap();
1586         });
1587         assert_eq!(rx.recv().unwrap(), 1);
1588     }
1589
1590     #[test]
1591     fn smoke_port_gone() {
1592         let (tx, rx) = sync_channel::<int>(0);
1593         drop(rx);
1594         assert!(tx.send(1).is_err());
1595     }
1596
1597     #[test]
1598     fn smoke_shared_port_gone2() {
1599         let (tx, rx) = sync_channel::<int>(0);
1600         drop(rx);
1601         let tx2 = tx.clone();
1602         drop(tx);
1603         assert!(tx2.send(1).is_err());
1604     }
1605
1606     #[test]
1607     fn port_gone_concurrent() {
1608         let (tx, rx) = sync_channel::<int>(0);
1609         let _t = Thread::spawn(move|| {
1610             rx.recv().unwrap();
1611         });
1612         while tx.send(1).is_ok() {}
1613     }
1614
1615     #[test]
1616     fn port_gone_concurrent_shared() {
1617         let (tx, rx) = sync_channel::<int>(0);
1618         let tx2 = tx.clone();
1619         let _t = Thread::spawn(move|| {
1620             rx.recv().unwrap();
1621         });
1622         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1623     }
1624
1625     #[test]
1626     fn smoke_chan_gone() {
1627         let (tx, rx) = sync_channel::<int>(0);
1628         drop(tx);
1629         assert!(rx.recv().is_err());
1630     }
1631
1632     #[test]
1633     fn smoke_chan_gone_shared() {
1634         let (tx, rx) = sync_channel::<()>(0);
1635         let tx2 = tx.clone();
1636         drop(tx);
1637         drop(tx2);
1638         assert!(rx.recv().is_err());
1639     }
1640
1641     #[test]
1642     fn chan_gone_concurrent() {
1643         let (tx, rx) = sync_channel::<int>(0);
1644         Thread::spawn(move|| {
1645             tx.send(1).unwrap();
1646             tx.send(1).unwrap();
1647         });
1648         while rx.recv().is_ok() {}
1649     }
1650
1651     #[test]
1652     fn stress() {
1653         let (tx, rx) = sync_channel::<int>(0);
1654         Thread::spawn(move|| {
1655             for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1656         });
1657         for _ in range(0u, 10000) {
1658             assert_eq!(rx.recv().unwrap(), 1);
1659         }
1660     }
1661
1662     #[test]
1663     fn stress_shared() {
1664         static AMT: uint = 1000;
1665         static NTHREADS: uint = 8;
1666         let (tx, rx) = sync_channel::<int>(0);
1667         let (dtx, drx) = sync_channel::<()>(0);
1668
1669         Thread::spawn(move|| {
1670             for _ in range(0, AMT * NTHREADS) {
1671                 assert_eq!(rx.recv().unwrap(), 1);
1672             }
1673             match rx.try_recv() {
1674                 Ok(..) => panic!(),
1675                 _ => {}
1676             }
1677             dtx.send(()).unwrap();
1678         });
1679
1680         for _ in range(0, NTHREADS) {
1681             let tx = tx.clone();
1682             Thread::spawn(move|| {
1683                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1684             });
1685         }
1686         drop(tx);
1687         drx.recv().unwrap();
1688     }
1689
1690     #[test]
1691     fn oneshot_single_thread_close_port_first() {
1692         // Simple test of closing without sending
1693         let (_tx, rx) = sync_channel::<int>(0);
1694         drop(rx);
1695     }
1696
1697     #[test]
1698     fn oneshot_single_thread_close_chan_first() {
1699         // Simple test of closing without sending
1700         let (tx, _rx) = sync_channel::<int>(0);
1701         drop(tx);
1702     }
1703
1704     #[test]
1705     fn oneshot_single_thread_send_port_close() {
1706         // Testing that the sender cleans up the payload if receiver is closed
1707         let (tx, rx) = sync_channel::<Box<int>>(0);
1708         drop(rx);
1709         assert!(tx.send(box 0).is_err());
1710     }
1711
1712     #[test]
1713     fn oneshot_single_thread_recv_chan_close() {
1714         // Receiving on a closed chan will panic
1715         let res = Thread::scoped(move|| {
1716             let (tx, rx) = sync_channel::<int>(0);
1717             drop(tx);
1718             rx.recv().unwrap();
1719         }).join();
1720         // What is our res?
1721         assert!(res.is_err());
1722     }
1723
1724     #[test]
1725     fn oneshot_single_thread_send_then_recv() {
1726         let (tx, rx) = sync_channel::<Box<int>>(1);
1727         tx.send(box 10).unwrap();
1728         assert!(rx.recv().unwrap() == box 10);
1729     }
1730
1731     #[test]
1732     fn oneshot_single_thread_try_send_open() {
1733         let (tx, rx) = sync_channel::<int>(1);
1734         assert_eq!(tx.try_send(10), Ok(()));
1735         assert!(rx.recv().unwrap() == 10);
1736     }
1737
1738     #[test]
1739     fn oneshot_single_thread_try_send_closed() {
1740         let (tx, rx) = sync_channel::<int>(0);
1741         drop(rx);
1742         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1743     }
1744
1745     #[test]
1746     fn oneshot_single_thread_try_send_closed2() {
1747         let (tx, _rx) = sync_channel::<int>(0);
1748         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1749     }
1750
1751     #[test]
1752     fn oneshot_single_thread_try_recv_open() {
1753         let (tx, rx) = sync_channel::<int>(1);
1754         tx.send(10).unwrap();
1755         assert!(rx.recv() == Ok(10));
1756     }
1757
1758     #[test]
1759     fn oneshot_single_thread_try_recv_closed() {
1760         let (tx, rx) = sync_channel::<int>(0);
1761         drop(tx);
1762         assert!(rx.recv().is_err());
1763     }
1764
1765     #[test]
1766     fn oneshot_single_thread_peek_data() {
1767         let (tx, rx) = sync_channel::<int>(1);
1768         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1769         tx.send(10).unwrap();
1770         assert_eq!(rx.try_recv(), Ok(10));
1771     }
1772
1773     #[test]
1774     fn oneshot_single_thread_peek_close() {
1775         let (tx, rx) = sync_channel::<int>(0);
1776         drop(tx);
1777         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1778         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1779     }
1780
1781     #[test]
1782     fn oneshot_single_thread_peek_open() {
1783         let (_tx, rx) = sync_channel::<int>(0);
1784         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1785     }
1786
1787     #[test]
1788     fn oneshot_multi_task_recv_then_send() {
1789         let (tx, rx) = sync_channel::<Box<int>>(0);
1790         let _t = Thread::spawn(move|| {
1791             assert!(rx.recv().unwrap() == box 10);
1792         });
1793
1794         tx.send(box 10).unwrap();
1795     }
1796
1797     #[test]
1798     fn oneshot_multi_task_recv_then_close() {
1799         let (tx, rx) = sync_channel::<Box<int>>(0);
1800         let _t = Thread::spawn(move|| {
1801             drop(tx);
1802         });
1803         let res = Thread::scoped(move|| {
1804             assert!(rx.recv().unwrap() == box 10);
1805         }).join();
1806         assert!(res.is_err());
1807     }
1808
1809     #[test]
1810     fn oneshot_multi_thread_close_stress() {
1811         for _ in range(0, stress_factor()) {
1812             let (tx, rx) = sync_channel::<int>(0);
1813             let _t = Thread::spawn(move|| {
1814                 drop(rx);
1815             });
1816             drop(tx);
1817         }
1818     }
1819
1820     #[test]
1821     fn oneshot_multi_thread_send_close_stress() {
1822         for _ in range(0, stress_factor()) {
1823             let (tx, rx) = sync_channel::<int>(0);
1824             let _t = Thread::spawn(move|| {
1825                 drop(rx);
1826             });
1827             let _ = Thread::scoped(move || {
1828                 tx.send(1).unwrap();
1829             }).join();
1830         }
1831     }
1832
1833     #[test]
1834     fn oneshot_multi_thread_recv_close_stress() {
1835         for _ in range(0, stress_factor()) {
1836             let (tx, rx) = sync_channel::<int>(0);
1837             let _t = Thread::spawn(move|| {
1838                 let res = Thread::scoped(move|| {
1839                     rx.recv().unwrap();
1840                 }).join();
1841                 assert!(res.is_err());
1842             });
1843             let _t = Thread::spawn(move|| {
1844                 Thread::spawn(move|| {
1845                     drop(tx);
1846                 });
1847             });
1848         }
1849     }
1850
1851     #[test]
1852     fn oneshot_multi_thread_send_recv_stress() {
1853         for _ in range(0, stress_factor()) {
1854             let (tx, rx) = sync_channel::<Box<int>>(0);
1855             let _t = Thread::spawn(move|| {
1856                 tx.send(box 10i).unwrap();
1857             });
1858             assert!(rx.recv().unwrap() == box 10i);
1859         }
1860     }
1861
1862     #[test]
1863     fn stream_send_recv_stress() {
1864         for _ in range(0, stress_factor()) {
1865             let (tx, rx) = sync_channel::<Box<int>>(0);
1866
1867             send(tx, 0);
1868             recv(rx, 0);
1869
1870             fn send(tx: SyncSender<Box<int>>, i: int) {
1871                 if i == 10 { return }
1872
1873                 Thread::spawn(move|| {
1874                     tx.send(box i).unwrap();
1875                     send(tx, i + 1);
1876                 });
1877             }
1878
1879             fn recv(rx: Receiver<Box<int>>, i: int) {
1880                 if i == 10 { return }
1881
1882                 Thread::spawn(move|| {
1883                     assert!(rx.recv().unwrap() == box i);
1884                     recv(rx, i + 1);
1885                 });
1886             }
1887         }
1888     }
1889
1890     #[test]
1891     fn recv_a_lot() {
1892         // Regression test that we don't run out of stack in scheduler context
1893         let (tx, rx) = sync_channel(10000);
1894         for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1895         for _ in range(0u, 10000) { rx.recv().unwrap(); }
1896     }
1897
1898     #[test]
1899     fn shared_chan_stress() {
1900         let (tx, rx) = sync_channel(0);
1901         let total = stress_factor() + 100;
1902         for _ in range(0, total) {
1903             let tx = tx.clone();
1904             Thread::spawn(move|| {
1905                 tx.send(()).unwrap();
1906             });
1907         }
1908
1909         for _ in range(0, total) {
1910             rx.recv().unwrap();
1911         }
1912     }
1913
1914     #[test]
1915     fn test_nested_recv_iter() {
1916         let (tx, rx) = sync_channel::<int>(0);
1917         let (total_tx, total_rx) = sync_channel::<int>(0);
1918
1919         let _t = Thread::spawn(move|| {
1920             let mut acc = 0;
1921             for x in rx.iter() {
1922                 acc += x;
1923             }
1924             total_tx.send(acc).unwrap();
1925         });
1926
1927         tx.send(3).unwrap();
1928         tx.send(1).unwrap();
1929         tx.send(2).unwrap();
1930         drop(tx);
1931         assert_eq!(total_rx.recv().unwrap(), 6);
1932     }
1933
1934     #[test]
1935     fn test_recv_iter_break() {
1936         let (tx, rx) = sync_channel::<int>(0);
1937         let (count_tx, count_rx) = sync_channel(0);
1938
1939         let _t = Thread::spawn(move|| {
1940             let mut count = 0;
1941             for x in rx.iter() {
1942                 if count >= 3 {
1943                     break;
1944                 } else {
1945                     count += x;
1946                 }
1947             }
1948             count_tx.send(count).unwrap();
1949         });
1950
1951         tx.send(2).unwrap();
1952         tx.send(2).unwrap();
1953         tx.send(2).unwrap();
1954         let _ = tx.try_send(2);
1955         drop(tx);
1956         assert_eq!(count_rx.recv().unwrap(), 4);
1957     }
1958
1959     #[test]
1960     fn try_recv_states() {
1961         let (tx1, rx1) = sync_channel::<int>(1);
1962         let (tx2, rx2) = sync_channel::<()>(1);
1963         let (tx3, rx3) = sync_channel::<()>(1);
1964         let _t = Thread::spawn(move|| {
1965             rx2.recv().unwrap();
1966             tx1.send(1).unwrap();
1967             tx3.send(()).unwrap();
1968             rx2.recv().unwrap();
1969             drop(tx1);
1970             tx3.send(()).unwrap();
1971         });
1972
1973         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1974         tx2.send(()).unwrap();
1975         rx3.recv().unwrap();
1976         assert_eq!(rx1.try_recv(), Ok(1));
1977         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1978         tx2.send(()).unwrap();
1979         rx3.recv().unwrap();
1980         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1981     }
1982
1983     // This bug used to end up in a livelock inside of the Receiver destructor
1984     // because the internal state of the Shared packet was corrupted
1985     #[test]
1986     fn destroy_upgraded_shared_port_when_sender_still_active() {
1987         let (tx, rx) = sync_channel::<()>(0);
1988         let (tx2, rx2) = sync_channel::<()>(0);
1989         let _t = Thread::spawn(move|| {
1990             rx.recv().unwrap(); // wait on a oneshot
1991             drop(rx);  // destroy a shared
1992             tx2.send(()).unwrap();
1993         });
1994         // make sure the other task has gone to sleep
1995         for _ in range(0u, 5000) { Thread::yield_now(); }
1996
1997         // upgrade to a shared chan and send a message
1998         let t = tx.clone();
1999         drop(tx);
2000         t.send(()).unwrap();
2001
2002         // wait for the child task to exit before we exit
2003         rx2.recv().unwrap();
2004     }
2005
2006     #[test]
2007     fn send1() {
2008         let (tx, rx) = sync_channel::<int>(0);
2009         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2010         assert_eq!(tx.send(1), Ok(()));
2011     }
2012
2013     #[test]
2014     fn send2() {
2015         let (tx, rx) = sync_channel::<int>(0);
2016         let _t = Thread::spawn(move|| { drop(rx); });
2017         assert!(tx.send(1).is_err());
2018     }
2019
2020     #[test]
2021     fn send3() {
2022         let (tx, rx) = sync_channel::<int>(1);
2023         assert_eq!(tx.send(1), Ok(()));
2024         let _t =Thread::spawn(move|| { drop(rx); });
2025         assert!(tx.send(1).is_err());
2026     }
2027
2028     #[test]
2029     fn send4() {
2030         let (tx, rx) = sync_channel::<int>(0);
2031         let tx2 = tx.clone();
2032         let (done, donerx) = channel();
2033         let done2 = done.clone();
2034         let _t = Thread::spawn(move|| {
2035             assert!(tx.send(1).is_err());
2036             done.send(()).unwrap();
2037         });
2038         let _t = Thread::spawn(move|| {
2039             assert!(tx2.send(2).is_err());
2040             done2.send(()).unwrap();
2041         });
2042         drop(rx);
2043         donerx.recv().unwrap();
2044         donerx.recv().unwrap();
2045     }
2046
2047     #[test]
2048     fn try_send1() {
2049         let (tx, _rx) = sync_channel::<int>(0);
2050         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2051     }
2052
2053     #[test]
2054     fn try_send2() {
2055         let (tx, _rx) = sync_channel::<int>(1);
2056         assert_eq!(tx.try_send(1), Ok(()));
2057         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2058     }
2059
2060     #[test]
2061     fn try_send3() {
2062         let (tx, rx) = sync_channel::<int>(1);
2063         assert_eq!(tx.try_send(1), Ok(()));
2064         drop(rx);
2065         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2066     }
2067
2068     #[test]
2069     fn issue_15761() {
2070         fn repro() {
2071             let (tx1, rx1) = sync_channel::<()>(3);
2072             let (tx2, rx2) = sync_channel::<()>(3);
2073
2074             let _t = Thread::spawn(move|| {
2075                 rx1.recv().unwrap();
2076                 tx2.try_send(()).unwrap();
2077             });
2078
2079             tx1.try_send(()).unwrap();
2080             rx2.recv().unwrap();
2081         }
2082
2083         for _ in range(0u, 100) {
2084             repro()
2085         }
2086     }
2087 }