]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
rollup merge of #20273: alexcrichton/second-pass-comm
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer communication primitives threads
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).  These channels are
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10i).unwrap();
63 //! }).detach();
64 //! assert_eq!(rx.recv().unwrap(), 10i);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     }).detach()
82 //! }
83 //!
84 //! for _ in range(0i, 10i) {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! }).detach();
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 // A description of how Rust's channel implementation works
167 //
168 // Channels are supposed to be the basic building block for all other
169 // concurrent primitives that are used in Rust. As a result, the channel type
170 // needs to be highly optimized, flexible, and broad enough for use everywhere.
171 //
172 // The choice of implementation of all channels is to be built on lock-free data
173 // structures. The channels themselves are then consequently also lock-free data
174 // structures. As always with lock-free code, this is a very "here be dragons"
175 // territory, especially because I'm unaware of any academic papers that have
176 // gone into great length about channels of these flavors.
177 //
178 // ## Flavors of channels
179 //
180 // From the perspective of a consumer of this library, there is only one flavor
181 // of channel. This channel can be used as a stream and cloned to allow multiple
182 // senders. Under the hood, however, there are actually three flavors of
183 // channels in play.
184 //
185 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
186 //              They contain as few atomics as possible and involve one and
187 //              exactly one allocation.
188 // * Streams - these channels are optimized for the non-shared use case. They
189 //             use a different concurrent queue that is more tailored for this
190 //             use case. The initial allocation of this flavor of channel is not
191 //             optimized.
192 // * Shared - this is the most general form of channel that this module offers,
193 //            a channel with multiple senders. This type is as optimized as it
194 //            can be, but the previous two types mentioned are much faster for
195 //            their use-cases.
196 //
197 // ## Concurrent queues
198 //
199 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
200 // recv() obviously blocks. This means that under the hood there must be some
201 // shared and concurrent queue holding all of the actual data.
202 //
203 // With two flavors of channels, two flavors of queues are also used. We have
204 // chosen to use queues from a well-known author that are abbreviated as SPSC
205 // and MPSC (single producer, single consumer and multiple producer, single
206 // consumer). SPSC queues are used for streams while MPSC queues are used for
207 // shared channels.
208 //
209 // ### SPSC optimizations
210 //
211 // The SPSC queue found online is essentially a linked list of nodes where one
212 // half of the nodes are the "queue of data" and the other half of nodes are a
213 // cache of unused nodes. The unused nodes are used such that an allocation is
214 // not required on every push() and a free doesn't need to happen on every
215 // pop().
216 //
217 // As found online, however, the cache of nodes is of an infinite size. This
218 // means that if a channel at one point in its life had 50k items in the queue,
219 // then the queue will always have the capacity for 50k items. I believed that
220 // this was an unnecessary limitation of the implementation, so I have altered
221 // the queue to optionally have a bound on the cache size.
222 //
223 // By default, streams will have an unbounded SPSC queue with a small-ish cache
224 // size. The hope is that the cache is still large enough to have very fast
225 // send() operations while not too large such that millions of channels can
226 // coexist at once.
227 //
228 // ### MPSC optimizations
229 //
230 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
231 // a linked list under the hood to earn its unboundedness, but I have not put
232 // forth much effort into having a cache of nodes similar to the SPSC queue.
233 //
234 // For now, I believe that this is "ok" because shared channels are not the most
235 // common type, but soon we may wish to revisit this queue choice and determine
236 // another candidate for backend storage of shared channels.
237 //
238 // ## Overview of the Implementation
239 //
240 // Now that there's a little background on the concurrent queues used, it's
241 // worth going into much more detail about the channels themselves. The basic
242 // pseudocode for a send/recv are:
243 //
244 //
245 //      send(t)                             recv()
246 //        queue.push(t)                       return if queue.pop()
247 //        if increment() == -1                deschedule {
248 //          wakeup()                            if decrement() > 0
249 //                                                cancel_deschedule()
250 //                                            }
251 //                                            queue.pop()
252 //
253 // As mentioned before, there are no locks in this implementation, only atomic
254 // instructions are used.
255 //
256 // ### The internal atomic counter
257 //
258 // Every channel has a shared counter with each half to keep track of the size
259 // of the queue. This counter is used to abort descheduling by the receiver and
260 // to know when to wake up on the sending side.
261 //
262 // As seen in the pseudocode, senders will increment this count and receivers
263 // will decrement the count. The theory behind this is that if a sender sees a
264 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
265 // then it doesn't need to block.
266 //
267 // The recv() method has a beginning call to pop(), and if successful, it needs
268 // to decrement the count. It is a crucial implementation detail that this
269 // decrement does *not* happen to the shared counter. If this were the case,
270 // then it would be possible for the counter to be very negative when there were
271 // no receivers waiting, in which case the senders would have to determine when
272 // it was actually appropriate to wake up a receiver.
273 //
274 // Instead, the "steal count" is kept track of separately (not atomically
275 // because it's only used by receivers), and then the decrement() call when
276 // descheduling will lump in all of the recent steals into one large decrement.
277 //
278 // The implication of this is that if a sender sees a -1 count, then there's
279 // guaranteed to be a waiter waiting!
280 //
281 // ## Native Implementation
282 //
283 // A major goal of these channels is to work seamlessly on and off the runtime.
284 // All of the previous race conditions have been worded in terms of
285 // scheduler-isms (which is obviously not available without the runtime).
286 //
287 // For now, native usage of channels (off the runtime) will fall back onto
288 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
289 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
290 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
291 // condition variable.
292 //
293 // ## Select
294 //
295 // Being able to support selection over channels has greatly influenced this
296 // design, and not only does selection need to work inside the runtime, but also
297 // outside the runtime.
298 //
299 // The implementation is fairly straightforward. The goal of select() is not to
300 // return some data, but only to return which channel can receive data without
301 // blocking. The implementation is essentially the entire blocking procedure
302 // followed by an increment as soon as its woken up. The cancellation procedure
303 // involves an increment and swapping out of to_wake to acquire ownership of the
304 // task to unblock.
305 //
306 // Sadly this current implementation requires multiple allocations, so I have
307 // seen the throughput of select() be much worse than it should be. I do not
308 // believe that there is anything fundamental that needs to change about these
309 // channels, however, in order to support a more efficient select().
310 //
311 // # Conclusion
312 //
313 // And now that you've seen all the races that I found and attempted to fix,
314 // here's the code for you to find some more!
315
316 use prelude::v1::*;
317
318 use sync::Arc;
319 use fmt;
320 use kinds::marker;
321 use mem;
322 use cell::UnsafeCell;
323
324 pub use self::select::{Select, Handle};
325 use self::select::StartResult;
326 use self::select::StartResult::*;
327 use self::blocking::SignalToken;
328
329 mod blocking;
330 mod oneshot;
331 mod select;
332 mod shared;
333 mod stream;
334 mod sync;
335 mod mpsc_queue;
336 mod spsc_queue;
337
338 /// The receiving-half of Rust's channel type. This half can only be owned by
339 /// one task
340 #[stable]
341 pub struct Receiver<T> {
342     inner: UnsafeCell<Flavor<T>>,
343 }
344
345 // The receiver port can be sent from place to place, so long as it
346 // is not used to receive non-sendable things.
347 unsafe impl<T:Send> Send for Receiver<T> { }
348
349 /// An iterator over messages on a receiver, this iterator will block
350 /// whenever `next` is called, waiting for a new message, and `None` will be
351 /// returned when the corresponding channel has hung up.
352 #[stable]
353 pub struct Iter<'a, T:'a> {
354     rx: &'a Receiver<T>
355 }
356
357 /// The sending-half of Rust's asynchronous channel type. This half can only be
358 /// owned by one task, but it can be cloned to send to other tasks.
359 #[stable]
360 pub struct Sender<T> {
361     inner: UnsafeCell<Flavor<T>>,
362 }
363
364 // The send port can be sent from place to place, so long as it
365 // is not used to send non-sendable things.
366 unsafe impl<T:Send> Send for Sender<T> { }
367
368 /// The sending-half of Rust's synchronous channel type. This half can only be
369 /// owned by one task, but it can be cloned to send to other tasks.
370 #[stable]
371 pub struct SyncSender<T> {
372     inner: Arc<RacyCell<sync::Packet<T>>>,
373     // can't share in an arc
374     _marker: marker::NoSync,
375 }
376
377 /// An error returned from the `send` function on channels.
378 ///
379 /// A `send` operation can only fail if the receiving end of a channel is
380 /// disconnected, implying that the data could never be received. The error
381 /// contains the data being sent as a payload so it can be recovered.
382 #[deriving(PartialEq, Eq)]
383 #[stable]
384 pub struct SendError<T>(pub T);
385
386 /// An error returned from the `recv` function on a `Receiver`.
387 ///
388 /// The `recv` operation can only fail if the sending half of a channel is
389 /// disconnected, implying that no further messages will ever be received.
390 #[deriving(PartialEq, Eq, Clone, Copy)]
391 #[stable]
392 pub struct RecvError;
393
394 /// This enumeration is the list of the possible reasons that try_recv could not
395 /// return data when called.
396 #[deriving(PartialEq, Clone, Copy)]
397 #[stable]
398 pub enum TryRecvError {
399     /// This channel is currently empty, but the sender(s) have not yet
400     /// disconnected, so data may yet become available.
401     #[stable]
402     Empty,
403
404     /// This channel's sending half has become disconnected, and there will
405     /// never be any more data received on this channel
406     #[stable]
407     Disconnected,
408 }
409
410 /// This enumeration is the list of the possible error outcomes for the
411 /// `SyncSender::try_send` method.
412 #[deriving(PartialEq, Clone)]
413 #[stable]
414 pub enum TrySendError<T> {
415     /// The data could not be sent on the channel because it would require that
416     /// the callee block to send the data.
417     ///
418     /// If this is a buffered channel, then the buffer is full at this time. If
419     /// this is not a buffered channel, then there is no receiver available to
420     /// acquire the data.
421     #[stable]
422     Full(T),
423
424     /// This channel's receiving half has disconnected, so the data could not be
425     /// sent. The data is returned back to the callee in this case.
426     #[stable]
427     Disconnected(T),
428 }
429
430 enum Flavor<T> {
431     Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
432     Stream(Arc<RacyCell<stream::Packet<T>>>),
433     Shared(Arc<RacyCell<shared::Packet<T>>>),
434     Sync(Arc<RacyCell<sync::Packet<T>>>),
435 }
436
437 #[doc(hidden)]
438 trait UnsafeFlavor<T> {
439     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
440     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
441         &mut *self.inner_unsafe().get()
442     }
443     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
444         &*self.inner_unsafe().get()
445     }
446 }
447 impl<T> UnsafeFlavor<T> for Sender<T> {
448     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
449         &self.inner
450     }
451 }
452 impl<T> UnsafeFlavor<T> for Receiver<T> {
453     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
454         &self.inner
455     }
456 }
457
458 /// Creates a new asynchronous channel, returning the sender/receiver halves.
459 ///
460 /// All data sent on the sender will become available on the receiver, and no
461 /// send will block the calling task (this channel has an "infinite buffer").
462 ///
463 /// # Example
464 ///
465 /// ```
466 /// use std::sync::mpsc::channel;
467 /// use std::thread::Thread;
468 ///
469 /// // tx is is the sending half (tx for transmission), and rx is the receiving
470 /// // half (rx for receiving).
471 /// let (tx, rx) = channel();
472 ///
473 /// // Spawn off an expensive computation
474 /// Thread::spawn(move|| {
475 /// #   fn expensive_computation() {}
476 ///     tx.send(expensive_computation()).unwrap();
477 /// }).detach();
478 ///
479 /// // Do some useful work for awhile
480 ///
481 /// // Let's see what that answer was
482 /// println!("{}", rx.recv().unwrap());
483 /// ```
484 #[stable]
485 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
486     let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
487     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
488 }
489
490 /// Creates a new synchronous, bounded channel.
491 ///
492 /// Like asynchronous channels, the `Receiver` will block until a message
493 /// becomes available. These channels differ greatly in the semantics of the
494 /// sender from asynchronous channels, however.
495 ///
496 /// This channel has an internal buffer on which messages will be queued. When
497 /// the internal buffer becomes full, future sends will *block* waiting for the
498 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
499 /// becomes  "rendezvous channel" where each send will not return until a recv
500 /// is paired with it.
501 ///
502 /// As with asynchronous channels, all senders will panic in `send` if the
503 /// `Receiver` has been destroyed.
504 ///
505 /// # Example
506 ///
507 /// ```
508 /// use std::sync::mpsc::sync_channel;
509 /// use std::thread::Thread;
510 ///
511 /// let (tx, rx) = sync_channel(1);
512 ///
513 /// // this returns immediately
514 /// tx.send(1i).unwrap();
515 ///
516 /// Thread::spawn(move|| {
517 ///     // this will block until the previous message has been received
518 ///     tx.send(2i).unwrap();
519 /// }).detach();
520 ///
521 /// assert_eq!(rx.recv().unwrap(), 1i);
522 /// assert_eq!(rx.recv().unwrap(), 2i);
523 /// ```
524 #[stable]
525 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
526     let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
527     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
528 }
529
530 ////////////////////////////////////////////////////////////////////////////////
531 // Sender
532 ////////////////////////////////////////////////////////////////////////////////
533
534 impl<T: Send> Sender<T> {
535     fn new(inner: Flavor<T>) -> Sender<T> {
536         Sender {
537             inner: UnsafeCell::new(inner),
538         }
539     }
540
541     /// Attempts to send a value on this channel, returning it back if it could
542     /// not be sent.
543     ///
544     /// A successful send occurs when it is determined that the other end of
545     /// the channel has not hung up already. An unsuccessful send would be one
546     /// where the corresponding receiver has already been deallocated. Note
547     /// that a return value of `Err` means that the data will never be
548     /// received, but a return value of `Ok` does *not* mean that the data
549     /// will be received.  It is possible for the corresponding receiver to
550     /// hang up immediately after this function returns `Ok`.
551     ///
552     /// This method will never block the current thread.
553     ///
554     /// # Example
555     ///
556     /// ```
557     /// use std::sync::mpsc::channel;
558     ///
559     /// let (tx, rx) = channel();
560     ///
561     /// // This send is always successful
562     /// tx.send(1i).unwrap();
563     ///
564     /// // This send will fail because the receiver is gone
565     /// drop(rx);
566     /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
567     /// ```
568     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
569         let (new_inner, ret) = match *unsafe { self.inner() } {
570             Flavor::Oneshot(ref p) => {
571                 unsafe {
572                     let p = p.get();
573                     if !(*p).sent() {
574                         return (*p).send(t).map_err(SendError);
575                     } else {
576                         let a =
577                             Arc::new(RacyCell::new(stream::Packet::new()));
578                         let rx = Receiver::new(Flavor::Stream(a.clone()));
579                         match (*p).upgrade(rx) {
580                             oneshot::UpSuccess => {
581                                 let ret = (*a.get()).send(t);
582                                 (a, ret)
583                             }
584                             oneshot::UpDisconnected => (a, Err(t)),
585                             oneshot::UpWoke(token) => {
586                                 // This send cannot panic because the thread is
587                                 // asleep (we're looking at it), so the receiver
588                                 // can't go away.
589                                 (*a.get()).send(t).ok().unwrap();
590                                 token.signal();
591                                 (a, Ok(()))
592                             }
593                         }
594                     }
595                 }
596             }
597             Flavor::Stream(ref p) => return unsafe {
598                 (*p.get()).send(t).map_err(SendError)
599             },
600             Flavor::Shared(ref p) => return unsafe {
601                 (*p.get()).send(t).map_err(SendError)
602             },
603             Flavor::Sync(..) => unreachable!(),
604         };
605
606         unsafe {
607             let tmp = Sender::new(Flavor::Stream(new_inner));
608             mem::swap(self.inner_mut(), tmp.inner_mut());
609         }
610         ret.map_err(SendError)
611     }
612 }
613
614 #[stable]
615 impl<T: Send> Clone for Sender<T> {
616     fn clone(&self) -> Sender<T> {
617         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
618             Flavor::Oneshot(ref p) => {
619                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
620                 unsafe {
621                     let guard = (*a.get()).postinit_lock();
622                     let rx = Receiver::new(Flavor::Shared(a.clone()));
623                     match (*p.get()).upgrade(rx) {
624                         oneshot::UpSuccess |
625                         oneshot::UpDisconnected => (a, None, guard),
626                         oneshot::UpWoke(task) => (a, Some(task), guard)
627                     }
628                 }
629             }
630             Flavor::Stream(ref p) => {
631                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
632                 unsafe {
633                     let guard = (*a.get()).postinit_lock();
634                     let rx = Receiver::new(Flavor::Shared(a.clone()));
635                     match (*p.get()).upgrade(rx) {
636                         stream::UpSuccess |
637                         stream::UpDisconnected => (a, None, guard),
638                         stream::UpWoke(task) => (a, Some(task), guard),
639                     }
640                 }
641             }
642             Flavor::Shared(ref p) => {
643                 unsafe { (*p.get()).clone_chan(); }
644                 return Sender::new(Flavor::Shared(p.clone()));
645             }
646             Flavor::Sync(..) => unreachable!(),
647         };
648
649         unsafe {
650             (*packet.get()).inherit_blocker(sleeper, guard);
651
652             let tmp = Sender::new(Flavor::Shared(packet.clone()));
653             mem::swap(self.inner_mut(), tmp.inner_mut());
654         }
655         Sender::new(Flavor::Shared(packet))
656     }
657 }
658
659 #[unsafe_destructor]
660 impl<T: Send> Drop for Sender<T> {
661     fn drop(&mut self) {
662         match *unsafe { self.inner_mut() } {
663             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
664             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
665             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
666             Flavor::Sync(..) => unreachable!(),
667         }
668     }
669 }
670
671 ////////////////////////////////////////////////////////////////////////////////
672 // SyncSender
673 ////////////////////////////////////////////////////////////////////////////////
674
675 impl<T: Send> SyncSender<T> {
676     fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
677         SyncSender { inner: inner, _marker: marker::NoSync }
678     }
679
680     /// Sends a value on this synchronous channel.
681     ///
682     /// This function will *block* until space in the internal buffer becomes
683     /// available or a receiver is available to hand off the message to.
684     ///
685     /// Note that a successful send does *not* guarantee that the receiver will
686     /// ever see the data if there is a buffer on this channel. Items may be
687     /// enqueued in the internal buffer for the receiver to receive at a later
688     /// time. If the buffer size is 0, however, it can be guaranteed that the
689     /// receiver has indeed received the data if this function returns success.
690     ///
691     /// This function will never panic, but it may return `Err` if the
692     /// `Receiver` has disconnected and is no longer able to receive
693     /// information.
694     #[stable]
695     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
696         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
697     }
698
699     /// Attempts to send a value on this channel without blocking.
700     ///
701     /// This method differs from `send` by returning immediately if the
702     /// channel's buffer is full or no receiver is waiting to acquire some
703     /// data. Compared with `send`, this function has two failure cases
704     /// instead of one (one for disconnection, one for a full buffer).
705     ///
706     /// See `SyncSender::send` for notes about guarantees of whether the
707     /// receiver has received the data or not if this function is successful.
708     #[stable]
709     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
710         unsafe { (*self.inner.get()).try_send(t) }
711     }
712 }
713
714 #[stable]
715 impl<T: Send> Clone for SyncSender<T> {
716     fn clone(&self) -> SyncSender<T> {
717         unsafe { (*self.inner.get()).clone_chan(); }
718         return SyncSender::new(self.inner.clone());
719     }
720 }
721
722 #[unsafe_destructor]
723 impl<T: Send> Drop for SyncSender<T> {
724     fn drop(&mut self) {
725         unsafe { (*self.inner.get()).drop_chan(); }
726     }
727 }
728
729 ////////////////////////////////////////////////////////////////////////////////
730 // Receiver
731 ////////////////////////////////////////////////////////////////////////////////
732
733 impl<T: Send> Receiver<T> {
734     fn new(inner: Flavor<T>) -> Receiver<T> {
735         Receiver { inner: UnsafeCell::new(inner) }
736     }
737
738     /// Attempts to return a pending value on this receiver without blocking
739     ///
740     /// This method will never block the caller in order to wait for data to
741     /// become available. Instead, this will always return immediately with a
742     /// possible option of pending data on the channel.
743     ///
744     /// This is useful for a flavor of "optimistic check" before deciding to
745     /// block on a receiver.
746     #[stable]
747     pub fn try_recv(&self) -> Result<T, TryRecvError> {
748         loop {
749             let new_port = match *unsafe { self.inner() } {
750                 Flavor::Oneshot(ref p) => {
751                     match unsafe { (*p.get()).try_recv() } {
752                         Ok(t) => return Ok(t),
753                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
754                         Err(oneshot::Disconnected) => {
755                             return Err(TryRecvError::Disconnected)
756                         }
757                         Err(oneshot::Upgraded(rx)) => rx,
758                     }
759                 }
760                 Flavor::Stream(ref p) => {
761                     match unsafe { (*p.get()).try_recv() } {
762                         Ok(t) => return Ok(t),
763                         Err(stream::Empty) => return Err(TryRecvError::Empty),
764                         Err(stream::Disconnected) => {
765                             return Err(TryRecvError::Disconnected)
766                         }
767                         Err(stream::Upgraded(rx)) => rx,
768                     }
769                 }
770                 Flavor::Shared(ref p) => {
771                     match unsafe { (*p.get()).try_recv() } {
772                         Ok(t) => return Ok(t),
773                         Err(shared::Empty) => return Err(TryRecvError::Empty),
774                         Err(shared::Disconnected) => {
775                             return Err(TryRecvError::Disconnected)
776                         }
777                     }
778                 }
779                 Flavor::Sync(ref p) => {
780                     match unsafe { (*p.get()).try_recv() } {
781                         Ok(t) => return Ok(t),
782                         Err(sync::Empty) => return Err(TryRecvError::Empty),
783                         Err(sync::Disconnected) => {
784                             return Err(TryRecvError::Disconnected)
785                         }
786                     }
787                 }
788             };
789             unsafe {
790                 mem::swap(self.inner_mut(),
791                           new_port.inner_mut());
792             }
793         }
794     }
795
796     /// Attempt to wait for a value on this receiver, returning an error if the
797     /// corresponding channel has hung up.
798     ///
799     /// This function will always block the current thread if there is no data
800     /// available and it's possible for more data to be sent. Once a message is
801     /// sent to the corresponding `Sender`, then this receiver will wake up and
802     /// return that message.
803     ///
804     /// If the corresponding `Sender` has disconnected, or it disconnects while
805     /// this call is blocking, this call will wake up and return `Err` to
806     /// indicate that no more messages can ever be received on this channel.
807     #[stable]
808     pub fn recv(&self) -> Result<T, RecvError> {
809         loop {
810             let new_port = match *unsafe { self.inner() } {
811                 Flavor::Oneshot(ref p) => {
812                     match unsafe { (*p.get()).recv() } {
813                         Ok(t) => return Ok(t),
814                         Err(oneshot::Empty) => return unreachable!(),
815                         Err(oneshot::Disconnected) => return Err(RecvError),
816                         Err(oneshot::Upgraded(rx)) => rx,
817                     }
818                 }
819                 Flavor::Stream(ref p) => {
820                     match unsafe { (*p.get()).recv() } {
821                         Ok(t) => return Ok(t),
822                         Err(stream::Empty) => return unreachable!(),
823                         Err(stream::Disconnected) => return Err(RecvError),
824                         Err(stream::Upgraded(rx)) => rx,
825                     }
826                 }
827                 Flavor::Shared(ref p) => {
828                     match unsafe { (*p.get()).recv() } {
829                         Ok(t) => return Ok(t),
830                         Err(shared::Empty) => return unreachable!(),
831                         Err(shared::Disconnected) => return Err(RecvError),
832                     }
833                 }
834                 Flavor::Sync(ref p) => return unsafe {
835                     (*p.get()).recv().map_err(|()| RecvError)
836                 }
837             };
838             unsafe {
839                 mem::swap(self.inner_mut(), new_port.inner_mut());
840             }
841         }
842     }
843
844     /// Returns an iterator that will block waiting for messages, but never
845     /// `panic!`. It will return `None` when the channel has hung up.
846     #[stable]
847     pub fn iter(&self) -> Iter<T> {
848         Iter { rx: self }
849     }
850 }
851
852 impl<T: Send> select::Packet for Receiver<T> {
853     fn can_recv(&self) -> bool {
854         loop {
855             let new_port = match *unsafe { self.inner() } {
856                 Flavor::Oneshot(ref p) => {
857                     match unsafe { (*p.get()).can_recv() } {
858                         Ok(ret) => return ret,
859                         Err(upgrade) => upgrade,
860                     }
861                 }
862                 Flavor::Stream(ref p) => {
863                     match unsafe { (*p.get()).can_recv() } {
864                         Ok(ret) => return ret,
865                         Err(upgrade) => upgrade,
866                     }
867                 }
868                 Flavor::Shared(ref p) => {
869                     return unsafe { (*p.get()).can_recv() };
870                 }
871                 Flavor::Sync(ref p) => {
872                     return unsafe { (*p.get()).can_recv() };
873                 }
874             };
875             unsafe {
876                 mem::swap(self.inner_mut(),
877                           new_port.inner_mut());
878             }
879         }
880     }
881
882     fn start_selection(&self, mut token: SignalToken) -> StartResult {
883         loop {
884             let (t, new_port) = match *unsafe { self.inner() } {
885                 Flavor::Oneshot(ref p) => {
886                     match unsafe { (*p.get()).start_selection(token) } {
887                         oneshot::SelSuccess => return Installed,
888                         oneshot::SelCanceled => return Abort,
889                         oneshot::SelUpgraded(t, rx) => (t, rx),
890                     }
891                 }
892                 Flavor::Stream(ref p) => {
893                     match unsafe { (*p.get()).start_selection(token) } {
894                         stream::SelSuccess => return Installed,
895                         stream::SelCanceled => return Abort,
896                         stream::SelUpgraded(t, rx) => (t, rx),
897                     }
898                 }
899                 Flavor::Shared(ref p) => {
900                     return unsafe { (*p.get()).start_selection(token) };
901                 }
902                 Flavor::Sync(ref p) => {
903                     return unsafe { (*p.get()).start_selection(token) };
904                 }
905             };
906             token = t;
907             unsafe {
908                 mem::swap(self.inner_mut(), new_port.inner_mut());
909             }
910         }
911     }
912
913     fn abort_selection(&self) -> bool {
914         let mut was_upgrade = false;
915         loop {
916             let result = match *unsafe { self.inner() } {
917                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
918                 Flavor::Stream(ref p) => unsafe {
919                     (*p.get()).abort_selection(was_upgrade)
920                 },
921                 Flavor::Shared(ref p) => return unsafe {
922                     (*p.get()).abort_selection(was_upgrade)
923                 },
924                 Flavor::Sync(ref p) => return unsafe {
925                     (*p.get()).abort_selection()
926                 },
927             };
928             let new_port = match result { Ok(b) => return b, Err(p) => p };
929             was_upgrade = true;
930             unsafe {
931                 mem::swap(self.inner_mut(),
932                           new_port.inner_mut());
933             }
934         }
935     }
936 }
937
938 #[unstable]
939 impl<'a, T: Send> Iterator<T> for Iter<'a, T> {
940     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
941 }
942
943 #[unsafe_destructor]
944 impl<T: Send> Drop for Receiver<T> {
945     fn drop(&mut self) {
946         match *unsafe { self.inner_mut() } {
947             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
948             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
949             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
950             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
951         }
952     }
953 }
954
955 /// A version of `UnsafeCell` intended for use in concurrent data
956 /// structures (for example, you might put it in an `Arc`).
957 struct RacyCell<T>(pub UnsafeCell<T>);
958
959 impl<T> RacyCell<T> {
960
961     fn new(value: T) -> RacyCell<T> {
962         RacyCell(UnsafeCell { value: value })
963     }
964
965     unsafe fn get(&self) -> *mut T {
966         self.0.get()
967     }
968
969 }
970
971 unsafe impl<T:Send> Send for RacyCell<T> { }
972
973 unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
974
975 impl<T> fmt::Show for SendError<T> {
976     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
977         "sending on a closed channel".fmt(f)
978     }
979 }
980
981 impl<T> fmt::Show for TrySendError<T> {
982     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
983         match *self {
984             TrySendError::Full(..) => {
985                 "sending on a full channel".fmt(f)
986             }
987             TrySendError::Disconnected(..) => {
988                 "sending on a closed channel".fmt(f)
989             }
990         }
991     }
992 }
993
994 impl fmt::Show for RecvError {
995     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
996         "receiving on a closed channel".fmt(f)
997     }
998 }
999
1000 impl fmt::Show for TryRecvError {
1001     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1002         match *self {
1003             TryRecvError::Empty => {
1004                 "receiving on an empty channel".fmt(f)
1005             }
1006             TryRecvError::Disconnected => {
1007                 "receiving on a closed channel".fmt(f)
1008             }
1009         }
1010     }
1011 }
1012
1013 #[cfg(test)]
1014 mod test {
1015     use prelude::v1::*;
1016
1017     use os;
1018     use super::*;
1019     use thread::Thread;
1020
1021     pub fn stress_factor() -> uint {
1022         match os::getenv("RUST_TEST_STRESS") {
1023             Some(val) => val.parse().unwrap(),
1024             None => 1,
1025         }
1026     }
1027
1028     #[test]
1029     fn smoke() {
1030         let (tx, rx) = channel::<int>();
1031         tx.send(1).unwrap();
1032         assert_eq!(rx.recv().unwrap(), 1);
1033     }
1034
1035     #[test]
1036     fn drop_full() {
1037         let (tx, _rx) = channel();
1038         tx.send(box 1i).unwrap();
1039     }
1040
1041     #[test]
1042     fn drop_full_shared() {
1043         let (tx, _rx) = channel();
1044         drop(tx.clone());
1045         drop(tx.clone());
1046         tx.send(box 1i).unwrap();
1047     }
1048
1049     #[test]
1050     fn smoke_shared() {
1051         let (tx, rx) = channel::<int>();
1052         tx.send(1).unwrap();
1053         assert_eq!(rx.recv().unwrap(), 1);
1054         let tx = tx.clone();
1055         tx.send(1).unwrap();
1056         assert_eq!(rx.recv().unwrap(), 1);
1057     }
1058
1059     #[test]
1060     fn smoke_threads() {
1061         let (tx, rx) = channel::<int>();
1062         let _t = Thread::spawn(move|| {
1063             tx.send(1).unwrap();
1064         });
1065         assert_eq!(rx.recv().unwrap(), 1);
1066     }
1067
1068     #[test]
1069     fn smoke_port_gone() {
1070         let (tx, rx) = channel::<int>();
1071         drop(rx);
1072         assert!(tx.send(1).is_err());
1073     }
1074
1075     #[test]
1076     fn smoke_shared_port_gone() {
1077         let (tx, rx) = channel::<int>();
1078         drop(rx);
1079         assert!(tx.send(1).is_err())
1080     }
1081
1082     #[test]
1083     fn smoke_shared_port_gone2() {
1084         let (tx, rx) = channel::<int>();
1085         drop(rx);
1086         let tx2 = tx.clone();
1087         drop(tx);
1088         assert!(tx2.send(1).is_err());
1089     }
1090
1091     #[test]
1092     fn port_gone_concurrent() {
1093         let (tx, rx) = channel::<int>();
1094         let _t = Thread::spawn(move|| {
1095             rx.recv().unwrap();
1096         });
1097         while tx.send(1).is_ok() {}
1098     }
1099
1100     #[test]
1101     fn port_gone_concurrent_shared() {
1102         let (tx, rx) = channel::<int>();
1103         let tx2 = tx.clone();
1104         let _t = Thread::spawn(move|| {
1105             rx.recv().unwrap();
1106         });
1107         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1108     }
1109
1110     #[test]
1111     fn smoke_chan_gone() {
1112         let (tx, rx) = channel::<int>();
1113         drop(tx);
1114         assert!(rx.recv().is_err());
1115     }
1116
1117     #[test]
1118     fn smoke_chan_gone_shared() {
1119         let (tx, rx) = channel::<()>();
1120         let tx2 = tx.clone();
1121         drop(tx);
1122         drop(tx2);
1123         assert!(rx.recv().is_err());
1124     }
1125
1126     #[test]
1127     fn chan_gone_concurrent() {
1128         let (tx, rx) = channel::<int>();
1129         let _t = Thread::spawn(move|| {
1130             tx.send(1).unwrap();
1131             tx.send(1).unwrap();
1132         });
1133         while rx.recv().is_ok() {}
1134     }
1135
1136     #[test]
1137     fn stress() {
1138         let (tx, rx) = channel::<int>();
1139         let t = Thread::spawn(move|| {
1140             for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1141         });
1142         for _ in range(0u, 10000) {
1143             assert_eq!(rx.recv().unwrap(), 1);
1144         }
1145         t.join().ok().unwrap();
1146     }
1147
1148     #[test]
1149     fn stress_shared() {
1150         static AMT: uint = 10000;
1151         static NTHREADS: uint = 8;
1152         let (tx, rx) = channel::<int>();
1153
1154         let t = Thread::spawn(move|| {
1155             for _ in range(0, AMT * NTHREADS) {
1156                 assert_eq!(rx.recv().unwrap(), 1);
1157             }
1158             match rx.try_recv() {
1159                 Ok(..) => panic!(),
1160                 _ => {}
1161             }
1162         });
1163
1164         for _ in range(0, NTHREADS) {
1165             let tx = tx.clone();
1166             Thread::spawn(move|| {
1167                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1168             }).detach();
1169         }
1170         drop(tx);
1171         t.join().ok().unwrap();
1172     }
1173
1174     #[test]
1175     fn send_from_outside_runtime() {
1176         let (tx1, rx1) = channel::<()>();
1177         let (tx2, rx2) = channel::<int>();
1178         let t1 = Thread::spawn(move|| {
1179             tx1.send(()).unwrap();
1180             for _ in range(0i, 40) {
1181                 assert_eq!(rx2.recv().unwrap(), 1);
1182             }
1183         });
1184         rx1.recv().unwrap();
1185         let t2 = Thread::spawn(move|| {
1186             for _ in range(0i, 40) {
1187                 tx2.send(1).unwrap();
1188             }
1189         });
1190         t1.join().ok().unwrap();
1191         t2.join().ok().unwrap();
1192     }
1193
1194     #[test]
1195     fn recv_from_outside_runtime() {
1196         let (tx, rx) = channel::<int>();
1197         let t = Thread::spawn(move|| {
1198             for _ in range(0i, 40) {
1199                 assert_eq!(rx.recv().unwrap(), 1);
1200             }
1201         });
1202         for _ in range(0u, 40) {
1203             tx.send(1).unwrap();
1204         }
1205         t.join().ok().unwrap();
1206     }
1207
1208     #[test]
1209     fn no_runtime() {
1210         let (tx1, rx1) = channel::<int>();
1211         let (tx2, rx2) = channel::<int>();
1212         let t1 = Thread::spawn(move|| {
1213             assert_eq!(rx1.recv().unwrap(), 1);
1214             tx2.send(2).unwrap();
1215         });
1216         let t2 = Thread::spawn(move|| {
1217             tx1.send(1).unwrap();
1218             assert_eq!(rx2.recv().unwrap(), 2);
1219         });
1220         t1.join().ok().unwrap();
1221         t2.join().ok().unwrap();
1222     }
1223
1224     #[test]
1225     fn oneshot_single_thread_close_port_first() {
1226         // Simple test of closing without sending
1227         let (_tx, rx) = channel::<int>();
1228         drop(rx);
1229     }
1230
1231     #[test]
1232     fn oneshot_single_thread_close_chan_first() {
1233         // Simple test of closing without sending
1234         let (tx, _rx) = channel::<int>();
1235         drop(tx);
1236     }
1237
1238     #[test]
1239     fn oneshot_single_thread_send_port_close() {
1240         // Testing that the sender cleans up the payload if receiver is closed
1241         let (tx, rx) = channel::<Box<int>>();
1242         drop(rx);
1243         assert!(tx.send(box 0).is_err());
1244     }
1245
1246     #[test]
1247     fn oneshot_single_thread_recv_chan_close() {
1248         // Receiving on a closed chan will panic
1249         let res = Thread::spawn(move|| {
1250             let (tx, rx) = channel::<int>();
1251             drop(tx);
1252             rx.recv().unwrap();
1253         }).join();
1254         // What is our res?
1255         assert!(res.is_err());
1256     }
1257
1258     #[test]
1259     fn oneshot_single_thread_send_then_recv() {
1260         let (tx, rx) = channel::<Box<int>>();
1261         tx.send(box 10).unwrap();
1262         assert!(rx.recv().unwrap() == box 10);
1263     }
1264
1265     #[test]
1266     fn oneshot_single_thread_try_send_open() {
1267         let (tx, rx) = channel::<int>();
1268         assert!(tx.send(10).is_ok());
1269         assert!(rx.recv().unwrap() == 10);
1270     }
1271
1272     #[test]
1273     fn oneshot_single_thread_try_send_closed() {
1274         let (tx, rx) = channel::<int>();
1275         drop(rx);
1276         assert!(tx.send(10).is_err());
1277     }
1278
1279     #[test]
1280     fn oneshot_single_thread_try_recv_open() {
1281         let (tx, rx) = channel::<int>();
1282         tx.send(10).unwrap();
1283         assert!(rx.recv() == Ok(10));
1284     }
1285
1286     #[test]
1287     fn oneshot_single_thread_try_recv_closed() {
1288         let (tx, rx) = channel::<int>();
1289         drop(tx);
1290         assert!(rx.recv().is_err());
1291     }
1292
1293     #[test]
1294     fn oneshot_single_thread_peek_data() {
1295         let (tx, rx) = channel::<int>();
1296         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1297         tx.send(10).unwrap();
1298         assert_eq!(rx.try_recv(), Ok(10));
1299     }
1300
1301     #[test]
1302     fn oneshot_single_thread_peek_close() {
1303         let (tx, rx) = channel::<int>();
1304         drop(tx);
1305         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1306         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1307     }
1308
1309     #[test]
1310     fn oneshot_single_thread_peek_open() {
1311         let (_tx, rx) = channel::<int>();
1312         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1313     }
1314
1315     #[test]
1316     fn oneshot_multi_task_recv_then_send() {
1317         let (tx, rx) = channel::<Box<int>>();
1318         let _t = Thread::spawn(move|| {
1319             assert!(rx.recv().unwrap() == box 10);
1320         });
1321
1322         tx.send(box 10).unwrap();
1323     }
1324
1325     #[test]
1326     fn oneshot_multi_task_recv_then_close() {
1327         let (tx, rx) = channel::<Box<int>>();
1328         let _t = Thread::spawn(move|| {
1329             drop(tx);
1330         });
1331         let res = Thread::spawn(move|| {
1332             assert!(rx.recv().unwrap() == box 10);
1333         }).join();
1334         assert!(res.is_err());
1335     }
1336
1337     #[test]
1338     fn oneshot_multi_thread_close_stress() {
1339         for _ in range(0, stress_factor()) {
1340             let (tx, rx) = channel::<int>();
1341             let _t = Thread::spawn(move|| {
1342                 drop(rx);
1343             });
1344             drop(tx);
1345         }
1346     }
1347
1348     #[test]
1349     fn oneshot_multi_thread_send_close_stress() {
1350         for _ in range(0, stress_factor()) {
1351             let (tx, rx) = channel::<int>();
1352             let _t = Thread::spawn(move|| {
1353                 drop(rx);
1354             });
1355             let _ = Thread::spawn(move|| {
1356                 tx.send(1).unwrap();
1357             }).join();
1358         }
1359     }
1360
1361     #[test]
1362     fn oneshot_multi_thread_recv_close_stress() {
1363         for _ in range(0, stress_factor()) {
1364             let (tx, rx) = channel::<int>();
1365             Thread::spawn(move|| {
1366                 let res = Thread::spawn(move|| {
1367                     rx.recv().unwrap();
1368                 }).join();
1369                 assert!(res.is_err());
1370             }).detach();
1371             let _t = Thread::spawn(move|| {
1372                 Thread::spawn(move|| {
1373                     drop(tx);
1374                 }).detach();
1375             });
1376         }
1377     }
1378
1379     #[test]
1380     fn oneshot_multi_thread_send_recv_stress() {
1381         for _ in range(0, stress_factor()) {
1382             let (tx, rx) = channel();
1383             let _t = Thread::spawn(move|| {
1384                 tx.send(box 10i).unwrap();
1385             });
1386             assert!(rx.recv().unwrap() == box 10i);
1387         }
1388     }
1389
1390     #[test]
1391     fn stream_send_recv_stress() {
1392         for _ in range(0, stress_factor()) {
1393             let (tx, rx) = channel();
1394
1395             send(tx, 0);
1396             recv(rx, 0);
1397
1398             fn send(tx: Sender<Box<int>>, i: int) {
1399                 if i == 10 { return }
1400
1401                 Thread::spawn(move|| {
1402                     tx.send(box i).unwrap();
1403                     send(tx, i + 1);
1404                 }).detach();
1405             }
1406
1407             fn recv(rx: Receiver<Box<int>>, i: int) {
1408                 if i == 10 { return }
1409
1410                 Thread::spawn(move|| {
1411                     assert!(rx.recv().unwrap() == box i);
1412                     recv(rx, i + 1);
1413                 }).detach();
1414             }
1415         }
1416     }
1417
1418     #[test]
1419     fn recv_a_lot() {
1420         // Regression test that we don't run out of stack in scheduler context
1421         let (tx, rx) = channel();
1422         for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1423         for _ in range(0i, 10000) { rx.recv().unwrap(); }
1424     }
1425
1426     #[test]
1427     fn shared_chan_stress() {
1428         let (tx, rx) = channel();
1429         let total = stress_factor() + 100;
1430         for _ in range(0, total) {
1431             let tx = tx.clone();
1432             Thread::spawn(move|| {
1433                 tx.send(()).unwrap();
1434             }).detach();
1435         }
1436
1437         for _ in range(0, total) {
1438             rx.recv().unwrap();
1439         }
1440     }
1441
1442     #[test]
1443     fn test_nested_recv_iter() {
1444         let (tx, rx) = channel::<int>();
1445         let (total_tx, total_rx) = channel::<int>();
1446
1447         let _t = Thread::spawn(move|| {
1448             let mut acc = 0;
1449             for x in rx.iter() {
1450                 acc += x;
1451             }
1452             total_tx.send(acc).unwrap();
1453         });
1454
1455         tx.send(3).unwrap();
1456         tx.send(1).unwrap();
1457         tx.send(2).unwrap();
1458         drop(tx);
1459         assert_eq!(total_rx.recv().unwrap(), 6);
1460     }
1461
1462     #[test]
1463     fn test_recv_iter_break() {
1464         let (tx, rx) = channel::<int>();
1465         let (count_tx, count_rx) = channel();
1466
1467         let _t = Thread::spawn(move|| {
1468             let mut count = 0;
1469             for x in rx.iter() {
1470                 if count >= 3 {
1471                     break;
1472                 } else {
1473                     count += x;
1474                 }
1475             }
1476             count_tx.send(count).unwrap();
1477         });
1478
1479         tx.send(2).unwrap();
1480         tx.send(2).unwrap();
1481         tx.send(2).unwrap();
1482         let _ = tx.send(2);
1483         drop(tx);
1484         assert_eq!(count_rx.recv().unwrap(), 4);
1485     }
1486
1487     #[test]
1488     fn try_recv_states() {
1489         let (tx1, rx1) = channel::<int>();
1490         let (tx2, rx2) = channel::<()>();
1491         let (tx3, rx3) = channel::<()>();
1492         let _t = Thread::spawn(move|| {
1493             rx2.recv().unwrap();
1494             tx1.send(1).unwrap();
1495             tx3.send(()).unwrap();
1496             rx2.recv().unwrap();
1497             drop(tx1);
1498             tx3.send(()).unwrap();
1499         });
1500
1501         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1502         tx2.send(()).unwrap();
1503         rx3.recv().unwrap();
1504         assert_eq!(rx1.try_recv(), Ok(1));
1505         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1506         tx2.send(()).unwrap();
1507         rx3.recv().unwrap();
1508         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1509     }
1510
1511     // This bug used to end up in a livelock inside of the Receiver destructor
1512     // because the internal state of the Shared packet was corrupted
1513     #[test]
1514     fn destroy_upgraded_shared_port_when_sender_still_active() {
1515         let (tx, rx) = channel();
1516         let (tx2, rx2) = channel();
1517         let _t = Thread::spawn(move|| {
1518             rx.recv().unwrap(); // wait on a oneshot
1519             drop(rx);  // destroy a shared
1520             tx2.send(()).unwrap();
1521         });
1522         // make sure the other task has gone to sleep
1523         for _ in range(0u, 5000) { Thread::yield_now(); }
1524
1525         // upgrade to a shared chan and send a message
1526         let t = tx.clone();
1527         drop(tx);
1528         t.send(()).unwrap();
1529
1530         // wait for the child task to exit before we exit
1531         rx2.recv().unwrap();
1532     }
1533 }
1534
1535 #[cfg(test)]
1536 mod sync_tests {
1537     use prelude::v1::*;
1538
1539     use os;
1540     use thread::Thread;
1541     use super::*;
1542
1543     pub fn stress_factor() -> uint {
1544         match os::getenv("RUST_TEST_STRESS") {
1545             Some(val) => val.parse().unwrap(),
1546             None => 1,
1547         }
1548     }
1549
1550     #[test]
1551     fn smoke() {
1552         let (tx, rx) = sync_channel::<int>(1);
1553         tx.send(1).unwrap();
1554         assert_eq!(rx.recv().unwrap(), 1);
1555     }
1556
1557     #[test]
1558     fn drop_full() {
1559         let (tx, _rx) = sync_channel(1);
1560         tx.send(box 1i).unwrap();
1561     }
1562
1563     #[test]
1564     fn smoke_shared() {
1565         let (tx, rx) = sync_channel::<int>(1);
1566         tx.send(1).unwrap();
1567         assert_eq!(rx.recv().unwrap(), 1);
1568         let tx = tx.clone();
1569         tx.send(1).unwrap();
1570         assert_eq!(rx.recv().unwrap(), 1);
1571     }
1572
1573     #[test]
1574     fn smoke_threads() {
1575         let (tx, rx) = sync_channel::<int>(0);
1576         let _t = Thread::spawn(move|| {
1577             tx.send(1).unwrap();
1578         });
1579         assert_eq!(rx.recv().unwrap(), 1);
1580     }
1581
1582     #[test]
1583     fn smoke_port_gone() {
1584         let (tx, rx) = sync_channel::<int>(0);
1585         drop(rx);
1586         assert!(tx.send(1).is_err());
1587     }
1588
1589     #[test]
1590     fn smoke_shared_port_gone2() {
1591         let (tx, rx) = sync_channel::<int>(0);
1592         drop(rx);
1593         let tx2 = tx.clone();
1594         drop(tx);
1595         assert!(tx2.send(1).is_err());
1596     }
1597
1598     #[test]
1599     fn port_gone_concurrent() {
1600         let (tx, rx) = sync_channel::<int>(0);
1601         let _t = Thread::spawn(move|| {
1602             rx.recv().unwrap();
1603         });
1604         while tx.send(1).is_ok() {}
1605     }
1606
1607     #[test]
1608     fn port_gone_concurrent_shared() {
1609         let (tx, rx) = sync_channel::<int>(0);
1610         let tx2 = tx.clone();
1611         let _t = Thread::spawn(move|| {
1612             rx.recv().unwrap();
1613         });
1614         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1615     }
1616
1617     #[test]
1618     fn smoke_chan_gone() {
1619         let (tx, rx) = sync_channel::<int>(0);
1620         drop(tx);
1621         assert!(rx.recv().is_err());
1622     }
1623
1624     #[test]
1625     fn smoke_chan_gone_shared() {
1626         let (tx, rx) = sync_channel::<()>(0);
1627         let tx2 = tx.clone();
1628         drop(tx);
1629         drop(tx2);
1630         assert!(rx.recv().is_err());
1631     }
1632
1633     #[test]
1634     fn chan_gone_concurrent() {
1635         let (tx, rx) = sync_channel::<int>(0);
1636         Thread::spawn(move|| {
1637             tx.send(1).unwrap();
1638             tx.send(1).unwrap();
1639         }).detach();
1640         while rx.recv().is_ok() {}
1641     }
1642
1643     #[test]
1644     fn stress() {
1645         let (tx, rx) = sync_channel::<int>(0);
1646         Thread::spawn(move|| {
1647             for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1648         }).detach();
1649         for _ in range(0u, 10000) {
1650             assert_eq!(rx.recv().unwrap(), 1);
1651         }
1652     }
1653
1654     #[test]
1655     fn stress_shared() {
1656         static AMT: uint = 1000;
1657         static NTHREADS: uint = 8;
1658         let (tx, rx) = sync_channel::<int>(0);
1659         let (dtx, drx) = sync_channel::<()>(0);
1660
1661         Thread::spawn(move|| {
1662             for _ in range(0, AMT * NTHREADS) {
1663                 assert_eq!(rx.recv().unwrap(), 1);
1664             }
1665             match rx.try_recv() {
1666                 Ok(..) => panic!(),
1667                 _ => {}
1668             }
1669             dtx.send(()).unwrap();
1670         }).detach();
1671
1672         for _ in range(0, NTHREADS) {
1673             let tx = tx.clone();
1674             Thread::spawn(move|| {
1675                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1676             }).detach();
1677         }
1678         drop(tx);
1679         drx.recv().unwrap();
1680     }
1681
1682     #[test]
1683     fn oneshot_single_thread_close_port_first() {
1684         // Simple test of closing without sending
1685         let (_tx, rx) = sync_channel::<int>(0);
1686         drop(rx);
1687     }
1688
1689     #[test]
1690     fn oneshot_single_thread_close_chan_first() {
1691         // Simple test of closing without sending
1692         let (tx, _rx) = sync_channel::<int>(0);
1693         drop(tx);
1694     }
1695
1696     #[test]
1697     fn oneshot_single_thread_send_port_close() {
1698         // Testing that the sender cleans up the payload if receiver is closed
1699         let (tx, rx) = sync_channel::<Box<int>>(0);
1700         drop(rx);
1701         assert!(tx.send(box 0).is_err());
1702     }
1703
1704     #[test]
1705     fn oneshot_single_thread_recv_chan_close() {
1706         // Receiving on a closed chan will panic
1707         let res = Thread::spawn(move|| {
1708             let (tx, rx) = sync_channel::<int>(0);
1709             drop(tx);
1710             rx.recv().unwrap();
1711         }).join();
1712         // What is our res?
1713         assert!(res.is_err());
1714     }
1715
1716     #[test]
1717     fn oneshot_single_thread_send_then_recv() {
1718         let (tx, rx) = sync_channel::<Box<int>>(1);
1719         tx.send(box 10).unwrap();
1720         assert!(rx.recv().unwrap() == box 10);
1721     }
1722
1723     #[test]
1724     fn oneshot_single_thread_try_send_open() {
1725         let (tx, rx) = sync_channel::<int>(1);
1726         assert_eq!(tx.try_send(10), Ok(()));
1727         assert!(rx.recv().unwrap() == 10);
1728     }
1729
1730     #[test]
1731     fn oneshot_single_thread_try_send_closed() {
1732         let (tx, rx) = sync_channel::<int>(0);
1733         drop(rx);
1734         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1735     }
1736
1737     #[test]
1738     fn oneshot_single_thread_try_send_closed2() {
1739         let (tx, _rx) = sync_channel::<int>(0);
1740         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1741     }
1742
1743     #[test]
1744     fn oneshot_single_thread_try_recv_open() {
1745         let (tx, rx) = sync_channel::<int>(1);
1746         tx.send(10).unwrap();
1747         assert!(rx.recv() == Ok(10));
1748     }
1749
1750     #[test]
1751     fn oneshot_single_thread_try_recv_closed() {
1752         let (tx, rx) = sync_channel::<int>(0);
1753         drop(tx);
1754         assert!(rx.recv().is_err());
1755     }
1756
1757     #[test]
1758     fn oneshot_single_thread_peek_data() {
1759         let (tx, rx) = sync_channel::<int>(1);
1760         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1761         tx.send(10).unwrap();
1762         assert_eq!(rx.try_recv(), Ok(10));
1763     }
1764
1765     #[test]
1766     fn oneshot_single_thread_peek_close() {
1767         let (tx, rx) = sync_channel::<int>(0);
1768         drop(tx);
1769         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1770         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1771     }
1772
1773     #[test]
1774     fn oneshot_single_thread_peek_open() {
1775         let (_tx, rx) = sync_channel::<int>(0);
1776         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1777     }
1778
1779     #[test]
1780     fn oneshot_multi_task_recv_then_send() {
1781         let (tx, rx) = sync_channel::<Box<int>>(0);
1782         let _t = Thread::spawn(move|| {
1783             assert!(rx.recv().unwrap() == box 10);
1784         });
1785
1786         tx.send(box 10).unwrap();
1787     }
1788
1789     #[test]
1790     fn oneshot_multi_task_recv_then_close() {
1791         let (tx, rx) = sync_channel::<Box<int>>(0);
1792         let _t = Thread::spawn(move|| {
1793             drop(tx);
1794         });
1795         let res = Thread::spawn(move|| {
1796             assert!(rx.recv().unwrap() == box 10);
1797         }).join();
1798         assert!(res.is_err());
1799     }
1800
1801     #[test]
1802     fn oneshot_multi_thread_close_stress() {
1803         for _ in range(0, stress_factor()) {
1804             let (tx, rx) = sync_channel::<int>(0);
1805             let _t = Thread::spawn(move|| {
1806                 drop(rx);
1807             });
1808             drop(tx);
1809         }
1810     }
1811
1812     #[test]
1813     fn oneshot_multi_thread_send_close_stress() {
1814         for _ in range(0, stress_factor()) {
1815             let (tx, rx) = sync_channel::<int>(0);
1816             let _t = Thread::spawn(move|| {
1817                 drop(rx);
1818             });
1819             let _ = Thread::spawn(move || {
1820                 tx.send(1).unwrap();
1821             }).join();
1822         }
1823     }
1824
1825     #[test]
1826     fn oneshot_multi_thread_recv_close_stress() {
1827         for _ in range(0, stress_factor()) {
1828             let (tx, rx) = sync_channel::<int>(0);
1829             let _t = Thread::spawn(move|| {
1830                 let res = Thread::spawn(move|| {
1831                     rx.recv().unwrap();
1832                 }).join();
1833                 assert!(res.is_err());
1834             });
1835             let _t = Thread::spawn(move|| {
1836                 Thread::spawn(move|| {
1837                     drop(tx);
1838                 }).detach();
1839             });
1840         }
1841     }
1842
1843     #[test]
1844     fn oneshot_multi_thread_send_recv_stress() {
1845         for _ in range(0, stress_factor()) {
1846             let (tx, rx) = sync_channel::<Box<int>>(0);
1847             let _t = Thread::spawn(move|| {
1848                 tx.send(box 10i).unwrap();
1849             });
1850             assert!(rx.recv().unwrap() == box 10i);
1851         }
1852     }
1853
1854     #[test]
1855     fn stream_send_recv_stress() {
1856         for _ in range(0, stress_factor()) {
1857             let (tx, rx) = sync_channel::<Box<int>>(0);
1858
1859             send(tx, 0);
1860             recv(rx, 0);
1861
1862             fn send(tx: SyncSender<Box<int>>, i: int) {
1863                 if i == 10 { return }
1864
1865                 Thread::spawn(move|| {
1866                     tx.send(box i).unwrap();
1867                     send(tx, i + 1);
1868                 }).detach();
1869             }
1870
1871             fn recv(rx: Receiver<Box<int>>, i: int) {
1872                 if i == 10 { return }
1873
1874                 Thread::spawn(move|| {
1875                     assert!(rx.recv().unwrap() == box i);
1876                     recv(rx, i + 1);
1877                 }).detach();
1878             }
1879         }
1880     }
1881
1882     #[test]
1883     fn recv_a_lot() {
1884         // Regression test that we don't run out of stack in scheduler context
1885         let (tx, rx) = sync_channel(10000);
1886         for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1887         for _ in range(0u, 10000) { rx.recv().unwrap(); }
1888     }
1889
1890     #[test]
1891     fn shared_chan_stress() {
1892         let (tx, rx) = sync_channel(0);
1893         let total = stress_factor() + 100;
1894         for _ in range(0, total) {
1895             let tx = tx.clone();
1896             Thread::spawn(move|| {
1897                 tx.send(()).unwrap();
1898             }).detach();
1899         }
1900
1901         for _ in range(0, total) {
1902             rx.recv().unwrap();
1903         }
1904     }
1905
1906     #[test]
1907     fn test_nested_recv_iter() {
1908         let (tx, rx) = sync_channel::<int>(0);
1909         let (total_tx, total_rx) = sync_channel::<int>(0);
1910
1911         let _t = Thread::spawn(move|| {
1912             let mut acc = 0;
1913             for x in rx.iter() {
1914                 acc += x;
1915             }
1916             total_tx.send(acc).unwrap();
1917         });
1918
1919         tx.send(3).unwrap();
1920         tx.send(1).unwrap();
1921         tx.send(2).unwrap();
1922         drop(tx);
1923         assert_eq!(total_rx.recv().unwrap(), 6);
1924     }
1925
1926     #[test]
1927     fn test_recv_iter_break() {
1928         let (tx, rx) = sync_channel::<int>(0);
1929         let (count_tx, count_rx) = sync_channel(0);
1930
1931         let _t = Thread::spawn(move|| {
1932             let mut count = 0;
1933             for x in rx.iter() {
1934                 if count >= 3 {
1935                     break;
1936                 } else {
1937                     count += x;
1938                 }
1939             }
1940             count_tx.send(count).unwrap();
1941         });
1942
1943         tx.send(2).unwrap();
1944         tx.send(2).unwrap();
1945         tx.send(2).unwrap();
1946         let _ = tx.try_send(2);
1947         drop(tx);
1948         assert_eq!(count_rx.recv().unwrap(), 4);
1949     }
1950
1951     #[test]
1952     fn try_recv_states() {
1953         let (tx1, rx1) = sync_channel::<int>(1);
1954         let (tx2, rx2) = sync_channel::<()>(1);
1955         let (tx3, rx3) = sync_channel::<()>(1);
1956         let _t = Thread::spawn(move|| {
1957             rx2.recv().unwrap();
1958             tx1.send(1).unwrap();
1959             tx3.send(()).unwrap();
1960             rx2.recv().unwrap();
1961             drop(tx1);
1962             tx3.send(()).unwrap();
1963         });
1964
1965         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1966         tx2.send(()).unwrap();
1967         rx3.recv().unwrap();
1968         assert_eq!(rx1.try_recv(), Ok(1));
1969         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1970         tx2.send(()).unwrap();
1971         rx3.recv().unwrap();
1972         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1973     }
1974
1975     // This bug used to end up in a livelock inside of the Receiver destructor
1976     // because the internal state of the Shared packet was corrupted
1977     #[test]
1978     fn destroy_upgraded_shared_port_when_sender_still_active() {
1979         let (tx, rx) = sync_channel::<()>(0);
1980         let (tx2, rx2) = sync_channel::<()>(0);
1981         let _t = Thread::spawn(move|| {
1982             rx.recv().unwrap(); // wait on a oneshot
1983             drop(rx);  // destroy a shared
1984             tx2.send(()).unwrap();
1985         });
1986         // make sure the other task has gone to sleep
1987         for _ in range(0u, 5000) { Thread::yield_now(); }
1988
1989         // upgrade to a shared chan and send a message
1990         let t = tx.clone();
1991         drop(tx);
1992         t.send(()).unwrap();
1993
1994         // wait for the child task to exit before we exit
1995         rx2.recv().unwrap();
1996     }
1997
1998     #[test]
1999     fn send1() {
2000         let (tx, rx) = sync_channel::<int>(0);
2001         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2002         assert_eq!(tx.send(1), Ok(()));
2003     }
2004
2005     #[test]
2006     fn send2() {
2007         let (tx, rx) = sync_channel::<int>(0);
2008         let _t = Thread::spawn(move|| { drop(rx); });
2009         assert!(tx.send(1).is_err());
2010     }
2011
2012     #[test]
2013     fn send3() {
2014         let (tx, rx) = sync_channel::<int>(1);
2015         assert_eq!(tx.send(1), Ok(()));
2016         let _t =Thread::spawn(move|| { drop(rx); });
2017         assert!(tx.send(1).is_err());
2018     }
2019
2020     #[test]
2021     fn send4() {
2022         let (tx, rx) = sync_channel::<int>(0);
2023         let tx2 = tx.clone();
2024         let (done, donerx) = channel();
2025         let done2 = done.clone();
2026         let _t = Thread::spawn(move|| {
2027             assert!(tx.send(1).is_err());
2028             done.send(()).unwrap();
2029         });
2030         let _t = Thread::spawn(move|| {
2031             assert!(tx2.send(2).is_err());
2032             done2.send(()).unwrap();
2033         });
2034         drop(rx);
2035         donerx.recv().unwrap();
2036         donerx.recv().unwrap();
2037     }
2038
2039     #[test]
2040     fn try_send1() {
2041         let (tx, _rx) = sync_channel::<int>(0);
2042         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2043     }
2044
2045     #[test]
2046     fn try_send2() {
2047         let (tx, _rx) = sync_channel::<int>(1);
2048         assert_eq!(tx.try_send(1), Ok(()));
2049         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2050     }
2051
2052     #[test]
2053     fn try_send3() {
2054         let (tx, rx) = sync_channel::<int>(1);
2055         assert_eq!(tx.try_send(1), Ok(()));
2056         drop(rx);
2057         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2058     }
2059
2060     #[test]
2061     fn issue_15761() {
2062         fn repro() {
2063             let (tx1, rx1) = sync_channel::<()>(3);
2064             let (tx2, rx2) = sync_channel::<()>(3);
2065
2066             let _t = Thread::spawn(move|| {
2067                 rx1.recv().unwrap();
2068                 tx2.try_send(()).unwrap();
2069             });
2070
2071             tx1.try_send(()).unwrap();
2072             rx2.recv().unwrap();
2073         }
2074
2075         for _ in range(0u, 100) {
2076             repro()
2077         }
2078     }
2079 }