]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/mod.rs
doc: remove incomplete sentence
[rust.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer communication primitives threads
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //!    will return a `(Sender, Receiver)` tuple where all sends will be
28 //!    **asynchronous** (they never block). The channel conceptually has an
29 //!    infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //!    is a pre-allocated buffer of a fixed size. All sends will be
34 //!    **synchronous** by blocking until there is buffer space available. Note
35 //!    that a bound of 0 is allowed, causing the channel to become a
36 //!    "rendezvous" channel where each sender atomically hands off a message to
37 //!    a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //!     tx.send(10i).unwrap();
63 //! }).detach();
64 //! assert_eq!(rx.recv().unwrap(), 10i);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //!     let tx = tx.clone();
79 //!     Thread::spawn(move|| {
80 //!         tx.send(i).unwrap();
81 //!     }).detach()
82 //! }
83 //!
84 //! for _ in range(0i, 10i) {
85 //!     let j = rx.recv().unwrap();
86 //!     assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //!     // This will wait for the parent task to start receiving
111 //!     tx.send(53).unwrap();
112 //! }).detach();
113 //! rx.recv().unwrap();
114 //! ```
115 //!
116 //! Reading from a channel with a timeout requires to use a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //!     select! {
132 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
133 //!         _ = timeout.recv() => {
134 //!             println!("timed out, total time was more than 10 seconds");
135 //!             break;
136 //!         }
137 //!     }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //!     let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //!     select! {
157 //!         val = rx.recv() => println!("Received {}", val.unwrap()),
158 //!         _ = timeout.recv() => {
159 //!             println!("timed out, no message received in 5 seconds");
160 //!             break;
161 //!         }
162 //!     }
163 //! }
164 //! ```
165
166 // A description of how Rust's channel implementation works
167 //
168 // Channels are supposed to be the basic building block for all other
169 // concurrent primitives that are used in Rust. As a result, the channel type
170 // needs to be highly optimized, flexible, and broad enough for use everywhere.
171 //
172 // The choice of implementation of all channels is to be built on lock-free data
173 // structures. The channels themselves are then consequently also lock-free data
174 // structures. As always with lock-free code, this is a very "here be dragons"
175 // territory, especially because I'm unaware of any academic papers that have
176 // gone into great length about channels of these flavors.
177 //
178 // ## Flavors of channels
179 //
180 // From the perspective of a consumer of this library, there is only one flavor
181 // of channel. This channel can be used as a stream and cloned to allow multiple
182 // senders. Under the hood, however, there are actually three flavors of
183 // channels in play.
184 //
185 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
186 //              They contain as few atomics as possible and involve one and
187 //              exactly one allocation.
188 // * Streams - these channels are optimized for the non-shared use case. They
189 //             use a different concurrent queue that is more tailored for this
190 //             use case. The initial allocation of this flavor of channel is not
191 //             optimized.
192 // * Shared - this is the most general form of channel that this module offers,
193 //            a channel with multiple senders. This type is as optimized as it
194 //            can be, but the previous two types mentioned are much faster for
195 //            their use-cases.
196 //
197 // ## Concurrent queues
198 //
199 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
200 // recv() obviously blocks. This means that under the hood there must be some
201 // shared and concurrent queue holding all of the actual data.
202 //
203 // With two flavors of channels, two flavors of queues are also used. We have
204 // chosen to use queues from a well-known author that are abbreviated as SPSC
205 // and MPSC (single producer, single consumer and multiple producer, single
206 // consumer). SPSC queues are used for streams while MPSC queues are used for
207 // shared channels.
208 //
209 // ### SPSC optimizations
210 //
211 // The SPSC queue found online is essentially a linked list of nodes where one
212 // half of the nodes are the "queue of data" and the other half of nodes are a
213 // cache of unused nodes. The unused nodes are used such that an allocation is
214 // not required on every push() and a free doesn't need to happen on every
215 // pop().
216 //
217 // As found online, however, the cache of nodes is of an infinite size. This
218 // means that if a channel at one point in its life had 50k items in the queue,
219 // then the queue will always have the capacity for 50k items. I believed that
220 // this was an unnecessary limitation of the implementation, so I have altered
221 // the queue to optionally have a bound on the cache size.
222 //
223 // By default, streams will have an unbounded SPSC queue with a small-ish cache
224 // size. The hope is that the cache is still large enough to have very fast
225 // send() operations while not too large such that millions of channels can
226 // coexist at once.
227 //
228 // ### MPSC optimizations
229 //
230 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
231 // a linked list under the hood to earn its unboundedness, but I have not put
232 // forth much effort into having a cache of nodes similar to the SPSC queue.
233 //
234 // For now, I believe that this is "ok" because shared channels are not the most
235 // common type, but soon we may wish to revisit this queue choice and determine
236 // another candidate for backend storage of shared channels.
237 //
238 // ## Overview of the Implementation
239 //
240 // Now that there's a little background on the concurrent queues used, it's
241 // worth going into much more detail about the channels themselves. The basic
242 // pseudocode for a send/recv are:
243 //
244 //
245 //      send(t)                             recv()
246 //        queue.push(t)                       return if queue.pop()
247 //        if increment() == -1                deschedule {
248 //          wakeup()                            if decrement() > 0
249 //                                                cancel_deschedule()
250 //                                            }
251 //                                            queue.pop()
252 //
253 // As mentioned before, there are no locks in this implementation, only atomic
254 // instructions are used.
255 //
256 // ### The internal atomic counter
257 //
258 // Every channel has a shared counter with each half to keep track of the size
259 // of the queue. This counter is used to abort descheduling by the receiver and
260 // to know when to wake up on the sending side.
261 //
262 // As seen in the pseudocode, senders will increment this count and receivers
263 // will decrement the count. The theory behind this is that if a sender sees a
264 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
265 // then it doesn't need to block.
266 //
267 // The recv() method has a beginning call to pop(), and if successful, it needs
268 // to decrement the count. It is a crucial implementation detail that this
269 // decrement does *not* happen to the shared counter. If this were the case,
270 // then it would be possible for the counter to be very negative when there were
271 // no receivers waiting, in which case the senders would have to determine when
272 // it was actually appropriate to wake up a receiver.
273 //
274 // Instead, the "steal count" is kept track of separately (not atomically
275 // because it's only used by receivers), and then the decrement() call when
276 // descheduling will lump in all of the recent steals into one large decrement.
277 //
278 // The implication of this is that if a sender sees a -1 count, then there's
279 // guaranteed to be a waiter waiting!
280 //
281 // ## Native Implementation
282 //
283 // A major goal of these channels is to work seamlessly on and off the runtime.
284 // All of the previous race conditions have been worded in terms of
285 // scheduler-isms (which is obviously not available without the runtime).
286 //
287 // For now, native usage of channels (off the runtime) will fall back onto
288 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
289 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
290 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
291 // condition variable.
292 //
293 // ## Select
294 //
295 // Being able to support selection over channels has greatly influenced this
296 // design, and not only does selection need to work inside the runtime, but also
297 // outside the runtime.
298 //
299 // The implementation is fairly straightforward. The goal of select() is not to
300 // return some data, but only to return which channel can receive data without
301 // blocking. The implementation is essentially the entire blocking procedure
302 // followed by an increment as soon as its woken up. The cancellation procedure
303 // involves an increment and swapping out of to_wake to acquire ownership of the
304 // task to unblock.
305 //
306 // Sadly this current implementation requires multiple allocations, so I have
307 // seen the throughput of select() be much worse than it should be. I do not
308 // believe that there is anything fundamental that needs to change about these
309 // channels, however, in order to support a more efficient select().
310 //
311 // # Conclusion
312 //
313 // And now that you've seen all the races that I found and attempted to fix,
314 // here's the code for you to find some more!
315
316 use prelude::v1::*;
317
318 use sync::Arc;
319 use fmt;
320 use kinds::marker;
321 use mem;
322 use cell::UnsafeCell;
323
324 pub use self::select::{Select, Handle};
325 use self::select::StartResult;
326 use self::select::StartResult::*;
327 use self::blocking::SignalToken;
328
329 mod blocking;
330 mod oneshot;
331 mod select;
332 mod shared;
333 mod stream;
334 mod sync;
335 mod mpsc_queue;
336 mod spsc_queue;
337
338 /// The receiving-half of Rust's channel type. This half can only be owned by
339 /// one task
340 #[stable]
341 pub struct Receiver<T> {
342     inner: UnsafeCell<Flavor<T>>,
343 }
344
345 // The receiver port can be sent from place to place, so long as it
346 // is not used to receive non-sendable things.
347 unsafe impl<T:Send> Send for Receiver<T> { }
348
349 /// An iterator over messages on a receiver, this iterator will block
350 /// whenever `next` is called, waiting for a new message, and `None` will be
351 /// returned when the corresponding channel has hung up.
352 #[stable]
353 pub struct Iter<'a, T:'a> {
354     rx: &'a Receiver<T>
355 }
356
357 /// The sending-half of Rust's asynchronous channel type. This half can only be
358 /// owned by one task, but it can be cloned to send to other tasks.
359 #[stable]
360 pub struct Sender<T> {
361     inner: UnsafeCell<Flavor<T>>,
362 }
363
364 // The send port can be sent from place to place, so long as it
365 // is not used to send non-sendable things.
366 unsafe impl<T:Send> Send for Sender<T> { }
367
368 /// The sending-half of Rust's synchronous channel type. This half can only be
369 /// owned by one task, but it can be cloned to send to other tasks.
370 #[stable]
371 pub struct SyncSender<T> {
372     inner: Arc<RacyCell<sync::Packet<T>>>,
373     // can't share in an arc
374     _marker: marker::NoSync,
375 }
376
377 /// An error returned from the `send` function on channels.
378 ///
379 /// A `send` operation can only fail if the receiving end of a channel is
380 /// disconnected, implying that the data could never be received. The error
381 /// contains the data being sent as a payload so it can be recovered.
382 #[deriving(PartialEq, Eq)]
383 #[stable]
384 pub struct SendError<T>(pub T);
385
386 /// An error returned from the `recv` function on a `Receiver`.
387 ///
388 /// The `recv` operation can only fail if the sending half of a channel is
389 /// disconnected, implying that no further messages will ever be received.
390 #[deriving(PartialEq, Eq, Clone, Copy)]
391 #[stable]
392 pub struct RecvError;
393
394 /// This enumeration is the list of the possible reasons that try_recv could not
395 /// return data when called.
396 #[deriving(PartialEq, Clone, Copy)]
397 #[stable]
398 pub enum TryRecvError {
399     /// This channel is currently empty, but the sender(s) have not yet
400     /// disconnected, so data may yet become available.
401     #[stable]
402     Empty,
403
404     /// This channel's sending half has become disconnected, and there will
405     /// never be any more data received on this channel
406     #[stable]
407     Disconnected,
408 }
409
410 /// This enumeration is the list of the possible error outcomes for the
411 /// `SyncSender::try_send` method.
412 #[deriving(PartialEq, Clone)]
413 #[stable]
414 pub enum TrySendError<T> {
415     /// The data could not be sent on the channel because it would require that
416     /// the callee block to send the data.
417     ///
418     /// If this is a buffered channel, then the buffer is full at this time. If
419     /// this is not a buffered channel, then there is no receiver available to
420     /// acquire the data.
421     #[stable]
422     Full(T),
423
424     /// This channel's receiving half has disconnected, so the data could not be
425     /// sent. The data is returned back to the callee in this case.
426     #[stable]
427     Disconnected(T),
428 }
429
430 enum Flavor<T> {
431     Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
432     Stream(Arc<RacyCell<stream::Packet<T>>>),
433     Shared(Arc<RacyCell<shared::Packet<T>>>),
434     Sync(Arc<RacyCell<sync::Packet<T>>>),
435 }
436
437 #[doc(hidden)]
438 trait UnsafeFlavor<T> {
439     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
440     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
441         &mut *self.inner_unsafe().get()
442     }
443     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
444         &*self.inner_unsafe().get()
445     }
446 }
447 impl<T> UnsafeFlavor<T> for Sender<T> {
448     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
449         &self.inner
450     }
451 }
452 impl<T> UnsafeFlavor<T> for Receiver<T> {
453     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
454         &self.inner
455     }
456 }
457
458 /// Creates a new asynchronous channel, returning the sender/receiver halves.
459 ///
460 /// All data sent on the sender will become available on the receiver, and no
461 /// send will block the calling task (this channel has an "infinite buffer").
462 ///
463 /// # Example
464 ///
465 /// ```
466 /// use std::sync::mpsc::channel;
467 /// use std::thread::Thread;
468 ///
469 /// // tx is is the sending half (tx for transmission), and rx is the receiving
470 /// // half (rx for receiving).
471 /// let (tx, rx) = channel();
472 ///
473 /// // Spawn off an expensive computation
474 /// Thread::spawn(move|| {
475 /// #   fn expensive_computation() {}
476 ///     tx.send(expensive_computation()).unwrap();
477 /// }).detach();
478 ///
479 /// // Do some useful work for awhile
480 ///
481 /// // Let's see what that answer was
482 /// println!("{}", rx.recv().unwrap());
483 /// ```
484 #[stable]
485 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
486     let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
487     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
488 }
489
490 /// Creates a new synchronous, bounded channel.
491 ///
492 /// Like asynchronous channels, the `Receiver` will block until a message
493 /// becomes available. These channels differ greatly in the semantics of the
494 /// sender from asynchronous channels, however.
495 ///
496 /// This channel has an internal buffer on which messages will be queued. When
497 /// the internal buffer becomes full, future sends will *block* waiting for the
498 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
499 /// becomes  "rendezvous channel" where each send will not return until a recv
500 /// is paired with it.
501 ///
502 /// As with asynchronous channels, all senders will panic in `send` if the
503 /// `Receiver` has been destroyed.
504 ///
505 /// # Example
506 ///
507 /// ```
508 /// use std::sync::mpsc::sync_channel;
509 /// use std::thread::Thread;
510 ///
511 /// let (tx, rx) = sync_channel(1);
512 ///
513 /// // this returns immediately
514 /// tx.send(1i).unwrap();
515 ///
516 /// Thread::spawn(move|| {
517 ///     // this will block until the previous message has been received
518 ///     tx.send(2i).unwrap();
519 /// }).detach();
520 ///
521 /// assert_eq!(rx.recv().unwrap(), 1i);
522 /// assert_eq!(rx.recv().unwrap(), 2i);
523 /// ```
524 #[stable]
525 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
526     let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
527     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
528 }
529
530 ////////////////////////////////////////////////////////////////////////////////
531 // Sender
532 ////////////////////////////////////////////////////////////////////////////////
533
534 impl<T: Send> Sender<T> {
535     fn new(inner: Flavor<T>) -> Sender<T> {
536         Sender {
537             inner: UnsafeCell::new(inner),
538         }
539     }
540
541     /// Attempts to send a value on this channel, returning it back if it could
542     /// not be sent.
543     ///
544     /// A successful send occurs when it is determined that the other end of
545     /// the channel has not hung up already. An unsuccessful send would be one
546     /// where the corresponding receiver has already been deallocated. Note
547     /// that a return value of `Err` means that the data will never be
548     /// received, but a return value of `Ok` does *not* mean that the data
549     /// will be received.  It is possible for the corresponding receiver to
550     /// hang up immediately after this function returns `Ok`.
551     ///
552     /// This method will never block the current thread.
553     ///
554     /// # Example
555     ///
556     /// ```
557     /// use std::sync::mpsc::channel;
558     ///
559     /// let (tx, rx) = channel();
560     ///
561     /// // This send is always successful
562     /// tx.send(1i).unwrap();
563     ///
564     /// // This send will fail because the receiver is gone
565     /// drop(rx);
566     /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
567     /// ```
568     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
569         let (new_inner, ret) = match *unsafe { self.inner() } {
570             Flavor::Oneshot(ref p) => {
571                 unsafe {
572                     let p = p.get();
573                     if !(*p).sent() {
574                         return (*p).send(t).map_err(SendError);
575                     } else {
576                         let a =
577                             Arc::new(RacyCell::new(stream::Packet::new()));
578                         let rx = Receiver::new(Flavor::Stream(a.clone()));
579                         match (*p).upgrade(rx) {
580                             oneshot::UpSuccess => {
581                                 let ret = (*a.get()).send(t);
582                                 (a, ret)
583                             }
584                             oneshot::UpDisconnected => (a, Err(t)),
585                             oneshot::UpWoke(token) => {
586                                 // This send cannot panic because the thread is
587                                 // asleep (we're looking at it), so the receiver
588                                 // can't go away.
589                                 (*a.get()).send(t).ok().unwrap();
590                                 token.signal();
591                                 (a, Ok(()))
592                             }
593                         }
594                     }
595                 }
596             }
597             Flavor::Stream(ref p) => return unsafe {
598                 (*p.get()).send(t).map_err(SendError)
599             },
600             Flavor::Shared(ref p) => return unsafe {
601                 (*p.get()).send(t).map_err(SendError)
602             },
603             Flavor::Sync(..) => unreachable!(),
604         };
605
606         unsafe {
607             let tmp = Sender::new(Flavor::Stream(new_inner));
608             mem::swap(self.inner_mut(), tmp.inner_mut());
609         }
610         ret.map_err(SendError)
611     }
612 }
613
614 #[stable]
615 impl<T: Send> Clone for Sender<T> {
616     fn clone(&self) -> Sender<T> {
617         let (packet, sleeper, guard) = match *unsafe { self.inner() } {
618             Flavor::Oneshot(ref p) => {
619                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
620                 unsafe {
621                     let guard = (*a.get()).postinit_lock();
622                     let rx = Receiver::new(Flavor::Shared(a.clone()));
623                     match (*p.get()).upgrade(rx) {
624                         oneshot::UpSuccess |
625                         oneshot::UpDisconnected => (a, None, guard),
626                         oneshot::UpWoke(task) => (a, Some(task), guard)
627                     }
628                 }
629             }
630             Flavor::Stream(ref p) => {
631                 let a = Arc::new(RacyCell::new(shared::Packet::new()));
632                 unsafe {
633                     let guard = (*a.get()).postinit_lock();
634                     let rx = Receiver::new(Flavor::Shared(a.clone()));
635                     match (*p.get()).upgrade(rx) {
636                         stream::UpSuccess |
637                         stream::UpDisconnected => (a, None, guard),
638                         stream::UpWoke(task) => (a, Some(task), guard),
639                     }
640                 }
641             }
642             Flavor::Shared(ref p) => {
643                 unsafe { (*p.get()).clone_chan(); }
644                 return Sender::new(Flavor::Shared(p.clone()));
645             }
646             Flavor::Sync(..) => unreachable!(),
647         };
648
649         unsafe {
650             (*packet.get()).inherit_blocker(sleeper, guard);
651
652             let tmp = Sender::new(Flavor::Shared(packet.clone()));
653             mem::swap(self.inner_mut(), tmp.inner_mut());
654         }
655         Sender::new(Flavor::Shared(packet))
656     }
657 }
658
659 #[unsafe_destructor]
660 impl<T: Send> Drop for Sender<T> {
661     fn drop(&mut self) {
662         match *unsafe { self.inner_mut() } {
663             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
664             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
665             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
666             Flavor::Sync(..) => unreachable!(),
667         }
668     }
669 }
670
671 ////////////////////////////////////////////////////////////////////////////////
672 // SyncSender
673 ////////////////////////////////////////////////////////////////////////////////
674
675 impl<T: Send> SyncSender<T> {
676     fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
677         SyncSender { inner: inner, _marker: marker::NoSync }
678     }
679
680     /// Sends a value on this synchronous channel.
681     ///
682     /// This function will *block* until space in the internal buffer becomes
683     /// available or a receiver is available to hand off the message to.
684     ///
685     /// Note that a successful send does *not* guarantee that the receiver will
686     /// ever see the data if there is a buffer on this channel. Items may be
687     /// enqueued in the internal buffer for the receiver to receive at a later
688     /// time. If the buffer size is 0, however, it can be guaranteed that the
689     /// receiver has indeed received the data if this function returns success.
690     ///
691     /// This function will never panic, but it may return `Err` if the
692     /// `Receiver` has disconnected and is no longer able to receive
693     /// information.
694     #[stable]
695     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
696         unsafe { (*self.inner.get()).send(t).map_err(SendError) }
697     }
698
699     /// Attempts to send a value on this channel without blocking.
700     ///
701     /// This method differs from `send` by returning immediately if the
702     /// channel's buffer is full or no receiver is waiting to acquire some
703     /// data. Compared with `send`, this function has two failure cases
704     /// instead of one (one for disconnection, one for a full buffer).
705     ///
706     /// See `SyncSender::send` for notes about guarantees of whether the
707     /// receiver has received the data or not if this function is successful.
708     #[stable]
709     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
710         unsafe { (*self.inner.get()).try_send(t) }
711     }
712 }
713
714 #[stable]
715 impl<T: Send> Clone for SyncSender<T> {
716     fn clone(&self) -> SyncSender<T> {
717         unsafe { (*self.inner.get()).clone_chan(); }
718         return SyncSender::new(self.inner.clone());
719     }
720 }
721
722 #[unsafe_destructor]
723 impl<T: Send> Drop for SyncSender<T> {
724     fn drop(&mut self) {
725         unsafe { (*self.inner.get()).drop_chan(); }
726     }
727 }
728
729 ////////////////////////////////////////////////////////////////////////////////
730 // Receiver
731 ////////////////////////////////////////////////////////////////////////////////
732
733 impl<T: Send> Receiver<T> {
734     fn new(inner: Flavor<T>) -> Receiver<T> {
735         Receiver { inner: UnsafeCell::new(inner) }
736     }
737
738     /// Attempts to return a pending value on this receiver without blocking
739     ///
740     /// This method will never block the caller in order to wait for data to
741     /// become available. Instead, this will always return immediately with a
742     /// possible option of pending data on the channel.
743     ///
744     /// This is useful for a flavor of "optimistic check" before deciding to
745     /// block on a receiver.
746     #[stable]
747     pub fn try_recv(&self) -> Result<T, TryRecvError> {
748         loop {
749             let new_port = match *unsafe { self.inner() } {
750                 Flavor::Oneshot(ref p) => {
751                     match unsafe { (*p.get()).try_recv() } {
752                         Ok(t) => return Ok(t),
753                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
754                         Err(oneshot::Disconnected) => {
755                             return Err(TryRecvError::Disconnected)
756                         }
757                         Err(oneshot::Upgraded(rx)) => rx,
758                     }
759                 }
760                 Flavor::Stream(ref p) => {
761                     match unsafe { (*p.get()).try_recv() } {
762                         Ok(t) => return Ok(t),
763                         Err(stream::Empty) => return Err(TryRecvError::Empty),
764                         Err(stream::Disconnected) => {
765                             return Err(TryRecvError::Disconnected)
766                         }
767                         Err(stream::Upgraded(rx)) => rx,
768                     }
769                 }
770                 Flavor::Shared(ref p) => {
771                     match unsafe { (*p.get()).try_recv() } {
772                         Ok(t) => return Ok(t),
773                         Err(shared::Empty) => return Err(TryRecvError::Empty),
774                         Err(shared::Disconnected) => {
775                             return Err(TryRecvError::Disconnected)
776                         }
777                     }
778                 }
779                 Flavor::Sync(ref p) => {
780                     match unsafe { (*p.get()).try_recv() } {
781                         Ok(t) => return Ok(t),
782                         Err(sync::Empty) => return Err(TryRecvError::Empty),
783                         Err(sync::Disconnected) => {
784                             return Err(TryRecvError::Disconnected)
785                         }
786                     }
787                 }
788             };
789             unsafe {
790                 mem::swap(self.inner_mut(),
791                           new_port.inner_mut());
792             }
793         }
794     }
795
796     /// Attempt to wait for a value on this receiver, returning an error if the
797     /// corresponding channel has hung up.
798     ///
799     /// This function will always block the current thread if there is no data
800     /// available and it's possible for more data to be sent. Once a message is
801     /// sent to the corresponding `Sender`, then this receiver will wake up and
802     /// return that message.
803     ///
804     /// If the corresponding `Sender` has disconnected, or it disconnects while
805     /// this call is blocking, this call will wake up and return `Err` to
806     /// indicate that no more messages can ever be received on this channel.
807     #[stable]
808     pub fn recv(&self) -> Result<T, RecvError> {
809         loop {
810             let new_port = match *unsafe { self.inner() } {
811                 Flavor::Oneshot(ref p) => {
812                     match unsafe { (*p.get()).recv() } {
813                         Ok(t) => return Ok(t),
814                         Err(oneshot::Empty) => return unreachable!(),
815                         Err(oneshot::Disconnected) => return Err(RecvError),
816                         Err(oneshot::Upgraded(rx)) => rx,
817                     }
818                 }
819                 Flavor::Stream(ref p) => {
820                     match unsafe { (*p.get()).recv() } {
821                         Ok(t) => return Ok(t),
822                         Err(stream::Empty) => return unreachable!(),
823                         Err(stream::Disconnected) => return Err(RecvError),
824                         Err(stream::Upgraded(rx)) => rx,
825                     }
826                 }
827                 Flavor::Shared(ref p) => {
828                     match unsafe { (*p.get()).recv() } {
829                         Ok(t) => return Ok(t),
830                         Err(shared::Empty) => return unreachable!(),
831                         Err(shared::Disconnected) => return Err(RecvError),
832                     }
833                 }
834                 Flavor::Sync(ref p) => return unsafe {
835                     (*p.get()).recv().map_err(|()| RecvError)
836                 }
837             };
838             unsafe {
839                 mem::swap(self.inner_mut(), new_port.inner_mut());
840             }
841         }
842     }
843
844     /// Returns an iterator that will block waiting for messages, but never
845     /// `panic!`. It will return `None` when the channel has hung up.
846     #[stable]
847     pub fn iter(&self) -> Iter<T> {
848         Iter { rx: self }
849     }
850 }
851
852 impl<T: Send> select::Packet for Receiver<T> {
853     fn can_recv(&self) -> bool {
854         loop {
855             let new_port = match *unsafe { self.inner() } {
856                 Flavor::Oneshot(ref p) => {
857                     match unsafe { (*p.get()).can_recv() } {
858                         Ok(ret) => return ret,
859                         Err(upgrade) => upgrade,
860                     }
861                 }
862                 Flavor::Stream(ref p) => {
863                     match unsafe { (*p.get()).can_recv() } {
864                         Ok(ret) => return ret,
865                         Err(upgrade) => upgrade,
866                     }
867                 }
868                 Flavor::Shared(ref p) => {
869                     return unsafe { (*p.get()).can_recv() };
870                 }
871                 Flavor::Sync(ref p) => {
872                     return unsafe { (*p.get()).can_recv() };
873                 }
874             };
875             unsafe {
876                 mem::swap(self.inner_mut(),
877                           new_port.inner_mut());
878             }
879         }
880     }
881
882     fn start_selection(&self, mut token: SignalToken) -> StartResult {
883         loop {
884             let (t, new_port) = match *unsafe { self.inner() } {
885                 Flavor::Oneshot(ref p) => {
886                     match unsafe { (*p.get()).start_selection(token) } {
887                         oneshot::SelSuccess => return Installed,
888                         oneshot::SelCanceled => return Abort,
889                         oneshot::SelUpgraded(t, rx) => (t, rx),
890                     }
891                 }
892                 Flavor::Stream(ref p) => {
893                     match unsafe { (*p.get()).start_selection(token) } {
894                         stream::SelSuccess => return Installed,
895                         stream::SelCanceled => return Abort,
896                         stream::SelUpgraded(t, rx) => (t, rx),
897                     }
898                 }
899                 Flavor::Shared(ref p) => {
900                     return unsafe { (*p.get()).start_selection(token) };
901                 }
902                 Flavor::Sync(ref p) => {
903                     return unsafe { (*p.get()).start_selection(token) };
904                 }
905             };
906             token = t;
907             unsafe {
908                 mem::swap(self.inner_mut(), new_port.inner_mut());
909             }
910         }
911     }
912
913     fn abort_selection(&self) -> bool {
914         let mut was_upgrade = false;
915         loop {
916             let result = match *unsafe { self.inner() } {
917                 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
918                 Flavor::Stream(ref p) => unsafe {
919                     (*p.get()).abort_selection(was_upgrade)
920                 },
921                 Flavor::Shared(ref p) => return unsafe {
922                     (*p.get()).abort_selection(was_upgrade)
923                 },
924                 Flavor::Sync(ref p) => return unsafe {
925                     (*p.get()).abort_selection()
926                 },
927             };
928             let new_port = match result { Ok(b) => return b, Err(p) => p };
929             was_upgrade = true;
930             unsafe {
931                 mem::swap(self.inner_mut(),
932                           new_port.inner_mut());
933             }
934         }
935     }
936 }
937
938 #[unstable]
939 impl<'a, T: Send> Iterator for Iter<'a, T> {
940     type Item = T;
941
942     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
943 }
944
945 #[unsafe_destructor]
946 impl<T: Send> Drop for Receiver<T> {
947     fn drop(&mut self) {
948         match *unsafe { self.inner_mut() } {
949             Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
950             Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
951             Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
952             Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
953         }
954     }
955 }
956
957 /// A version of `UnsafeCell` intended for use in concurrent data
958 /// structures (for example, you might put it in an `Arc`).
959 struct RacyCell<T>(pub UnsafeCell<T>);
960
961 impl<T> RacyCell<T> {
962
963     fn new(value: T) -> RacyCell<T> {
964         RacyCell(UnsafeCell { value: value })
965     }
966
967     unsafe fn get(&self) -> *mut T {
968         self.0.get()
969     }
970
971 }
972
973 unsafe impl<T:Send> Send for RacyCell<T> { }
974
975 unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
976
977 impl<T> fmt::Show for SendError<T> {
978     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
979         "sending on a closed channel".fmt(f)
980     }
981 }
982
983 impl<T> fmt::Show for TrySendError<T> {
984     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
985         match *self {
986             TrySendError::Full(..) => {
987                 "sending on a full channel".fmt(f)
988             }
989             TrySendError::Disconnected(..) => {
990                 "sending on a closed channel".fmt(f)
991             }
992         }
993     }
994 }
995
996 impl fmt::Show for RecvError {
997     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
998         "receiving on a closed channel".fmt(f)
999     }
1000 }
1001
1002 impl fmt::Show for TryRecvError {
1003     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1004         match *self {
1005             TryRecvError::Empty => {
1006                 "receiving on an empty channel".fmt(f)
1007             }
1008             TryRecvError::Disconnected => {
1009                 "receiving on a closed channel".fmt(f)
1010             }
1011         }
1012     }
1013 }
1014
1015 #[cfg(test)]
1016 mod test {
1017     use prelude::v1::*;
1018
1019     use os;
1020     use super::*;
1021     use thread::Thread;
1022
1023     pub fn stress_factor() -> uint {
1024         match os::getenv("RUST_TEST_STRESS") {
1025             Some(val) => val.parse().unwrap(),
1026             None => 1,
1027         }
1028     }
1029
1030     #[test]
1031     fn smoke() {
1032         let (tx, rx) = channel::<int>();
1033         tx.send(1).unwrap();
1034         assert_eq!(rx.recv().unwrap(), 1);
1035     }
1036
1037     #[test]
1038     fn drop_full() {
1039         let (tx, _rx) = channel();
1040         tx.send(box 1i).unwrap();
1041     }
1042
1043     #[test]
1044     fn drop_full_shared() {
1045         let (tx, _rx) = channel();
1046         drop(tx.clone());
1047         drop(tx.clone());
1048         tx.send(box 1i).unwrap();
1049     }
1050
1051     #[test]
1052     fn smoke_shared() {
1053         let (tx, rx) = channel::<int>();
1054         tx.send(1).unwrap();
1055         assert_eq!(rx.recv().unwrap(), 1);
1056         let tx = tx.clone();
1057         tx.send(1).unwrap();
1058         assert_eq!(rx.recv().unwrap(), 1);
1059     }
1060
1061     #[test]
1062     fn smoke_threads() {
1063         let (tx, rx) = channel::<int>();
1064         let _t = Thread::spawn(move|| {
1065             tx.send(1).unwrap();
1066         });
1067         assert_eq!(rx.recv().unwrap(), 1);
1068     }
1069
1070     #[test]
1071     fn smoke_port_gone() {
1072         let (tx, rx) = channel::<int>();
1073         drop(rx);
1074         assert!(tx.send(1).is_err());
1075     }
1076
1077     #[test]
1078     fn smoke_shared_port_gone() {
1079         let (tx, rx) = channel::<int>();
1080         drop(rx);
1081         assert!(tx.send(1).is_err())
1082     }
1083
1084     #[test]
1085     fn smoke_shared_port_gone2() {
1086         let (tx, rx) = channel::<int>();
1087         drop(rx);
1088         let tx2 = tx.clone();
1089         drop(tx);
1090         assert!(tx2.send(1).is_err());
1091     }
1092
1093     #[test]
1094     fn port_gone_concurrent() {
1095         let (tx, rx) = channel::<int>();
1096         let _t = Thread::spawn(move|| {
1097             rx.recv().unwrap();
1098         });
1099         while tx.send(1).is_ok() {}
1100     }
1101
1102     #[test]
1103     fn port_gone_concurrent_shared() {
1104         let (tx, rx) = channel::<int>();
1105         let tx2 = tx.clone();
1106         let _t = Thread::spawn(move|| {
1107             rx.recv().unwrap();
1108         });
1109         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1110     }
1111
1112     #[test]
1113     fn smoke_chan_gone() {
1114         let (tx, rx) = channel::<int>();
1115         drop(tx);
1116         assert!(rx.recv().is_err());
1117     }
1118
1119     #[test]
1120     fn smoke_chan_gone_shared() {
1121         let (tx, rx) = channel::<()>();
1122         let tx2 = tx.clone();
1123         drop(tx);
1124         drop(tx2);
1125         assert!(rx.recv().is_err());
1126     }
1127
1128     #[test]
1129     fn chan_gone_concurrent() {
1130         let (tx, rx) = channel::<int>();
1131         let _t = Thread::spawn(move|| {
1132             tx.send(1).unwrap();
1133             tx.send(1).unwrap();
1134         });
1135         while rx.recv().is_ok() {}
1136     }
1137
1138     #[test]
1139     fn stress() {
1140         let (tx, rx) = channel::<int>();
1141         let t = Thread::spawn(move|| {
1142             for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1143         });
1144         for _ in range(0u, 10000) {
1145             assert_eq!(rx.recv().unwrap(), 1);
1146         }
1147         t.join().ok().unwrap();
1148     }
1149
1150     #[test]
1151     fn stress_shared() {
1152         static AMT: uint = 10000;
1153         static NTHREADS: uint = 8;
1154         let (tx, rx) = channel::<int>();
1155
1156         let t = Thread::spawn(move|| {
1157             for _ in range(0, AMT * NTHREADS) {
1158                 assert_eq!(rx.recv().unwrap(), 1);
1159             }
1160             match rx.try_recv() {
1161                 Ok(..) => panic!(),
1162                 _ => {}
1163             }
1164         });
1165
1166         for _ in range(0, NTHREADS) {
1167             let tx = tx.clone();
1168             Thread::spawn(move|| {
1169                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1170             }).detach();
1171         }
1172         drop(tx);
1173         t.join().ok().unwrap();
1174     }
1175
1176     #[test]
1177     fn send_from_outside_runtime() {
1178         let (tx1, rx1) = channel::<()>();
1179         let (tx2, rx2) = channel::<int>();
1180         let t1 = Thread::spawn(move|| {
1181             tx1.send(()).unwrap();
1182             for _ in range(0i, 40) {
1183                 assert_eq!(rx2.recv().unwrap(), 1);
1184             }
1185         });
1186         rx1.recv().unwrap();
1187         let t2 = Thread::spawn(move|| {
1188             for _ in range(0i, 40) {
1189                 tx2.send(1).unwrap();
1190             }
1191         });
1192         t1.join().ok().unwrap();
1193         t2.join().ok().unwrap();
1194     }
1195
1196     #[test]
1197     fn recv_from_outside_runtime() {
1198         let (tx, rx) = channel::<int>();
1199         let t = Thread::spawn(move|| {
1200             for _ in range(0i, 40) {
1201                 assert_eq!(rx.recv().unwrap(), 1);
1202             }
1203         });
1204         for _ in range(0u, 40) {
1205             tx.send(1).unwrap();
1206         }
1207         t.join().ok().unwrap();
1208     }
1209
1210     #[test]
1211     fn no_runtime() {
1212         let (tx1, rx1) = channel::<int>();
1213         let (tx2, rx2) = channel::<int>();
1214         let t1 = Thread::spawn(move|| {
1215             assert_eq!(rx1.recv().unwrap(), 1);
1216             tx2.send(2).unwrap();
1217         });
1218         let t2 = Thread::spawn(move|| {
1219             tx1.send(1).unwrap();
1220             assert_eq!(rx2.recv().unwrap(), 2);
1221         });
1222         t1.join().ok().unwrap();
1223         t2.join().ok().unwrap();
1224     }
1225
1226     #[test]
1227     fn oneshot_single_thread_close_port_first() {
1228         // Simple test of closing without sending
1229         let (_tx, rx) = channel::<int>();
1230         drop(rx);
1231     }
1232
1233     #[test]
1234     fn oneshot_single_thread_close_chan_first() {
1235         // Simple test of closing without sending
1236         let (tx, _rx) = channel::<int>();
1237         drop(tx);
1238     }
1239
1240     #[test]
1241     fn oneshot_single_thread_send_port_close() {
1242         // Testing that the sender cleans up the payload if receiver is closed
1243         let (tx, rx) = channel::<Box<int>>();
1244         drop(rx);
1245         assert!(tx.send(box 0).is_err());
1246     }
1247
1248     #[test]
1249     fn oneshot_single_thread_recv_chan_close() {
1250         // Receiving on a closed chan will panic
1251         let res = Thread::spawn(move|| {
1252             let (tx, rx) = channel::<int>();
1253             drop(tx);
1254             rx.recv().unwrap();
1255         }).join();
1256         // What is our res?
1257         assert!(res.is_err());
1258     }
1259
1260     #[test]
1261     fn oneshot_single_thread_send_then_recv() {
1262         let (tx, rx) = channel::<Box<int>>();
1263         tx.send(box 10).unwrap();
1264         assert!(rx.recv().unwrap() == box 10);
1265     }
1266
1267     #[test]
1268     fn oneshot_single_thread_try_send_open() {
1269         let (tx, rx) = channel::<int>();
1270         assert!(tx.send(10).is_ok());
1271         assert!(rx.recv().unwrap() == 10);
1272     }
1273
1274     #[test]
1275     fn oneshot_single_thread_try_send_closed() {
1276         let (tx, rx) = channel::<int>();
1277         drop(rx);
1278         assert!(tx.send(10).is_err());
1279     }
1280
1281     #[test]
1282     fn oneshot_single_thread_try_recv_open() {
1283         let (tx, rx) = channel::<int>();
1284         tx.send(10).unwrap();
1285         assert!(rx.recv() == Ok(10));
1286     }
1287
1288     #[test]
1289     fn oneshot_single_thread_try_recv_closed() {
1290         let (tx, rx) = channel::<int>();
1291         drop(tx);
1292         assert!(rx.recv().is_err());
1293     }
1294
1295     #[test]
1296     fn oneshot_single_thread_peek_data() {
1297         let (tx, rx) = channel::<int>();
1298         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1299         tx.send(10).unwrap();
1300         assert_eq!(rx.try_recv(), Ok(10));
1301     }
1302
1303     #[test]
1304     fn oneshot_single_thread_peek_close() {
1305         let (tx, rx) = channel::<int>();
1306         drop(tx);
1307         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1308         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1309     }
1310
1311     #[test]
1312     fn oneshot_single_thread_peek_open() {
1313         let (_tx, rx) = channel::<int>();
1314         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1315     }
1316
1317     #[test]
1318     fn oneshot_multi_task_recv_then_send() {
1319         let (tx, rx) = channel::<Box<int>>();
1320         let _t = Thread::spawn(move|| {
1321             assert!(rx.recv().unwrap() == box 10);
1322         });
1323
1324         tx.send(box 10).unwrap();
1325     }
1326
1327     #[test]
1328     fn oneshot_multi_task_recv_then_close() {
1329         let (tx, rx) = channel::<Box<int>>();
1330         let _t = Thread::spawn(move|| {
1331             drop(tx);
1332         });
1333         let res = Thread::spawn(move|| {
1334             assert!(rx.recv().unwrap() == box 10);
1335         }).join();
1336         assert!(res.is_err());
1337     }
1338
1339     #[test]
1340     fn oneshot_multi_thread_close_stress() {
1341         for _ in range(0, stress_factor()) {
1342             let (tx, rx) = channel::<int>();
1343             let _t = Thread::spawn(move|| {
1344                 drop(rx);
1345             });
1346             drop(tx);
1347         }
1348     }
1349
1350     #[test]
1351     fn oneshot_multi_thread_send_close_stress() {
1352         for _ in range(0, stress_factor()) {
1353             let (tx, rx) = channel::<int>();
1354             let _t = Thread::spawn(move|| {
1355                 drop(rx);
1356             });
1357             let _ = Thread::spawn(move|| {
1358                 tx.send(1).unwrap();
1359             }).join();
1360         }
1361     }
1362
1363     #[test]
1364     fn oneshot_multi_thread_recv_close_stress() {
1365         for _ in range(0, stress_factor()) {
1366             let (tx, rx) = channel::<int>();
1367             Thread::spawn(move|| {
1368                 let res = Thread::spawn(move|| {
1369                     rx.recv().unwrap();
1370                 }).join();
1371                 assert!(res.is_err());
1372             }).detach();
1373             let _t = Thread::spawn(move|| {
1374                 Thread::spawn(move|| {
1375                     drop(tx);
1376                 }).detach();
1377             });
1378         }
1379     }
1380
1381     #[test]
1382     fn oneshot_multi_thread_send_recv_stress() {
1383         for _ in range(0, stress_factor()) {
1384             let (tx, rx) = channel();
1385             let _t = Thread::spawn(move|| {
1386                 tx.send(box 10i).unwrap();
1387             });
1388             assert!(rx.recv().unwrap() == box 10i);
1389         }
1390     }
1391
1392     #[test]
1393     fn stream_send_recv_stress() {
1394         for _ in range(0, stress_factor()) {
1395             let (tx, rx) = channel();
1396
1397             send(tx, 0);
1398             recv(rx, 0);
1399
1400             fn send(tx: Sender<Box<int>>, i: int) {
1401                 if i == 10 { return }
1402
1403                 Thread::spawn(move|| {
1404                     tx.send(box i).unwrap();
1405                     send(tx, i + 1);
1406                 }).detach();
1407             }
1408
1409             fn recv(rx: Receiver<Box<int>>, i: int) {
1410                 if i == 10 { return }
1411
1412                 Thread::spawn(move|| {
1413                     assert!(rx.recv().unwrap() == box i);
1414                     recv(rx, i + 1);
1415                 }).detach();
1416             }
1417         }
1418     }
1419
1420     #[test]
1421     fn recv_a_lot() {
1422         // Regression test that we don't run out of stack in scheduler context
1423         let (tx, rx) = channel();
1424         for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1425         for _ in range(0i, 10000) { rx.recv().unwrap(); }
1426     }
1427
1428     #[test]
1429     fn shared_chan_stress() {
1430         let (tx, rx) = channel();
1431         let total = stress_factor() + 100;
1432         for _ in range(0, total) {
1433             let tx = tx.clone();
1434             Thread::spawn(move|| {
1435                 tx.send(()).unwrap();
1436             }).detach();
1437         }
1438
1439         for _ in range(0, total) {
1440             rx.recv().unwrap();
1441         }
1442     }
1443
1444     #[test]
1445     fn test_nested_recv_iter() {
1446         let (tx, rx) = channel::<int>();
1447         let (total_tx, total_rx) = channel::<int>();
1448
1449         let _t = Thread::spawn(move|| {
1450             let mut acc = 0;
1451             for x in rx.iter() {
1452                 acc += x;
1453             }
1454             total_tx.send(acc).unwrap();
1455         });
1456
1457         tx.send(3).unwrap();
1458         tx.send(1).unwrap();
1459         tx.send(2).unwrap();
1460         drop(tx);
1461         assert_eq!(total_rx.recv().unwrap(), 6);
1462     }
1463
1464     #[test]
1465     fn test_recv_iter_break() {
1466         let (tx, rx) = channel::<int>();
1467         let (count_tx, count_rx) = channel();
1468
1469         let _t = Thread::spawn(move|| {
1470             let mut count = 0;
1471             for x in rx.iter() {
1472                 if count >= 3 {
1473                     break;
1474                 } else {
1475                     count += x;
1476                 }
1477             }
1478             count_tx.send(count).unwrap();
1479         });
1480
1481         tx.send(2).unwrap();
1482         tx.send(2).unwrap();
1483         tx.send(2).unwrap();
1484         let _ = tx.send(2);
1485         drop(tx);
1486         assert_eq!(count_rx.recv().unwrap(), 4);
1487     }
1488
1489     #[test]
1490     fn try_recv_states() {
1491         let (tx1, rx1) = channel::<int>();
1492         let (tx2, rx2) = channel::<()>();
1493         let (tx3, rx3) = channel::<()>();
1494         let _t = Thread::spawn(move|| {
1495             rx2.recv().unwrap();
1496             tx1.send(1).unwrap();
1497             tx3.send(()).unwrap();
1498             rx2.recv().unwrap();
1499             drop(tx1);
1500             tx3.send(()).unwrap();
1501         });
1502
1503         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1504         tx2.send(()).unwrap();
1505         rx3.recv().unwrap();
1506         assert_eq!(rx1.try_recv(), Ok(1));
1507         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1508         tx2.send(()).unwrap();
1509         rx3.recv().unwrap();
1510         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1511     }
1512
1513     // This bug used to end up in a livelock inside of the Receiver destructor
1514     // because the internal state of the Shared packet was corrupted
1515     #[test]
1516     fn destroy_upgraded_shared_port_when_sender_still_active() {
1517         let (tx, rx) = channel();
1518         let (tx2, rx2) = channel();
1519         let _t = Thread::spawn(move|| {
1520             rx.recv().unwrap(); // wait on a oneshot
1521             drop(rx);  // destroy a shared
1522             tx2.send(()).unwrap();
1523         });
1524         // make sure the other task has gone to sleep
1525         for _ in range(0u, 5000) { Thread::yield_now(); }
1526
1527         // upgrade to a shared chan and send a message
1528         let t = tx.clone();
1529         drop(tx);
1530         t.send(()).unwrap();
1531
1532         // wait for the child task to exit before we exit
1533         rx2.recv().unwrap();
1534     }
1535 }
1536
1537 #[cfg(test)]
1538 mod sync_tests {
1539     use prelude::v1::*;
1540
1541     use os;
1542     use thread::Thread;
1543     use super::*;
1544
1545     pub fn stress_factor() -> uint {
1546         match os::getenv("RUST_TEST_STRESS") {
1547             Some(val) => val.parse().unwrap(),
1548             None => 1,
1549         }
1550     }
1551
1552     #[test]
1553     fn smoke() {
1554         let (tx, rx) = sync_channel::<int>(1);
1555         tx.send(1).unwrap();
1556         assert_eq!(rx.recv().unwrap(), 1);
1557     }
1558
1559     #[test]
1560     fn drop_full() {
1561         let (tx, _rx) = sync_channel(1);
1562         tx.send(box 1i).unwrap();
1563     }
1564
1565     #[test]
1566     fn smoke_shared() {
1567         let (tx, rx) = sync_channel::<int>(1);
1568         tx.send(1).unwrap();
1569         assert_eq!(rx.recv().unwrap(), 1);
1570         let tx = tx.clone();
1571         tx.send(1).unwrap();
1572         assert_eq!(rx.recv().unwrap(), 1);
1573     }
1574
1575     #[test]
1576     fn smoke_threads() {
1577         let (tx, rx) = sync_channel::<int>(0);
1578         let _t = Thread::spawn(move|| {
1579             tx.send(1).unwrap();
1580         });
1581         assert_eq!(rx.recv().unwrap(), 1);
1582     }
1583
1584     #[test]
1585     fn smoke_port_gone() {
1586         let (tx, rx) = sync_channel::<int>(0);
1587         drop(rx);
1588         assert!(tx.send(1).is_err());
1589     }
1590
1591     #[test]
1592     fn smoke_shared_port_gone2() {
1593         let (tx, rx) = sync_channel::<int>(0);
1594         drop(rx);
1595         let tx2 = tx.clone();
1596         drop(tx);
1597         assert!(tx2.send(1).is_err());
1598     }
1599
1600     #[test]
1601     fn port_gone_concurrent() {
1602         let (tx, rx) = sync_channel::<int>(0);
1603         let _t = Thread::spawn(move|| {
1604             rx.recv().unwrap();
1605         });
1606         while tx.send(1).is_ok() {}
1607     }
1608
1609     #[test]
1610     fn port_gone_concurrent_shared() {
1611         let (tx, rx) = sync_channel::<int>(0);
1612         let tx2 = tx.clone();
1613         let _t = Thread::spawn(move|| {
1614             rx.recv().unwrap();
1615         });
1616         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1617     }
1618
1619     #[test]
1620     fn smoke_chan_gone() {
1621         let (tx, rx) = sync_channel::<int>(0);
1622         drop(tx);
1623         assert!(rx.recv().is_err());
1624     }
1625
1626     #[test]
1627     fn smoke_chan_gone_shared() {
1628         let (tx, rx) = sync_channel::<()>(0);
1629         let tx2 = tx.clone();
1630         drop(tx);
1631         drop(tx2);
1632         assert!(rx.recv().is_err());
1633     }
1634
1635     #[test]
1636     fn chan_gone_concurrent() {
1637         let (tx, rx) = sync_channel::<int>(0);
1638         Thread::spawn(move|| {
1639             tx.send(1).unwrap();
1640             tx.send(1).unwrap();
1641         }).detach();
1642         while rx.recv().is_ok() {}
1643     }
1644
1645     #[test]
1646     fn stress() {
1647         let (tx, rx) = sync_channel::<int>(0);
1648         Thread::spawn(move|| {
1649             for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1650         }).detach();
1651         for _ in range(0u, 10000) {
1652             assert_eq!(rx.recv().unwrap(), 1);
1653         }
1654     }
1655
1656     #[test]
1657     fn stress_shared() {
1658         static AMT: uint = 1000;
1659         static NTHREADS: uint = 8;
1660         let (tx, rx) = sync_channel::<int>(0);
1661         let (dtx, drx) = sync_channel::<()>(0);
1662
1663         Thread::spawn(move|| {
1664             for _ in range(0, AMT * NTHREADS) {
1665                 assert_eq!(rx.recv().unwrap(), 1);
1666             }
1667             match rx.try_recv() {
1668                 Ok(..) => panic!(),
1669                 _ => {}
1670             }
1671             dtx.send(()).unwrap();
1672         }).detach();
1673
1674         for _ in range(0, NTHREADS) {
1675             let tx = tx.clone();
1676             Thread::spawn(move|| {
1677                 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1678             }).detach();
1679         }
1680         drop(tx);
1681         drx.recv().unwrap();
1682     }
1683
1684     #[test]
1685     fn oneshot_single_thread_close_port_first() {
1686         // Simple test of closing without sending
1687         let (_tx, rx) = sync_channel::<int>(0);
1688         drop(rx);
1689     }
1690
1691     #[test]
1692     fn oneshot_single_thread_close_chan_first() {
1693         // Simple test of closing without sending
1694         let (tx, _rx) = sync_channel::<int>(0);
1695         drop(tx);
1696     }
1697
1698     #[test]
1699     fn oneshot_single_thread_send_port_close() {
1700         // Testing that the sender cleans up the payload if receiver is closed
1701         let (tx, rx) = sync_channel::<Box<int>>(0);
1702         drop(rx);
1703         assert!(tx.send(box 0).is_err());
1704     }
1705
1706     #[test]
1707     fn oneshot_single_thread_recv_chan_close() {
1708         // Receiving on a closed chan will panic
1709         let res = Thread::spawn(move|| {
1710             let (tx, rx) = sync_channel::<int>(0);
1711             drop(tx);
1712             rx.recv().unwrap();
1713         }).join();
1714         // What is our res?
1715         assert!(res.is_err());
1716     }
1717
1718     #[test]
1719     fn oneshot_single_thread_send_then_recv() {
1720         let (tx, rx) = sync_channel::<Box<int>>(1);
1721         tx.send(box 10).unwrap();
1722         assert!(rx.recv().unwrap() == box 10);
1723     }
1724
1725     #[test]
1726     fn oneshot_single_thread_try_send_open() {
1727         let (tx, rx) = sync_channel::<int>(1);
1728         assert_eq!(tx.try_send(10), Ok(()));
1729         assert!(rx.recv().unwrap() == 10);
1730     }
1731
1732     #[test]
1733     fn oneshot_single_thread_try_send_closed() {
1734         let (tx, rx) = sync_channel::<int>(0);
1735         drop(rx);
1736         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1737     }
1738
1739     #[test]
1740     fn oneshot_single_thread_try_send_closed2() {
1741         let (tx, _rx) = sync_channel::<int>(0);
1742         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1743     }
1744
1745     #[test]
1746     fn oneshot_single_thread_try_recv_open() {
1747         let (tx, rx) = sync_channel::<int>(1);
1748         tx.send(10).unwrap();
1749         assert!(rx.recv() == Ok(10));
1750     }
1751
1752     #[test]
1753     fn oneshot_single_thread_try_recv_closed() {
1754         let (tx, rx) = sync_channel::<int>(0);
1755         drop(tx);
1756         assert!(rx.recv().is_err());
1757     }
1758
1759     #[test]
1760     fn oneshot_single_thread_peek_data() {
1761         let (tx, rx) = sync_channel::<int>(1);
1762         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1763         tx.send(10).unwrap();
1764         assert_eq!(rx.try_recv(), Ok(10));
1765     }
1766
1767     #[test]
1768     fn oneshot_single_thread_peek_close() {
1769         let (tx, rx) = sync_channel::<int>(0);
1770         drop(tx);
1771         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1772         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1773     }
1774
1775     #[test]
1776     fn oneshot_single_thread_peek_open() {
1777         let (_tx, rx) = sync_channel::<int>(0);
1778         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1779     }
1780
1781     #[test]
1782     fn oneshot_multi_task_recv_then_send() {
1783         let (tx, rx) = sync_channel::<Box<int>>(0);
1784         let _t = Thread::spawn(move|| {
1785             assert!(rx.recv().unwrap() == box 10);
1786         });
1787
1788         tx.send(box 10).unwrap();
1789     }
1790
1791     #[test]
1792     fn oneshot_multi_task_recv_then_close() {
1793         let (tx, rx) = sync_channel::<Box<int>>(0);
1794         let _t = Thread::spawn(move|| {
1795             drop(tx);
1796         });
1797         let res = Thread::spawn(move|| {
1798             assert!(rx.recv().unwrap() == box 10);
1799         }).join();
1800         assert!(res.is_err());
1801     }
1802
1803     #[test]
1804     fn oneshot_multi_thread_close_stress() {
1805         for _ in range(0, stress_factor()) {
1806             let (tx, rx) = sync_channel::<int>(0);
1807             let _t = Thread::spawn(move|| {
1808                 drop(rx);
1809             });
1810             drop(tx);
1811         }
1812     }
1813
1814     #[test]
1815     fn oneshot_multi_thread_send_close_stress() {
1816         for _ in range(0, stress_factor()) {
1817             let (tx, rx) = sync_channel::<int>(0);
1818             let _t = Thread::spawn(move|| {
1819                 drop(rx);
1820             });
1821             let _ = Thread::spawn(move || {
1822                 tx.send(1).unwrap();
1823             }).join();
1824         }
1825     }
1826
1827     #[test]
1828     fn oneshot_multi_thread_recv_close_stress() {
1829         for _ in range(0, stress_factor()) {
1830             let (tx, rx) = sync_channel::<int>(0);
1831             let _t = Thread::spawn(move|| {
1832                 let res = Thread::spawn(move|| {
1833                     rx.recv().unwrap();
1834                 }).join();
1835                 assert!(res.is_err());
1836             });
1837             let _t = Thread::spawn(move|| {
1838                 Thread::spawn(move|| {
1839                     drop(tx);
1840                 }).detach();
1841             });
1842         }
1843     }
1844
1845     #[test]
1846     fn oneshot_multi_thread_send_recv_stress() {
1847         for _ in range(0, stress_factor()) {
1848             let (tx, rx) = sync_channel::<Box<int>>(0);
1849             let _t = Thread::spawn(move|| {
1850                 tx.send(box 10i).unwrap();
1851             });
1852             assert!(rx.recv().unwrap() == box 10i);
1853         }
1854     }
1855
1856     #[test]
1857     fn stream_send_recv_stress() {
1858         for _ in range(0, stress_factor()) {
1859             let (tx, rx) = sync_channel::<Box<int>>(0);
1860
1861             send(tx, 0);
1862             recv(rx, 0);
1863
1864             fn send(tx: SyncSender<Box<int>>, i: int) {
1865                 if i == 10 { return }
1866
1867                 Thread::spawn(move|| {
1868                     tx.send(box i).unwrap();
1869                     send(tx, i + 1);
1870                 }).detach();
1871             }
1872
1873             fn recv(rx: Receiver<Box<int>>, i: int) {
1874                 if i == 10 { return }
1875
1876                 Thread::spawn(move|| {
1877                     assert!(rx.recv().unwrap() == box i);
1878                     recv(rx, i + 1);
1879                 }).detach();
1880             }
1881         }
1882     }
1883
1884     #[test]
1885     fn recv_a_lot() {
1886         // Regression test that we don't run out of stack in scheduler context
1887         let (tx, rx) = sync_channel(10000);
1888         for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1889         for _ in range(0u, 10000) { rx.recv().unwrap(); }
1890     }
1891
1892     #[test]
1893     fn shared_chan_stress() {
1894         let (tx, rx) = sync_channel(0);
1895         let total = stress_factor() + 100;
1896         for _ in range(0, total) {
1897             let tx = tx.clone();
1898             Thread::spawn(move|| {
1899                 tx.send(()).unwrap();
1900             }).detach();
1901         }
1902
1903         for _ in range(0, total) {
1904             rx.recv().unwrap();
1905         }
1906     }
1907
1908     #[test]
1909     fn test_nested_recv_iter() {
1910         let (tx, rx) = sync_channel::<int>(0);
1911         let (total_tx, total_rx) = sync_channel::<int>(0);
1912
1913         let _t = Thread::spawn(move|| {
1914             let mut acc = 0;
1915             for x in rx.iter() {
1916                 acc += x;
1917             }
1918             total_tx.send(acc).unwrap();
1919         });
1920
1921         tx.send(3).unwrap();
1922         tx.send(1).unwrap();
1923         tx.send(2).unwrap();
1924         drop(tx);
1925         assert_eq!(total_rx.recv().unwrap(), 6);
1926     }
1927
1928     #[test]
1929     fn test_recv_iter_break() {
1930         let (tx, rx) = sync_channel::<int>(0);
1931         let (count_tx, count_rx) = sync_channel(0);
1932
1933         let _t = Thread::spawn(move|| {
1934             let mut count = 0;
1935             for x in rx.iter() {
1936                 if count >= 3 {
1937                     break;
1938                 } else {
1939                     count += x;
1940                 }
1941             }
1942             count_tx.send(count).unwrap();
1943         });
1944
1945         tx.send(2).unwrap();
1946         tx.send(2).unwrap();
1947         tx.send(2).unwrap();
1948         let _ = tx.try_send(2);
1949         drop(tx);
1950         assert_eq!(count_rx.recv().unwrap(), 4);
1951     }
1952
1953     #[test]
1954     fn try_recv_states() {
1955         let (tx1, rx1) = sync_channel::<int>(1);
1956         let (tx2, rx2) = sync_channel::<()>(1);
1957         let (tx3, rx3) = sync_channel::<()>(1);
1958         let _t = Thread::spawn(move|| {
1959             rx2.recv().unwrap();
1960             tx1.send(1).unwrap();
1961             tx3.send(()).unwrap();
1962             rx2.recv().unwrap();
1963             drop(tx1);
1964             tx3.send(()).unwrap();
1965         });
1966
1967         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1968         tx2.send(()).unwrap();
1969         rx3.recv().unwrap();
1970         assert_eq!(rx1.try_recv(), Ok(1));
1971         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1972         tx2.send(()).unwrap();
1973         rx3.recv().unwrap();
1974         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1975     }
1976
1977     // This bug used to end up in a livelock inside of the Receiver destructor
1978     // because the internal state of the Shared packet was corrupted
1979     #[test]
1980     fn destroy_upgraded_shared_port_when_sender_still_active() {
1981         let (tx, rx) = sync_channel::<()>(0);
1982         let (tx2, rx2) = sync_channel::<()>(0);
1983         let _t = Thread::spawn(move|| {
1984             rx.recv().unwrap(); // wait on a oneshot
1985             drop(rx);  // destroy a shared
1986             tx2.send(()).unwrap();
1987         });
1988         // make sure the other task has gone to sleep
1989         for _ in range(0u, 5000) { Thread::yield_now(); }
1990
1991         // upgrade to a shared chan and send a message
1992         let t = tx.clone();
1993         drop(tx);
1994         t.send(()).unwrap();
1995
1996         // wait for the child task to exit before we exit
1997         rx2.recv().unwrap();
1998     }
1999
2000     #[test]
2001     fn send1() {
2002         let (tx, rx) = sync_channel::<int>(0);
2003         let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2004         assert_eq!(tx.send(1), Ok(()));
2005     }
2006
2007     #[test]
2008     fn send2() {
2009         let (tx, rx) = sync_channel::<int>(0);
2010         let _t = Thread::spawn(move|| { drop(rx); });
2011         assert!(tx.send(1).is_err());
2012     }
2013
2014     #[test]
2015     fn send3() {
2016         let (tx, rx) = sync_channel::<int>(1);
2017         assert_eq!(tx.send(1), Ok(()));
2018         let _t =Thread::spawn(move|| { drop(rx); });
2019         assert!(tx.send(1).is_err());
2020     }
2021
2022     #[test]
2023     fn send4() {
2024         let (tx, rx) = sync_channel::<int>(0);
2025         let tx2 = tx.clone();
2026         let (done, donerx) = channel();
2027         let done2 = done.clone();
2028         let _t = Thread::spawn(move|| {
2029             assert!(tx.send(1).is_err());
2030             done.send(()).unwrap();
2031         });
2032         let _t = Thread::spawn(move|| {
2033             assert!(tx2.send(2).is_err());
2034             done2.send(()).unwrap();
2035         });
2036         drop(rx);
2037         donerx.recv().unwrap();
2038         donerx.recv().unwrap();
2039     }
2040
2041     #[test]
2042     fn try_send1() {
2043         let (tx, _rx) = sync_channel::<int>(0);
2044         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2045     }
2046
2047     #[test]
2048     fn try_send2() {
2049         let (tx, _rx) = sync_channel::<int>(1);
2050         assert_eq!(tx.try_send(1), Ok(()));
2051         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2052     }
2053
2054     #[test]
2055     fn try_send3() {
2056         let (tx, rx) = sync_channel::<int>(1);
2057         assert_eq!(tx.try_send(1), Ok(()));
2058         drop(rx);
2059         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2060     }
2061
2062     #[test]
2063     fn issue_15761() {
2064         fn repro() {
2065             let (tx1, rx1) = sync_channel::<()>(3);
2066             let (tx2, rx2) = sync_channel::<()>(3);
2067
2068             let _t = Thread::spawn(move|| {
2069                 rx1.recv().unwrap();
2070                 tx2.try_send(()).unwrap();
2071             });
2072
2073             tx1.try_send(()).unwrap();
2074             rx2.recv().unwrap();
2075         }
2076
2077         for _ in range(0u, 100) {
2078             repro()
2079         }
2080     }
2081 }