]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/mod.rs
2b66e91c00db05eadc1a06bf47a502775ef9ba2e
[rust.git] / src / libstd / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver.  These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //!    will return a `(Sender, Receiver)` tuple where all sends will be
35 //!    **asynchronous** (they never block). The channel conceptually has an
36 //!    infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //!    is a pre-allocated buffer of a fixed size. All sends will be
41 //!    **synchronous** by blocking until there is buffer space available. Note
42 //!    that a bound of 0 is allowed, causing the channel to become a
43 //!    "rendezvous" channel where each sender atomically hands off a message to
44 //!    a receiver.
45 //!
46 //! ## Panic Propagation
47 //!
48 //! In addition to being a core primitive for communicating in rust, channels
49 //! are the points at which panics are propagated among tasks.  Whenever the one
50 //! half of channel is closed, the other half will have its next operation
51 //! `panic!`. The purpose of this is to allow propagation of panics among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! There are methods on both of senders and receivers to perform their
55 //! respective operations without panicking, however.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example an FFI callback), then the
64 //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
65 //! the `try_send` method on a `SyncSender`, but no other operations are
66 //! guaranteed to be safe.
67 //!
68 //! # Example
69 //!
70 //! Simple usage:
71 //!
72 //! ```
73 //! // Create a simple streaming channel
74 //! let (tx, rx) = channel();
75 //! spawn(proc() {
76 //!     tx.send(10i);
77 //! });
78 //! assert_eq!(rx.recv(), 10i);
79 //! ```
80 //!
81 //! Shared usage:
82 //!
83 //! ```
84 //! // Create a shared channel which can be sent along from many tasks
85 //! // where tx is the sending half (tx for transmission), and rx is the receiving
86 //! // half (rx for receiving).
87 //! let (tx, rx) = channel();
88 //! for i in range(0i, 10i) {
89 //!     let tx = tx.clone();
90 //!     spawn(proc() {
91 //!         tx.send(i);
92 //!     })
93 //! }
94 //!
95 //! for _ in range(0i, 10i) {
96 //!     let j = rx.recv();
97 //!     assert!(0 <= j && j < 10);
98 //! }
99 //! ```
100 //!
101 //! Propagating panics:
102 //!
103 //! ```should_fail
104 //! // The call to recv() will panic!() because the channel has already hung
105 //! // up (or been deallocated)
106 //! let (tx, rx) = channel::<int>();
107 //! drop(tx);
108 //! rx.recv();
109 //! ```
110 //!
111 //! Synchronous channels:
112 //!
113 //! ```
114 //! let (tx, rx) = sync_channel::<int>(0);
115 //! spawn(proc() {
116 //!     // This will wait for the parent task to start receiving
117 //!     tx.send(53);
118 //! });
119 //! rx.recv();
120 //! ```
121 //!
122 //! Reading from a channel with a timeout requires to use a Timer together
123 //! with the channel. You can use the select! macro to select either and
124 //! handle the timeout case. This first example will break out of the loop
125 //! after 10 seconds no matter what:
126 //!
127 //! ```no_run
128 //! use std::io::timer::Timer;
129 //! use std::time::Duration;
130 //!
131 //! let (tx, rx) = channel::<int>();
132 //! let mut timer = Timer::new().unwrap();
133 //! let timeout = timer.oneshot(Duration::seconds(10));
134 //!
135 //! loop {
136 //!     select! {
137 //!         val = rx.recv() => println!("Received {}", val),
138 //!         () = timeout.recv() => {
139 //!             println!("timed out, total time was more than 10 seconds")
140 //!             break;
141 //!         }
142 //!     }
143 //! }
144 //! ```
145 //!
146 //! This second example is more costly since it allocates a new timer every
147 //! time a message is received, but it allows you to timeout after the channel
148 //! has been inactive for 5 seconds:
149 //!
150 //! ```no_run
151 //! use std::io::timer::Timer;
152 //! use std::time::Duration;
153 //!
154 //! let (tx, rx) = channel::<int>();
155 //! let mut timer = Timer::new().unwrap();
156 //!
157 //! loop {
158 //!     let timeout = timer.oneshot(Duration::seconds(5));
159 //!
160 //!     select! {
161 //!         val = rx.recv() => println!("Received {}", val),
162 //!         () = timeout.recv() => {
163 //!             println!("timed out, no message received in 5 seconds")
164 //!             break;
165 //!         }
166 //!     }
167 //! }
168 //! ```
169
170 // A description of how Rust's channel implementation works
171 //
172 // Channels are supposed to be the basic building block for all other
173 // concurrent primitives that are used in Rust. As a result, the channel type
174 // needs to be highly optimized, flexible, and broad enough for use everywhere.
175 //
176 // The choice of implementation of all channels is to be built on lock-free data
177 // structures. The channels themselves are then consequently also lock-free data
178 // structures. As always with lock-free code, this is a very "here be dragons"
179 // territory, especially because I'm unaware of any academic papers which have
180 // gone into great length about channels of these flavors.
181 //
182 // ## Flavors of channels
183 //
184 // From the perspective of a consumer of this library, there is only one flavor
185 // of channel. This channel can be used as a stream and cloned to allow multiple
186 // senders. Under the hood, however, there are actually three flavors of
187 // channels in play.
188 //
189 // * Oneshots - these channels are highly optimized for the one-send use case.
190 //              They contain as few atomics as possible and involve one and
191 //              exactly one allocation.
192 // * Streams - these channels are optimized for the non-shared use case. They
193 //             use a different concurrent queue which is more tailored for this
194 //             use case. The initial allocation of this flavor of channel is not
195 //             optimized.
196 // * Shared - this is the most general form of channel that this module offers,
197 //            a channel with multiple senders. This type is as optimized as it
198 //            can be, but the previous two types mentioned are much faster for
199 //            their use-cases.
200 //
201 // ## Concurrent queues
202 //
203 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
204 // recv() obviously blocks. This means that under the hood there must be some
205 // shared and concurrent queue holding all of the actual data.
206 //
207 // With two flavors of channels, two flavors of queues are also used. We have
208 // chosen to use queues from a well-known author which are abbreviated as SPSC
209 // and MPSC (single producer, single consumer and multiple producer, single
210 // consumer). SPSC queues are used for streams while MPSC queues are used for
211 // shared channels.
212 //
213 // ### SPSC optimizations
214 //
215 // The SPSC queue found online is essentially a linked list of nodes where one
216 // half of the nodes are the "queue of data" and the other half of nodes are a
217 // cache of unused nodes. The unused nodes are used such that an allocation is
218 // not required on every push() and a free doesn't need to happen on every
219 // pop().
220 //
221 // As found online, however, the cache of nodes is of an infinite size. This
222 // means that if a channel at one point in its life had 50k items in the queue,
223 // then the queue will always have the capacity for 50k items. I believed that
224 // this was an unnecessary limitation of the implementation, so I have altered
225 // the queue to optionally have a bound on the cache size.
226 //
227 // By default, streams will have an unbounded SPSC queue with a small-ish cache
228 // size. The hope is that the cache is still large enough to have very fast
229 // send() operations while not too large such that millions of channels can
230 // coexist at once.
231 //
232 // ### MPSC optimizations
233 //
234 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
235 // a linked list under the hood to earn its unboundedness, but I have not put
236 // forth much effort into having a cache of nodes similar to the SPSC queue.
237 //
238 // For now, I believe that this is "ok" because shared channels are not the most
239 // common type, but soon we may wish to revisit this queue choice and determine
240 // another candidate for backend storage of shared channels.
241 //
242 // ## Overview of the Implementation
243 //
244 // Now that there's a little background on the concurrent queues used, it's
245 // worth going into much more detail about the channels themselves. The basic
246 // pseudocode for a send/recv are:
247 //
248 //
249 //      send(t)                             recv()
250 //        queue.push(t)                       return if queue.pop()
251 //        if increment() == -1                deschedule {
252 //          wakeup()                            if decrement() > 0
253 //                                                cancel_deschedule()
254 //                                            }
255 //                                            queue.pop()
256 //
257 // As mentioned before, there are no locks in this implementation, only atomic
258 // instructions are used.
259 //
260 // ### The internal atomic counter
261 //
262 // Every channel has a shared counter with each half to keep track of the size
263 // of the queue. This counter is used to abort descheduling by the receiver and
264 // to know when to wake up on the sending side.
265 //
266 // As seen in the pseudocode, senders will increment this count and receivers
267 // will decrement the count. The theory behind this is that if a sender sees a
268 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
269 // then it doesn't need to block.
270 //
271 // The recv() method has a beginning call to pop(), and if successful, it needs
272 // to decrement the count. It is a crucial implementation detail that this
273 // decrement does *not* happen to the shared counter. If this were the case,
274 // then it would be possible for the counter to be very negative when there were
275 // no receivers waiting, in which case the senders would have to determine when
276 // it was actually appropriate to wake up a receiver.
277 //
278 // Instead, the "steal count" is kept track of separately (not atomically
279 // because it's only used by receivers), and then the decrement() call when
280 // descheduling will lump in all of the recent steals into one large decrement.
281 //
282 // The implication of this is that if a sender sees a -1 count, then there's
283 // guaranteed to be a waiter waiting!
284 //
285 // ## Native Implementation
286 //
287 // A major goal of these channels is to work seamlessly on and off the runtime.
288 // All of the previous race conditions have been worded in terms of
289 // scheduler-isms (which is obviously not available without the runtime).
290 //
291 // For now, native usage of channels (off the runtime) will fall back onto
292 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
293 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
294 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
295 // condition variable.
296 //
297 // ## Select
298 //
299 // Being able to support selection over channels has greatly influenced this
300 // design, and not only does selection need to work inside the runtime, but also
301 // outside the runtime.
302 //
303 // The implementation is fairly straightforward. The goal of select() is not to
304 // return some data, but only to return which channel can receive data without
305 // blocking. The implementation is essentially the entire blocking procedure
306 // followed by an increment as soon as its woken up. The cancellation procedure
307 // involves an increment and swapping out of to_wake to acquire ownership of the
308 // task to unblock.
309 //
310 // Sadly this current implementation requires multiple allocations, so I have
311 // seen the throughput of select() be much worse than it should be. I do not
312 // believe that there is anything fundamental which needs to change about these
313 // channels, however, in order to support a more efficient select().
314 //
315 // # Conclusion
316 //
317 // And now that you've seen all the races that I found and attempted to fix,
318 // here's the code for you to find some more!
319
320 use core::prelude::*;
321
322 pub use self::TryRecvError::*;
323 pub use self::TrySendError::*;
324 use self::Flavor::*;
325
326 use alloc::arc::Arc;
327 use core::kinds::marker;
328 use core::mem;
329 use core::cell::UnsafeCell;
330 use rustrt::task::BlockedTask;
331
332 pub use comm::select::{Select, Handle};
333
334 macro_rules! test (
335     { fn $name:ident() $b:block $(#[$a:meta])*} => (
336         mod $name {
337             #![allow(unused_imports)]
338
339             extern crate rustrt;
340
341             use prelude::*;
342
343             use comm::*;
344             use super::*;
345             use task;
346
347             $(#[$a])* #[test] fn f() { $b }
348         }
349     )
350 )
351
352 mod oneshot;
353 mod select;
354 mod shared;
355 mod stream;
356 mod sync;
357
358 /// The receiving-half of Rust's channel type. This half can only be owned by
359 /// one task
360 #[unstable]
361 pub struct Receiver<T> {
362     inner: UnsafeCell<Flavor<T>>,
363     // can't share in an arc
364     _marker: marker::NoSync,
365 }
366
367 /// An iterator over messages on a receiver, this iterator will block
368 /// whenever `next` is called, waiting for a new message, and `None` will be
369 /// returned when the corresponding channel has hung up.
370 #[unstable]
371 pub struct Messages<'a, T:'a> {
372     rx: &'a Receiver<T>
373 }
374
375 /// The sending-half of Rust's asynchronous channel type. This half can only be
376 /// owned by one task, but it can be cloned to send to other tasks.
377 #[unstable]
378 pub struct Sender<T> {
379     inner: UnsafeCell<Flavor<T>>,
380     // can't share in an arc
381     _marker: marker::NoSync,
382 }
383
384 /// The sending-half of Rust's synchronous channel type. This half can only be
385 /// owned by one task, but it can be cloned to send to other tasks.
386 #[unstable = "this type may be renamed, but it will always exist"]
387 pub struct SyncSender<T> {
388     inner: Arc<UnsafeCell<sync::Packet<T>>>,
389     // can't share in an arc
390     _marker: marker::NoSync,
391 }
392
393 /// This enumeration is the list of the possible reasons that try_recv could not
394 /// return data when called.
395 #[deriving(PartialEq, Clone, Show)]
396 #[experimental = "this is likely to be removed in changing try_recv()"]
397 pub enum TryRecvError {
398     /// This channel is currently empty, but the sender(s) have not yet
399     /// disconnected, so data may yet become available.
400     Empty,
401     /// This channel's sending half has become disconnected, and there will
402     /// never be any more data received on this channel
403     Disconnected,
404 }
405
406 /// This enumeration is the list of the possible error outcomes for the
407 /// `SyncSender::try_send` method.
408 #[deriving(PartialEq, Clone, Show)]
409 #[experimental = "this is likely to be removed in changing try_send()"]
410 pub enum TrySendError<T> {
411     /// The data could not be sent on the channel because it would require that
412     /// the callee block to send the data.
413     ///
414     /// If this is a buffered channel, then the buffer is full at this time. If
415     /// this is not a buffered channel, then there is no receiver available to
416     /// acquire the data.
417     Full(T),
418     /// This channel's receiving half has disconnected, so the data could not be
419     /// sent. The data is returned back to the callee in this case.
420     RecvDisconnected(T),
421 }
422
423 enum Flavor<T> {
424     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
425     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
426     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
427     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
428 }
429
430 #[doc(hidden)]
431 trait UnsafeFlavor<T> {
432     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
433     unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
434         &mut *self.inner_unsafe().get()
435     }
436     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
437         &*self.inner_unsafe().get()
438     }
439 }
440 impl<T> UnsafeFlavor<T> for Sender<T> {
441     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
442         &self.inner
443     }
444 }
445 impl<T> UnsafeFlavor<T> for Receiver<T> {
446     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
447         &self.inner
448     }
449 }
450
451 /// Creates a new asynchronous channel, returning the sender/receiver halves.
452 ///
453 /// All data sent on the sender will become available on the receiver, and no
454 /// send will block the calling task (this channel has an "infinite buffer").
455 ///
456 /// # Example
457 ///
458 /// ```
459 /// // tx is is the sending half (tx for transmission), and rx is the receiving
460 /// // half (rx for receiving).
461 /// let (tx, rx) = channel();
462 ///
463 /// // Spawn off an expensive computation
464 /// spawn(proc() {
465 /// #   fn expensive_computation() {}
466 ///     tx.send(expensive_computation());
467 /// });
468 ///
469 /// // Do some useful work for awhile
470 ///
471 /// // Let's see what that answer was
472 /// println!("{}", rx.recv());
473 /// ```
474 #[unstable]
475 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
476     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
477     (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
478 }
479
480 /// Creates a new synchronous, bounded channel.
481 ///
482 /// Like asynchronous channels, the `Receiver` will block until a message
483 /// becomes available. These channels differ greatly in the semantics of the
484 /// sender from asynchronous channels, however.
485 ///
486 /// This channel has an internal buffer on which messages will be queued. When
487 /// the internal buffer becomes full, future sends will *block* waiting for the
488 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
489 /// becomes  "rendezvous channel" where each send will not return until a recv
490 /// is paired with it.
491 ///
492 /// As with asynchronous channels, all senders will panic in `send` if the
493 /// `Receiver` has been destroyed.
494 ///
495 /// # Example
496 ///
497 /// ```
498 /// let (tx, rx) = sync_channel(1);
499 ///
500 /// // this returns immediately
501 /// tx.send(1i);
502 ///
503 /// spawn(proc() {
504 ///     // this will block until the previous message has been received
505 ///     tx.send(2i);
506 /// });
507 ///
508 /// assert_eq!(rx.recv(), 1i);
509 /// assert_eq!(rx.recv(), 2i);
510 /// ```
511 #[unstable = "this function may be renamed to more accurately reflect the type \
512               of channel that is is creating"]
513 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
514     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
515     (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
516 }
517
518 ////////////////////////////////////////////////////////////////////////////////
519 // Sender
520 ////////////////////////////////////////////////////////////////////////////////
521
522 impl<T: Send> Sender<T> {
523     fn new(inner: Flavor<T>) -> Sender<T> {
524         Sender {
525             inner: UnsafeCell::new(inner),
526             _marker: marker::NoSync,
527         }
528     }
529
530     /// Sends a value along this channel to be received by the corresponding
531     /// receiver.
532     ///
533     /// Rust channels are infinitely buffered so this method will never block.
534     ///
535     /// # Panics
536     ///
537     /// This function will panic if the other end of the channel has hung up.
538     /// This means that if the corresponding receiver has fallen out of scope,
539     /// this function will trigger a panic message saying that a message is
540     /// being sent on a closed channel.
541     ///
542     /// Note that if this function does *not* panic, it does not mean that the
543     /// data will be successfully received. All sends are placed into a queue,
544     /// so it is possible for a send to succeed (the other end is alive), but
545     /// then the other end could immediately disconnect.
546     ///
547     /// The purpose of this functionality is to propagate panicks among tasks.
548     /// If a panic is not desired, then consider using the `send_opt` method
549     #[experimental = "this function is being considered candidate for removal \
550                       to adhere to the general guidelines of rust"]
551     pub fn send(&self, t: T) {
552         if self.send_opt(t).is_err() {
553             panic!("sending on a closed channel");
554         }
555     }
556
557     /// Attempts to send a value on this channel, returning it back if it could
558     /// not be sent.
559     ///
560     /// A successful send occurs when it is determined that the other end of
561     /// the channel has not hung up already. An unsuccessful send would be one
562     /// where the corresponding receiver has already been deallocated. Note
563     /// that a return value of `Err` means that the data will never be
564     /// received, but a return value of `Ok` does *not* mean that the data
565     /// will be received.  It is possible for the corresponding receiver to
566     /// hang up immediately after this function returns `Ok`.
567     ///
568     /// Like `send`, this method will never block.
569     ///
570     /// # Panics
571     ///
572     /// This method will never panic, it will return the message back to the
573     /// caller if the other end is disconnected
574     ///
575     /// # Example
576     ///
577     /// ```
578     /// let (tx, rx) = channel();
579     ///
580     /// // This send is always successful
581     /// assert_eq!(tx.send_opt(1i), Ok(()));
582     ///
583     /// // This send will fail because the receiver is gone
584     /// drop(rx);
585     /// assert_eq!(tx.send_opt(1i), Err(1));
586     /// ```
587     #[unstable = "this function may be renamed to send() in the future"]
588     pub fn send_opt(&self, t: T) -> Result<(), T> {
589         let (new_inner, ret) = match *unsafe { self.inner() } {
590             Oneshot(ref p) => {
591                 unsafe {
592                     let p = p.get();
593                     if !(*p).sent() {
594                         return (*p).send(t);
595                     } else {
596                         let a = Arc::new(UnsafeCell::new(stream::Packet::new()));
597                         match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
598                             oneshot::UpSuccess => {
599                                 let ret = (*a.get()).send(t);
600                                 (a, ret)
601                             }
602                             oneshot::UpDisconnected => (a, Err(t)),
603                             oneshot::UpWoke(task) => {
604                                 // This send cannot panic because the task is
605                                 // asleep (we're looking at it), so the receiver
606                                 // can't go away.
607                                 (*a.get()).send(t).ok().unwrap();
608                                 task.wake().map(|t| t.reawaken());
609                                 (a, Ok(()))
610                             }
611                         }
612                     }
613                 }
614             }
615             Stream(ref p) => return unsafe { (*p.get()).send(t) },
616             Shared(ref p) => return unsafe { (*p.get()).send(t) },
617             Sync(..) => unreachable!(),
618         };
619
620         unsafe {
621             let tmp = Sender::new(Stream(new_inner));
622             mem::swap(self.inner_mut(), tmp.inner_mut());
623         }
624         return ret;
625     }
626 }
627
628 #[unstable]
629 impl<T: Send> Clone for Sender<T> {
630     fn clone(&self) -> Sender<T> {
631         let (packet, sleeper) = match *unsafe { self.inner() } {
632             Oneshot(ref p) => {
633                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
634                 unsafe {
635                     (*a.get()).postinit_lock();
636                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
637                         oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
638                         oneshot::UpWoke(task) => (a, Some(task))
639                     }
640                 }
641             }
642             Stream(ref p) => {
643                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
644                 unsafe {
645                     (*a.get()).postinit_lock();
646                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
647                         stream::UpSuccess | stream::UpDisconnected => (a, None),
648                         stream::UpWoke(task) => (a, Some(task)),
649                     }
650                 }
651             }
652             Shared(ref p) => {
653                 unsafe { (*p.get()).clone_chan(); }
654                 return Sender::new(Shared(p.clone()));
655             }
656             Sync(..) => unreachable!(),
657         };
658
659         unsafe {
660             (*packet.get()).inherit_blocker(sleeper);
661
662             let tmp = Sender::new(Shared(packet.clone()));
663             mem::swap(self.inner_mut(), tmp.inner_mut());
664         }
665         Sender::new(Shared(packet))
666     }
667 }
668
669 #[unsafe_destructor]
670 impl<T: Send> Drop for Sender<T> {
671     fn drop(&mut self) {
672         match *unsafe { self.inner_mut() } {
673             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
674             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
675             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
676             Sync(..) => unreachable!(),
677         }
678     }
679 }
680
681 ////////////////////////////////////////////////////////////////////////////////
682 // SyncSender
683 ////////////////////////////////////////////////////////////////////////////////
684
685 impl<T: Send> SyncSender<T> {
686     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
687         SyncSender { inner: inner, _marker: marker::NoSync }
688     }
689
690     /// Sends a value on this synchronous channel.
691     ///
692     /// This function will *block* until space in the internal buffer becomes
693     /// available or a receiver is available to hand off the message to.
694     ///
695     /// Note that a successful send does *not* guarantee that the receiver will
696     /// ever see the data if there is a buffer on this channel. Messages may be
697     /// enqueued in the internal buffer for the receiver to receive at a later
698     /// time. If the buffer size is 0, however, it can be guaranteed that the
699     /// receiver has indeed received the data if this function returns success.
700     ///
701     /// # Panics
702     ///
703     /// Similarly to `Sender::send`, this function will panic if the
704     /// corresponding `Receiver` for this channel has disconnected. This
705     /// behavior is used to propagate panics among tasks.
706     ///
707     /// If a panic is not desired, you can achieve the same semantics with the
708     /// `SyncSender::send_opt` method which will not panic if the receiver
709     /// disconnects.
710     #[experimental = "this function is being considered candidate for removal \
711                       to adhere to the general guidelines of rust"]
712     pub fn send(&self, t: T) {
713         if self.send_opt(t).is_err() {
714             panic!("sending on a closed channel");
715         }
716     }
717
718     /// Send a value on a channel, returning it back if the receiver
719     /// disconnected
720     ///
721     /// This method will *block* to send the value `t` on the channel, but if
722     /// the value could not be sent due to the receiver disconnecting, the value
723     /// is returned back to the callee. This function is similar to `try_send`,
724     /// except that it will block if the channel is currently full.
725     ///
726     /// # Panics
727     ///
728     /// This function cannot panic.
729     #[unstable = "this function may be renamed to send() in the future"]
730     pub fn send_opt(&self, t: T) -> Result<(), T> {
731         unsafe { (*self.inner.get()).send(t) }
732     }
733
734     /// Attempts to send a value on this channel without blocking.
735     ///
736     /// This method differs from `send_opt` by returning immediately if the
737     /// channel's buffer is full or no receiver is waiting to acquire some
738     /// data. Compared with `send_opt`, this function has two failure cases
739     /// instead of one (one for disconnection, one for a full buffer).
740     ///
741     /// See `SyncSender::send` for notes about guarantees of whether the
742     /// receiver has received the data or not if this function is successful.
743     ///
744     /// # Panics
745     ///
746     /// This function cannot panic
747     #[unstable = "the return type of this function is candidate for \
748                   modification"]
749     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
750         unsafe { (*self.inner.get()).try_send(t) }
751     }
752 }
753
754 #[unstable]
755 impl<T: Send> Clone for SyncSender<T> {
756     fn clone(&self) -> SyncSender<T> {
757         unsafe { (*self.inner.get()).clone_chan(); }
758         return SyncSender::new(self.inner.clone());
759     }
760 }
761
762 #[unsafe_destructor]
763 impl<T: Send> Drop for SyncSender<T> {
764     fn drop(&mut self) {
765         unsafe { (*self.inner.get()).drop_chan(); }
766     }
767 }
768
769 ////////////////////////////////////////////////////////////////////////////////
770 // Receiver
771 ////////////////////////////////////////////////////////////////////////////////
772
773 impl<T: Send> Receiver<T> {
774     fn new(inner: Flavor<T>) -> Receiver<T> {
775         Receiver { inner: UnsafeCell::new(inner), _marker: marker::NoSync }
776     }
777
778     /// Blocks waiting for a value on this receiver
779     ///
780     /// This function will block if necessary to wait for a corresponding send
781     /// on the channel from its paired `Sender` structure. This receiver will
782     /// be woken up when data is ready, and the data will be returned.
783     ///
784     /// # Panics
785     ///
786     /// Similar to channels, this method will trigger a task panic if the
787     /// other end of the channel has hung up (been deallocated). The purpose of
788     /// this is to propagate panicks among tasks.
789     ///
790     /// If a panic is not desired, then there are two options:
791     ///
792     /// * If blocking is still desired, the `recv_opt` method will return `None`
793     ///   when the other end hangs up
794     ///
795     /// * If blocking is not desired, then the `try_recv` method will attempt to
796     ///   peek at a value on this receiver.
797     #[experimental = "this function is being considered candidate for removal \
798                       to adhere to the general guidelines of rust"]
799     pub fn recv(&self) -> T {
800         match self.recv_opt() {
801             Ok(t) => t,
802             Err(()) => panic!("receiving on a closed channel"),
803         }
804     }
805
806     /// Attempts to return a pending value on this receiver without blocking
807     ///
808     /// This method will never block the caller in order to wait for data to
809     /// become available. Instead, this will always return immediately with a
810     /// possible option of pending data on the channel.
811     ///
812     /// This is useful for a flavor of "optimistic check" before deciding to
813     /// block on a receiver.
814     ///
815     /// # Panics
816     ///
817     /// This function cannot panic.
818     #[unstable = "the return type of this function may be altered"]
819     pub fn try_recv(&self) -> Result<T, TryRecvError> {
820         loop {
821             let new_port = match *unsafe { self.inner() } {
822                 Oneshot(ref p) => {
823                     match unsafe { (*p.get()).try_recv() } {
824                         Ok(t) => return Ok(t),
825                         Err(oneshot::Empty) => return Err(Empty),
826                         Err(oneshot::Disconnected) => return Err(Disconnected),
827                         Err(oneshot::Upgraded(rx)) => rx,
828                     }
829                 }
830                 Stream(ref p) => {
831                     match unsafe { (*p.get()).try_recv() } {
832                         Ok(t) => return Ok(t),
833                         Err(stream::Empty) => return Err(Empty),
834                         Err(stream::Disconnected) => return Err(Disconnected),
835                         Err(stream::Upgraded(rx)) => rx,
836                     }
837                 }
838                 Shared(ref p) => {
839                     match unsafe { (*p.get()).try_recv() } {
840                         Ok(t) => return Ok(t),
841                         Err(shared::Empty) => return Err(Empty),
842                         Err(shared::Disconnected) => return Err(Disconnected),
843                     }
844                 }
845                 Sync(ref p) => {
846                     match unsafe { (*p.get()).try_recv() } {
847                         Ok(t) => return Ok(t),
848                         Err(sync::Empty) => return Err(Empty),
849                         Err(sync::Disconnected) => return Err(Disconnected),
850                     }
851                 }
852             };
853             unsafe {
854                 mem::swap(self.inner_mut(),
855                           new_port.inner_mut());
856             }
857         }
858     }
859
860     /// Attempt to wait for a value on this receiver, but does not panic if the
861     /// corresponding channel has hung up.
862     ///
863     /// This implementation of iterators for ports will always block if there is
864     /// not data available on the receiver, but it will not panic in the case
865     /// that the channel has been deallocated.
866     ///
867     /// In other words, this function has the same semantics as the `recv`
868     /// method except for the panic aspect.
869     ///
870     /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
871     /// the value found on the receiver is returned.
872     #[unstable = "this function may be renamed to recv()"]
873     pub fn recv_opt(&self) -> Result<T, ()> {
874         loop {
875             let new_port = match *unsafe { self.inner() } {
876                 Oneshot(ref p) => {
877                     match unsafe { (*p.get()).recv() } {
878                         Ok(t) => return Ok(t),
879                         Err(oneshot::Empty) => return unreachable!(),
880                         Err(oneshot::Disconnected) => return Err(()),
881                         Err(oneshot::Upgraded(rx)) => rx,
882                     }
883                 }
884                 Stream(ref p) => {
885                     match unsafe { (*p.get()).recv() } {
886                         Ok(t) => return Ok(t),
887                         Err(stream::Empty) => return unreachable!(),
888                         Err(stream::Disconnected) => return Err(()),
889                         Err(stream::Upgraded(rx)) => rx,
890                     }
891                 }
892                 Shared(ref p) => {
893                     match unsafe { (*p.get()).recv() } {
894                         Ok(t) => return Ok(t),
895                         Err(shared::Empty) => return unreachable!(),
896                         Err(shared::Disconnected) => return Err(()),
897                     }
898                 }
899                 Sync(ref p) => return unsafe { (*p.get()).recv() }
900             };
901             unsafe {
902                 mem::swap(self.inner_mut(), new_port.inner_mut());
903             }
904         }
905     }
906
907     /// Returns an iterator which will block waiting for messages, but never
908     /// `panic!`. It will return `None` when the channel has hung up.
909     #[unstable]
910     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
911         Messages { rx: self }
912     }
913 }
914
915 impl<T: Send> select::Packet for Receiver<T> {
916     fn can_recv(&self) -> bool {
917         loop {
918             let new_port = match *unsafe { self.inner() } {
919                 Oneshot(ref p) => {
920                     match unsafe { (*p.get()).can_recv() } {
921                         Ok(ret) => return ret,
922                         Err(upgrade) => upgrade,
923                     }
924                 }
925                 Stream(ref p) => {
926                     match unsafe { (*p.get()).can_recv() } {
927                         Ok(ret) => return ret,
928                         Err(upgrade) => upgrade,
929                     }
930                 }
931                 Shared(ref p) => {
932                     return unsafe { (*p.get()).can_recv() };
933                 }
934                 Sync(ref p) => {
935                     return unsafe { (*p.get()).can_recv() };
936                 }
937             };
938             unsafe {
939                 mem::swap(self.inner_mut(),
940                           new_port.inner_mut());
941             }
942         }
943     }
944
945     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
946         loop {
947             let (t, new_port) = match *unsafe { self.inner() } {
948                 Oneshot(ref p) => {
949                     match unsafe { (*p.get()).start_selection(task) } {
950                         oneshot::SelSuccess => return Ok(()),
951                         oneshot::SelCanceled(task) => return Err(task),
952                         oneshot::SelUpgraded(t, rx) => (t, rx),
953                     }
954                 }
955                 Stream(ref p) => {
956                     match unsafe { (*p.get()).start_selection(task) } {
957                         stream::SelSuccess => return Ok(()),
958                         stream::SelCanceled(task) => return Err(task),
959                         stream::SelUpgraded(t, rx) => (t, rx),
960                     }
961                 }
962                 Shared(ref p) => {
963                     return unsafe { (*p.get()).start_selection(task) };
964                 }
965                 Sync(ref p) => {
966                     return unsafe { (*p.get()).start_selection(task) };
967                 }
968             };
969             task = t;
970             unsafe {
971                 mem::swap(self.inner_mut(),
972                           new_port.inner_mut());
973             }
974         }
975     }
976
977     fn abort_selection(&self) -> bool {
978         let mut was_upgrade = false;
979         loop {
980             let result = match *unsafe { self.inner() } {
981                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
982                 Stream(ref p) => unsafe {
983                     (*p.get()).abort_selection(was_upgrade)
984                 },
985                 Shared(ref p) => return unsafe {
986                     (*p.get()).abort_selection(was_upgrade)
987                 },
988                 Sync(ref p) => return unsafe {
989                     (*p.get()).abort_selection()
990                 },
991             };
992             let new_port = match result { Ok(b) => return b, Err(p) => p };
993             was_upgrade = true;
994             unsafe {
995                 mem::swap(self.inner_mut(),
996                           new_port.inner_mut());
997             }
998         }
999     }
1000 }
1001
1002 #[unstable]
1003 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
1004     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
1005 }
1006
1007 #[unsafe_destructor]
1008 impl<T: Send> Drop for Receiver<T> {
1009     fn drop(&mut self) {
1010         match *unsafe { self.inner_mut() } {
1011             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1012             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1013             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1014             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1015         }
1016     }
1017 }
1018
1019 #[cfg(test)]
1020 mod test {
1021     use prelude::*;
1022
1023     use os;
1024     use super::*;
1025
1026     pub fn stress_factor() -> uint {
1027         match os::getenv("RUST_TEST_STRESS") {
1028             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1029             None => 1,
1030         }
1031     }
1032
1033     test!(fn smoke() {
1034         let (tx, rx) = channel::<int>();
1035         tx.send(1);
1036         assert_eq!(rx.recv(), 1);
1037     })
1038
1039     test!(fn drop_full() {
1040         let (tx, _rx) = channel();
1041         tx.send(box 1i);
1042     })
1043
1044     test!(fn drop_full_shared() {
1045         let (tx, _rx) = channel();
1046         drop(tx.clone());
1047         drop(tx.clone());
1048         tx.send(box 1i);
1049     })
1050
1051     test!(fn smoke_shared() {
1052         let (tx, rx) = channel::<int>();
1053         tx.send(1);
1054         assert_eq!(rx.recv(), 1);
1055         let tx = tx.clone();
1056         tx.send(1);
1057         assert_eq!(rx.recv(), 1);
1058     })
1059
1060     test!(fn smoke_threads() {
1061         let (tx, rx) = channel::<int>();
1062         spawn(proc() {
1063             tx.send(1);
1064         });
1065         assert_eq!(rx.recv(), 1);
1066     })
1067
1068     test!(fn smoke_port_gone() {
1069         let (tx, rx) = channel::<int>();
1070         drop(rx);
1071         tx.send(1);
1072     } #[should_fail])
1073
1074     test!(fn smoke_shared_port_gone() {
1075         let (tx, rx) = channel::<int>();
1076         drop(rx);
1077         tx.send(1);
1078     } #[should_fail])
1079
1080     test!(fn smoke_shared_port_gone2() {
1081         let (tx, rx) = channel::<int>();
1082         drop(rx);
1083         let tx2 = tx.clone();
1084         drop(tx);
1085         tx2.send(1);
1086     } #[should_fail])
1087
1088     test!(fn port_gone_concurrent() {
1089         let (tx, rx) = channel::<int>();
1090         spawn(proc() {
1091             rx.recv();
1092         });
1093         loop { tx.send(1) }
1094     } #[should_fail])
1095
1096     test!(fn port_gone_concurrent_shared() {
1097         let (tx, rx) = channel::<int>();
1098         let tx2 = tx.clone();
1099         spawn(proc() {
1100             rx.recv();
1101         });
1102         loop {
1103             tx.send(1);
1104             tx2.send(1);
1105         }
1106     } #[should_fail])
1107
1108     test!(fn smoke_chan_gone() {
1109         let (tx, rx) = channel::<int>();
1110         drop(tx);
1111         rx.recv();
1112     } #[should_fail])
1113
1114     test!(fn smoke_chan_gone_shared() {
1115         let (tx, rx) = channel::<()>();
1116         let tx2 = tx.clone();
1117         drop(tx);
1118         drop(tx2);
1119         rx.recv();
1120     } #[should_fail])
1121
1122     test!(fn chan_gone_concurrent() {
1123         let (tx, rx) = channel::<int>();
1124         spawn(proc() {
1125             tx.send(1);
1126             tx.send(1);
1127         });
1128         loop { rx.recv(); }
1129     } #[should_fail])
1130
1131     test!(fn stress() {
1132         let (tx, rx) = channel::<int>();
1133         spawn(proc() {
1134             for _ in range(0u, 10000) { tx.send(1i); }
1135         });
1136         for _ in range(0u, 10000) {
1137             assert_eq!(rx.recv(), 1);
1138         }
1139     })
1140
1141     test!(fn stress_shared() {
1142         static AMT: uint = 10000;
1143         static NTHREADS: uint = 8;
1144         let (tx, rx) = channel::<int>();
1145         let (dtx, drx) = channel::<()>();
1146
1147         spawn(proc() {
1148             for _ in range(0, AMT * NTHREADS) {
1149                 assert_eq!(rx.recv(), 1);
1150             }
1151             match rx.try_recv() {
1152                 Ok(..) => panic!(),
1153                 _ => {}
1154             }
1155             dtx.send(());
1156         });
1157
1158         for _ in range(0, NTHREADS) {
1159             let tx = tx.clone();
1160             spawn(proc() {
1161                 for _ in range(0, AMT) { tx.send(1); }
1162             });
1163         }
1164         drop(tx);
1165         drx.recv();
1166     })
1167
1168     #[test]
1169     fn send_from_outside_runtime() {
1170         let (tx1, rx1) = channel::<()>();
1171         let (tx2, rx2) = channel::<int>();
1172         let (tx3, rx3) = channel::<()>();
1173         let tx4 = tx3.clone();
1174         spawn(proc() {
1175             tx1.send(());
1176             for _ in range(0i, 40) {
1177                 assert_eq!(rx2.recv(), 1);
1178             }
1179             tx3.send(());
1180         });
1181         rx1.recv();
1182         spawn(proc() {
1183             for _ in range(0i, 40) {
1184                 tx2.send(1);
1185             }
1186             tx4.send(());
1187         });
1188         rx3.recv();
1189         rx3.recv();
1190     }
1191
1192     #[test]
1193     fn recv_from_outside_runtime() {
1194         let (tx, rx) = channel::<int>();
1195         let (dtx, drx) = channel();
1196         spawn(proc() {
1197             for _ in range(0i, 40) {
1198                 assert_eq!(rx.recv(), 1);
1199             }
1200             dtx.send(());
1201         });
1202         for _ in range(0u, 40) {
1203             tx.send(1);
1204         }
1205         drx.recv();
1206     }
1207
1208     #[test]
1209     fn no_runtime() {
1210         let (tx1, rx1) = channel::<int>();
1211         let (tx2, rx2) = channel::<int>();
1212         let (tx3, rx3) = channel::<()>();
1213         let tx4 = tx3.clone();
1214         spawn(proc() {
1215             assert_eq!(rx1.recv(), 1);
1216             tx2.send(2);
1217             tx4.send(());
1218         });
1219         spawn(proc() {
1220             tx1.send(1);
1221             assert_eq!(rx2.recv(), 2);
1222             tx3.send(());
1223         });
1224         rx3.recv();
1225         rx3.recv();
1226     }
1227
1228     test!(fn oneshot_single_thread_close_port_first() {
1229         // Simple test of closing without sending
1230         let (_tx, rx) = channel::<int>();
1231         drop(rx);
1232     })
1233
1234     test!(fn oneshot_single_thread_close_chan_first() {
1235         // Simple test of closing without sending
1236         let (tx, _rx) = channel::<int>();
1237         drop(tx);
1238     })
1239
1240     test!(fn oneshot_single_thread_send_port_close() {
1241         // Testing that the sender cleans up the payload if receiver is closed
1242         let (tx, rx) = channel::<Box<int>>();
1243         drop(rx);
1244         tx.send(box 0);
1245     } #[should_fail])
1246
1247     test!(fn oneshot_single_thread_recv_chan_close() {
1248         // Receiving on a closed chan will panic
1249         let res = task::try(proc() {
1250             let (tx, rx) = channel::<int>();
1251             drop(tx);
1252             rx.recv();
1253         });
1254         // What is our res?
1255         assert!(res.is_err());
1256     })
1257
1258     test!(fn oneshot_single_thread_send_then_recv() {
1259         let (tx, rx) = channel::<Box<int>>();
1260         tx.send(box 10);
1261         assert!(rx.recv() == box 10);
1262     })
1263
1264     test!(fn oneshot_single_thread_try_send_open() {
1265         let (tx, rx) = channel::<int>();
1266         assert!(tx.send_opt(10).is_ok());
1267         assert!(rx.recv() == 10);
1268     })
1269
1270     test!(fn oneshot_single_thread_try_send_closed() {
1271         let (tx, rx) = channel::<int>();
1272         drop(rx);
1273         assert!(tx.send_opt(10).is_err());
1274     })
1275
1276     test!(fn oneshot_single_thread_try_recv_open() {
1277         let (tx, rx) = channel::<int>();
1278         tx.send(10);
1279         assert!(rx.recv_opt() == Ok(10));
1280     })
1281
1282     test!(fn oneshot_single_thread_try_recv_closed() {
1283         let (tx, rx) = channel::<int>();
1284         drop(tx);
1285         assert!(rx.recv_opt() == Err(()));
1286     })
1287
1288     test!(fn oneshot_single_thread_peek_data() {
1289         let (tx, rx) = channel::<int>();
1290         assert_eq!(rx.try_recv(), Err(Empty))
1291         tx.send(10);
1292         assert_eq!(rx.try_recv(), Ok(10));
1293     })
1294
1295     test!(fn oneshot_single_thread_peek_close() {
1296         let (tx, rx) = channel::<int>();
1297         drop(tx);
1298         assert_eq!(rx.try_recv(), Err(Disconnected));
1299         assert_eq!(rx.try_recv(), Err(Disconnected));
1300     })
1301
1302     test!(fn oneshot_single_thread_peek_open() {
1303         let (_tx, rx) = channel::<int>();
1304         assert_eq!(rx.try_recv(), Err(Empty));
1305     })
1306
1307     test!(fn oneshot_multi_task_recv_then_send() {
1308         let (tx, rx) = channel::<Box<int>>();
1309         spawn(proc() {
1310             assert!(rx.recv() == box 10);
1311         });
1312
1313         tx.send(box 10);
1314     })
1315
1316     test!(fn oneshot_multi_task_recv_then_close() {
1317         let (tx, rx) = channel::<Box<int>>();
1318         spawn(proc() {
1319             drop(tx);
1320         });
1321         let res = task::try(proc() {
1322             assert!(rx.recv() == box 10);
1323         });
1324         assert!(res.is_err());
1325     })
1326
1327     test!(fn oneshot_multi_thread_close_stress() {
1328         for _ in range(0, stress_factor()) {
1329             let (tx, rx) = channel::<int>();
1330             spawn(proc() {
1331                 drop(rx);
1332             });
1333             drop(tx);
1334         }
1335     })
1336
1337     test!(fn oneshot_multi_thread_send_close_stress() {
1338         for _ in range(0, stress_factor()) {
1339             let (tx, rx) = channel::<int>();
1340             spawn(proc() {
1341                 drop(rx);
1342             });
1343             let _ = task::try(proc() {
1344                 tx.send(1);
1345             });
1346         }
1347     })
1348
1349     test!(fn oneshot_multi_thread_recv_close_stress() {
1350         for _ in range(0, stress_factor()) {
1351             let (tx, rx) = channel::<int>();
1352             spawn(proc() {
1353                 let res = task::try(proc() {
1354                     rx.recv();
1355                 });
1356                 assert!(res.is_err());
1357             });
1358             spawn(proc() {
1359                 spawn(proc() {
1360                     drop(tx);
1361                 });
1362             });
1363         }
1364     })
1365
1366     test!(fn oneshot_multi_thread_send_recv_stress() {
1367         for _ in range(0, stress_factor()) {
1368             let (tx, rx) = channel();
1369             spawn(proc() {
1370                 tx.send(box 10i);
1371             });
1372             spawn(proc() {
1373                 assert!(rx.recv() == box 10i);
1374             });
1375         }
1376     })
1377
1378     test!(fn stream_send_recv_stress() {
1379         for _ in range(0, stress_factor()) {
1380             let (tx, rx) = channel();
1381
1382             send(tx, 0);
1383             recv(rx, 0);
1384
1385             fn send(tx: Sender<Box<int>>, i: int) {
1386                 if i == 10 { return }
1387
1388                 spawn(proc() {
1389                     tx.send(box i);
1390                     send(tx, i + 1);
1391                 });
1392             }
1393
1394             fn recv(rx: Receiver<Box<int>>, i: int) {
1395                 if i == 10 { return }
1396
1397                 spawn(proc() {
1398                     assert!(rx.recv() == box i);
1399                     recv(rx, i + 1);
1400                 });
1401             }
1402         }
1403     })
1404
1405     test!(fn recv_a_lot() {
1406         // Regression test that we don't run out of stack in scheduler context
1407         let (tx, rx) = channel();
1408         for _ in range(0i, 10000) { tx.send(()); }
1409         for _ in range(0i, 10000) { rx.recv(); }
1410     })
1411
1412     test!(fn shared_chan_stress() {
1413         let (tx, rx) = channel();
1414         let total = stress_factor() + 100;
1415         for _ in range(0, total) {
1416             let tx = tx.clone();
1417             spawn(proc() {
1418                 tx.send(());
1419             });
1420         }
1421
1422         for _ in range(0, total) {
1423             rx.recv();
1424         }
1425     })
1426
1427     test!(fn test_nested_recv_iter() {
1428         let (tx, rx) = channel::<int>();
1429         let (total_tx, total_rx) = channel::<int>();
1430
1431         spawn(proc() {
1432             let mut acc = 0;
1433             for x in rx.iter() {
1434                 acc += x;
1435             }
1436             total_tx.send(acc);
1437         });
1438
1439         tx.send(3);
1440         tx.send(1);
1441         tx.send(2);
1442         drop(tx);
1443         assert_eq!(total_rx.recv(), 6);
1444     })
1445
1446     test!(fn test_recv_iter_break() {
1447         let (tx, rx) = channel::<int>();
1448         let (count_tx, count_rx) = channel();
1449
1450         spawn(proc() {
1451             let mut count = 0;
1452             for x in rx.iter() {
1453                 if count >= 3 {
1454                     break;
1455                 } else {
1456                     count += x;
1457                 }
1458             }
1459             count_tx.send(count);
1460         });
1461
1462         tx.send(2);
1463         tx.send(2);
1464         tx.send(2);
1465         let _ = tx.send_opt(2);
1466         drop(tx);
1467         assert_eq!(count_rx.recv(), 4);
1468     })
1469
1470     test!(fn try_recv_states() {
1471         let (tx1, rx1) = channel::<int>();
1472         let (tx2, rx2) = channel::<()>();
1473         let (tx3, rx3) = channel::<()>();
1474         spawn(proc() {
1475             rx2.recv();
1476             tx1.send(1);
1477             tx3.send(());
1478             rx2.recv();
1479             drop(tx1);
1480             tx3.send(());
1481         });
1482
1483         assert_eq!(rx1.try_recv(), Err(Empty));
1484         tx2.send(());
1485         rx3.recv();
1486         assert_eq!(rx1.try_recv(), Ok(1));
1487         assert_eq!(rx1.try_recv(), Err(Empty));
1488         tx2.send(());
1489         rx3.recv();
1490         assert_eq!(rx1.try_recv(), Err(Disconnected));
1491     })
1492
1493     // This bug used to end up in a livelock inside of the Receiver destructor
1494     // because the internal state of the Shared packet was corrupted
1495     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1496         let (tx, rx) = channel();
1497         let (tx2, rx2) = channel();
1498         spawn(proc() {
1499             rx.recv(); // wait on a oneshot
1500             drop(rx);  // destroy a shared
1501             tx2.send(());
1502         });
1503         // make sure the other task has gone to sleep
1504         for _ in range(0u, 5000) { task::deschedule(); }
1505
1506         // upgrade to a shared chan and send a message
1507         let t = tx.clone();
1508         drop(tx);
1509         t.send(());
1510
1511         // wait for the child task to exit before we exit
1512         rx2.recv();
1513     })
1514
1515     test!(fn sends_off_the_runtime() {
1516         use rustrt::thread::Thread;
1517
1518         let (tx, rx) = channel();
1519         let t = Thread::start(proc() {
1520             for _ in range(0u, 1000) {
1521                 tx.send(());
1522             }
1523         });
1524         for _ in range(0u, 1000) {
1525             rx.recv();
1526         }
1527         t.join();
1528     })
1529
1530     test!(fn try_recvs_off_the_runtime() {
1531         use rustrt::thread::Thread;
1532
1533         let (tx, rx) = channel();
1534         let (cdone, pdone) = channel();
1535         let t = Thread::start(proc() {
1536             let mut hits = 0u;
1537             while hits < 10 {
1538                 match rx.try_recv() {
1539                     Ok(()) => { hits += 1; }
1540                     Err(Empty) => { Thread::yield_now(); }
1541                     Err(Disconnected) => return,
1542                 }
1543             }
1544             cdone.send(());
1545         });
1546         for _ in range(0u, 10) {
1547             tx.send(());
1548         }
1549         t.join();
1550         pdone.recv();
1551     })
1552 }
1553
1554 #[cfg(test)]
1555 mod sync_tests {
1556     use prelude::*;
1557     use os;
1558
1559     pub fn stress_factor() -> uint {
1560         match os::getenv("RUST_TEST_STRESS") {
1561             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1562             None => 1,
1563         }
1564     }
1565
1566     test!(fn smoke() {
1567         let (tx, rx) = sync_channel::<int>(1);
1568         tx.send(1);
1569         assert_eq!(rx.recv(), 1);
1570     })
1571
1572     test!(fn drop_full() {
1573         let (tx, _rx) = sync_channel(1);
1574         tx.send(box 1i);
1575     })
1576
1577     test!(fn smoke_shared() {
1578         let (tx, rx) = sync_channel::<int>(1);
1579         tx.send(1);
1580         assert_eq!(rx.recv(), 1);
1581         let tx = tx.clone();
1582         tx.send(1);
1583         assert_eq!(rx.recv(), 1);
1584     })
1585
1586     test!(fn smoke_threads() {
1587         let (tx, rx) = sync_channel::<int>(0);
1588         spawn(proc() {
1589             tx.send(1);
1590         });
1591         assert_eq!(rx.recv(), 1);
1592     })
1593
1594     test!(fn smoke_port_gone() {
1595         let (tx, rx) = sync_channel::<int>(0);
1596         drop(rx);
1597         tx.send(1);
1598     } #[should_fail])
1599
1600     test!(fn smoke_shared_port_gone2() {
1601         let (tx, rx) = sync_channel::<int>(0);
1602         drop(rx);
1603         let tx2 = tx.clone();
1604         drop(tx);
1605         tx2.send(1);
1606     } #[should_fail])
1607
1608     test!(fn port_gone_concurrent() {
1609         let (tx, rx) = sync_channel::<int>(0);
1610         spawn(proc() {
1611             rx.recv();
1612         });
1613         loop { tx.send(1) }
1614     } #[should_fail])
1615
1616     test!(fn port_gone_concurrent_shared() {
1617         let (tx, rx) = sync_channel::<int>(0);
1618         let tx2 = tx.clone();
1619         spawn(proc() {
1620             rx.recv();
1621         });
1622         loop {
1623             tx.send(1);
1624             tx2.send(1);
1625         }
1626     } #[should_fail])
1627
1628     test!(fn smoke_chan_gone() {
1629         let (tx, rx) = sync_channel::<int>(0);
1630         drop(tx);
1631         rx.recv();
1632     } #[should_fail])
1633
1634     test!(fn smoke_chan_gone_shared() {
1635         let (tx, rx) = sync_channel::<()>(0);
1636         let tx2 = tx.clone();
1637         drop(tx);
1638         drop(tx2);
1639         rx.recv();
1640     } #[should_fail])
1641
1642     test!(fn chan_gone_concurrent() {
1643         let (tx, rx) = sync_channel::<int>(0);
1644         spawn(proc() {
1645             tx.send(1);
1646             tx.send(1);
1647         });
1648         loop { rx.recv(); }
1649     } #[should_fail])
1650
1651     test!(fn stress() {
1652         let (tx, rx) = sync_channel::<int>(0);
1653         spawn(proc() {
1654             for _ in range(0u, 10000) { tx.send(1); }
1655         });
1656         for _ in range(0u, 10000) {
1657             assert_eq!(rx.recv(), 1);
1658         }
1659     })
1660
1661     test!(fn stress_shared() {
1662         static AMT: uint = 1000;
1663         static NTHREADS: uint = 8;
1664         let (tx, rx) = sync_channel::<int>(0);
1665         let (dtx, drx) = sync_channel::<()>(0);
1666
1667         spawn(proc() {
1668             for _ in range(0, AMT * NTHREADS) {
1669                 assert_eq!(rx.recv(), 1);
1670             }
1671             match rx.try_recv() {
1672                 Ok(..) => panic!(),
1673                 _ => {}
1674             }
1675             dtx.send(());
1676         });
1677
1678         for _ in range(0, NTHREADS) {
1679             let tx = tx.clone();
1680             spawn(proc() {
1681                 for _ in range(0, AMT) { tx.send(1); }
1682             });
1683         }
1684         drop(tx);
1685         drx.recv();
1686     })
1687
1688     test!(fn oneshot_single_thread_close_port_first() {
1689         // Simple test of closing without sending
1690         let (_tx, rx) = sync_channel::<int>(0);
1691         drop(rx);
1692     })
1693
1694     test!(fn oneshot_single_thread_close_chan_first() {
1695         // Simple test of closing without sending
1696         let (tx, _rx) = sync_channel::<int>(0);
1697         drop(tx);
1698     })
1699
1700     test!(fn oneshot_single_thread_send_port_close() {
1701         // Testing that the sender cleans up the payload if receiver is closed
1702         let (tx, rx) = sync_channel::<Box<int>>(0);
1703         drop(rx);
1704         tx.send(box 0);
1705     } #[should_fail])
1706
1707     test!(fn oneshot_single_thread_recv_chan_close() {
1708         // Receiving on a closed chan will panic
1709         let res = task::try(proc() {
1710             let (tx, rx) = sync_channel::<int>(0);
1711             drop(tx);
1712             rx.recv();
1713         });
1714         // What is our res?
1715         assert!(res.is_err());
1716     })
1717
1718     test!(fn oneshot_single_thread_send_then_recv() {
1719         let (tx, rx) = sync_channel::<Box<int>>(1);
1720         tx.send(box 10);
1721         assert!(rx.recv() == box 10);
1722     })
1723
1724     test!(fn oneshot_single_thread_try_send_open() {
1725         let (tx, rx) = sync_channel::<int>(1);
1726         assert_eq!(tx.try_send(10), Ok(()));
1727         assert!(rx.recv() == 10);
1728     })
1729
1730     test!(fn oneshot_single_thread_try_send_closed() {
1731         let (tx, rx) = sync_channel::<int>(0);
1732         drop(rx);
1733         assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1734     })
1735
1736     test!(fn oneshot_single_thread_try_send_closed2() {
1737         let (tx, _rx) = sync_channel::<int>(0);
1738         assert_eq!(tx.try_send(10), Err(Full(10)));
1739     })
1740
1741     test!(fn oneshot_single_thread_try_recv_open() {
1742         let (tx, rx) = sync_channel::<int>(1);
1743         tx.send(10);
1744         assert!(rx.recv_opt() == Ok(10));
1745     })
1746
1747     test!(fn oneshot_single_thread_try_recv_closed() {
1748         let (tx, rx) = sync_channel::<int>(0);
1749         drop(tx);
1750         assert!(rx.recv_opt() == Err(()));
1751     })
1752
1753     test!(fn oneshot_single_thread_peek_data() {
1754         let (tx, rx) = sync_channel::<int>(1);
1755         assert_eq!(rx.try_recv(), Err(Empty))
1756         tx.send(10);
1757         assert_eq!(rx.try_recv(), Ok(10));
1758     })
1759
1760     test!(fn oneshot_single_thread_peek_close() {
1761         let (tx, rx) = sync_channel::<int>(0);
1762         drop(tx);
1763         assert_eq!(rx.try_recv(), Err(Disconnected));
1764         assert_eq!(rx.try_recv(), Err(Disconnected));
1765     })
1766
1767     test!(fn oneshot_single_thread_peek_open() {
1768         let (_tx, rx) = sync_channel::<int>(0);
1769         assert_eq!(rx.try_recv(), Err(Empty));
1770     })
1771
1772     test!(fn oneshot_multi_task_recv_then_send() {
1773         let (tx, rx) = sync_channel::<Box<int>>(0);
1774         spawn(proc() {
1775             assert!(rx.recv() == box 10);
1776         });
1777
1778         tx.send(box 10);
1779     })
1780
1781     test!(fn oneshot_multi_task_recv_then_close() {
1782         let (tx, rx) = sync_channel::<Box<int>>(0);
1783         spawn(proc() {
1784             drop(tx);
1785         });
1786         let res = task::try(proc() {
1787             assert!(rx.recv() == box 10);
1788         });
1789         assert!(res.is_err());
1790     })
1791
1792     test!(fn oneshot_multi_thread_close_stress() {
1793         for _ in range(0, stress_factor()) {
1794             let (tx, rx) = sync_channel::<int>(0);
1795             spawn(proc() {
1796                 drop(rx);
1797             });
1798             drop(tx);
1799         }
1800     })
1801
1802     test!(fn oneshot_multi_thread_send_close_stress() {
1803         for _ in range(0, stress_factor()) {
1804             let (tx, rx) = sync_channel::<int>(0);
1805             spawn(proc() {
1806                 drop(rx);
1807             });
1808             let _ = task::try(proc() {
1809                 tx.send(1);
1810             });
1811         }
1812     })
1813
1814     test!(fn oneshot_multi_thread_recv_close_stress() {
1815         for _ in range(0, stress_factor()) {
1816             let (tx, rx) = sync_channel::<int>(0);
1817             spawn(proc() {
1818                 let res = task::try(proc() {
1819                     rx.recv();
1820                 });
1821                 assert!(res.is_err());
1822             });
1823             spawn(proc() {
1824                 spawn(proc() {
1825                     drop(tx);
1826                 });
1827             });
1828         }
1829     })
1830
1831     test!(fn oneshot_multi_thread_send_recv_stress() {
1832         for _ in range(0, stress_factor()) {
1833             let (tx, rx) = sync_channel::<Box<int>>(0);
1834             spawn(proc() {
1835                 tx.send(box 10i);
1836             });
1837             spawn(proc() {
1838                 assert!(rx.recv() == box 10i);
1839             });
1840         }
1841     })
1842
1843     test!(fn stream_send_recv_stress() {
1844         for _ in range(0, stress_factor()) {
1845             let (tx, rx) = sync_channel::<Box<int>>(0);
1846
1847             send(tx, 0);
1848             recv(rx, 0);
1849
1850             fn send(tx: SyncSender<Box<int>>, i: int) {
1851                 if i == 10 { return }
1852
1853                 spawn(proc() {
1854                     tx.send(box i);
1855                     send(tx, i + 1);
1856                 });
1857             }
1858
1859             fn recv(rx: Receiver<Box<int>>, i: int) {
1860                 if i == 10 { return }
1861
1862                 spawn(proc() {
1863                     assert!(rx.recv() == box i);
1864                     recv(rx, i + 1);
1865                 });
1866             }
1867         }
1868     })
1869
1870     test!(fn recv_a_lot() {
1871         // Regression test that we don't run out of stack in scheduler context
1872         let (tx, rx) = sync_channel(10000);
1873         for _ in range(0u, 10000) { tx.send(()); }
1874         for _ in range(0u, 10000) { rx.recv(); }
1875     })
1876
1877     test!(fn shared_chan_stress() {
1878         let (tx, rx) = sync_channel(0);
1879         let total = stress_factor() + 100;
1880         for _ in range(0, total) {
1881             let tx = tx.clone();
1882             spawn(proc() {
1883                 tx.send(());
1884             });
1885         }
1886
1887         for _ in range(0, total) {
1888             rx.recv();
1889         }
1890     })
1891
1892     test!(fn test_nested_recv_iter() {
1893         let (tx, rx) = sync_channel::<int>(0);
1894         let (total_tx, total_rx) = sync_channel::<int>(0);
1895
1896         spawn(proc() {
1897             let mut acc = 0;
1898             for x in rx.iter() {
1899                 acc += x;
1900             }
1901             total_tx.send(acc);
1902         });
1903
1904         tx.send(3);
1905         tx.send(1);
1906         tx.send(2);
1907         drop(tx);
1908         assert_eq!(total_rx.recv(), 6);
1909     })
1910
1911     test!(fn test_recv_iter_break() {
1912         let (tx, rx) = sync_channel::<int>(0);
1913         let (count_tx, count_rx) = sync_channel(0);
1914
1915         spawn(proc() {
1916             let mut count = 0;
1917             for x in rx.iter() {
1918                 if count >= 3 {
1919                     break;
1920                 } else {
1921                     count += x;
1922                 }
1923             }
1924             count_tx.send(count);
1925         });
1926
1927         tx.send(2);
1928         tx.send(2);
1929         tx.send(2);
1930         let _ = tx.try_send(2);
1931         drop(tx);
1932         assert_eq!(count_rx.recv(), 4);
1933     })
1934
1935     test!(fn try_recv_states() {
1936         let (tx1, rx1) = sync_channel::<int>(1);
1937         let (tx2, rx2) = sync_channel::<()>(1);
1938         let (tx3, rx3) = sync_channel::<()>(1);
1939         spawn(proc() {
1940             rx2.recv();
1941             tx1.send(1);
1942             tx3.send(());
1943             rx2.recv();
1944             drop(tx1);
1945             tx3.send(());
1946         });
1947
1948         assert_eq!(rx1.try_recv(), Err(Empty));
1949         tx2.send(());
1950         rx3.recv();
1951         assert_eq!(rx1.try_recv(), Ok(1));
1952         assert_eq!(rx1.try_recv(), Err(Empty));
1953         tx2.send(());
1954         rx3.recv();
1955         assert_eq!(rx1.try_recv(), Err(Disconnected));
1956     })
1957
1958     // This bug used to end up in a livelock inside of the Receiver destructor
1959     // because the internal state of the Shared packet was corrupted
1960     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1961         let (tx, rx) = sync_channel::<()>(0);
1962         let (tx2, rx2) = sync_channel::<()>(0);
1963         spawn(proc() {
1964             rx.recv(); // wait on a oneshot
1965             drop(rx);  // destroy a shared
1966             tx2.send(());
1967         });
1968         // make sure the other task has gone to sleep
1969         for _ in range(0u, 5000) { task::deschedule(); }
1970
1971         // upgrade to a shared chan and send a message
1972         let t = tx.clone();
1973         drop(tx);
1974         t.send(());
1975
1976         // wait for the child task to exit before we exit
1977         rx2.recv();
1978     })
1979
1980     test!(fn try_recvs_off_the_runtime() {
1981         use rustrt::thread::Thread;
1982
1983         let (tx, rx) = sync_channel::<()>(0);
1984         let (cdone, pdone) = channel();
1985         let t = Thread::start(proc() {
1986             let mut hits = 0u;
1987             while hits < 10 {
1988                 match rx.try_recv() {
1989                     Ok(()) => { hits += 1; }
1990                     Err(Empty) => { Thread::yield_now(); }
1991                     Err(Disconnected) => return,
1992                 }
1993             }
1994             cdone.send(());
1995         });
1996         for _ in range(0u, 10) {
1997             tx.send(());
1998         }
1999         t.join();
2000         pdone.recv();
2001     })
2002
2003     test!(fn send_opt1() {
2004         let (tx, rx) = sync_channel::<int>(0);
2005         spawn(proc() { rx.recv(); });
2006         assert_eq!(tx.send_opt(1), Ok(()));
2007     })
2008
2009     test!(fn send_opt2() {
2010         let (tx, rx) = sync_channel::<int>(0);
2011         spawn(proc() { drop(rx); });
2012         assert_eq!(tx.send_opt(1), Err(1));
2013     })
2014
2015     test!(fn send_opt3() {
2016         let (tx, rx) = sync_channel::<int>(1);
2017         assert_eq!(tx.send_opt(1), Ok(()));
2018         spawn(proc() { drop(rx); });
2019         assert_eq!(tx.send_opt(1), Err(1));
2020     })
2021
2022     test!(fn send_opt4() {
2023         let (tx, rx) = sync_channel::<int>(0);
2024         let tx2 = tx.clone();
2025         let (done, donerx) = channel();
2026         let done2 = done.clone();
2027         spawn(proc() {
2028             assert_eq!(tx.send_opt(1), Err(1));
2029             done.send(());
2030         });
2031         spawn(proc() {
2032             assert_eq!(tx2.send_opt(2), Err(2));
2033             done2.send(());
2034         });
2035         drop(rx);
2036         donerx.recv();
2037         donerx.recv();
2038     })
2039
2040     test!(fn try_send1() {
2041         let (tx, _rx) = sync_channel::<int>(0);
2042         assert_eq!(tx.try_send(1), Err(Full(1)));
2043     })
2044
2045     test!(fn try_send2() {
2046         let (tx, _rx) = sync_channel::<int>(1);
2047         assert_eq!(tx.try_send(1), Ok(()));
2048         assert_eq!(tx.try_send(1), Err(Full(1)));
2049     })
2050
2051     test!(fn try_send3() {
2052         let (tx, rx) = sync_channel::<int>(1);
2053         assert_eq!(tx.try_send(1), Ok(()));
2054         drop(rx);
2055         assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2056     })
2057
2058     test!(fn try_send4() {
2059         let (tx, rx) = sync_channel::<int>(0);
2060         spawn(proc() {
2061             for _ in range(0u, 1000) { task::deschedule(); }
2062             assert_eq!(tx.try_send(1), Ok(()));
2063         });
2064         assert_eq!(rx.recv(), 1);
2065     } #[ignore(reason = "flaky on libnative")])
2066
2067     test!(fn issue_15761() {
2068         fn repro() {
2069             let (tx1, rx1) = sync_channel::<()>(3);
2070             let (tx2, rx2) = sync_channel::<()>(3);
2071
2072             spawn(proc() {
2073                 rx1.recv();
2074                 tx2.try_send(()).unwrap();
2075             });
2076
2077             tx1.try_send(()).unwrap();
2078             rx2.recv();
2079         }
2080
2081         for _ in range(0u, 100) {
2082             repro()
2083         }
2084     })
2085 }