]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/mod.rs
Apply stability attributes to std::comm
[rust.git] / src / libsync / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver.  These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //!    will return a `(Sender, Receiver)` tuple where all sends will be
35 //!    **asynchronous** (they never block). The channel conceptually has an
36 //!    infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //!    is a pre-allocated buffer of a fixed size. All sends will be
41 //!    **synchronous** by blocking until there is buffer space available. Note
42 //!    that a bound of 0 is allowed, causing the channel to become a
43 //!    "rendezvous" channel where each sender atomically hands off a message to
44 //!    a receiver.
45 //!
46 //! ## Failure Propagation
47 //!
48 //! In addition to being a core primitive for communicating in rust, channels
49 //! are the points at which failure is propagated among tasks.  Whenever the one
50 //! half of channel is closed, the other half will have its next operation
51 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! There are methods on both of senders and receivers to perform their
55 //! respective operations without failing, however.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example an FFI callback), then the
64 //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
65 //! the `try_send` method on a `SyncSender`, but no other operations are
66 //! guaranteed to be safe.
67 //!
68 //! Additionally, channels can interoperate between runtimes. If one task in a
69 //! program is running on libnative and another is running on libgreen, they can
70 //! still communicate with one another using channels.
71 //!
72 //! # Example
73 //!
74 //! Simple usage:
75 //!
76 //! ```
77 //! // Create a simple streaming channel
78 //! let (tx, rx) = channel();
79 //! spawn(proc() {
80 //!     tx.send(10i);
81 //! });
82 //! assert_eq!(rx.recv(), 10i);
83 //! ```
84 //!
85 //! Shared usage:
86 //!
87 //! ```
88 //! // Create a shared channel which can be sent along from many tasks
89 //! let (tx, rx) = channel();
90 //! for i in range(0i, 10i) {
91 //!     let tx = tx.clone();
92 //!     spawn(proc() {
93 //!         tx.send(i);
94 //!     })
95 //! }
96 //!
97 //! for _ in range(0i, 10i) {
98 //!     let j = rx.recv();
99 //!     assert!(0 <= j && j < 10);
100 //! }
101 //! ```
102 //!
103 //! Propagating failure:
104 //!
105 //! ```should_fail
106 //! // The call to recv() will fail!() because the channel has already hung
107 //! // up (or been deallocated)
108 //! let (tx, rx) = channel::<int>();
109 //! drop(tx);
110 //! rx.recv();
111 //! ```
112 //!
113 //! Synchronous channels:
114 //!
115 //! ```
116 //! let (tx, rx) = sync_channel::<int>(0);
117 //! spawn(proc() {
118 //!     // This will wait for the parent task to start receiving
119 //!     tx.send(53);
120 //! });
121 //! rx.recv();
122 //! ```
123 //!
124 //! Reading from a channel with a timeout requires to use a Timer together
125 //! with the channel. You can use the select! macro to select either and
126 //! handle the timeout case. This first example will break out of the loop
127 //! after 10 seconds no matter what:
128 //!
129 //! ```no_run
130 //! use std::io::timer::Timer;
131 //!
132 //! let (tx, rx) = channel::<int>();
133 //! let mut timer = Timer::new().unwrap();
134 //! let timeout = timer.oneshot(10000);
135 //!
136 //! loop {
137 //!     select! {
138 //!         val = rx.recv() => println!("Received {}", val),
139 //!         () = timeout.recv() => {
140 //!             println!("timed out, total time was more than 10 seconds")
141 //!             break;
142 //!         }
143 //!     }
144 //! }
145 //! ```
146 //!
147 //! This second example is more costly since it allocates a new timer every
148 //! time a message is received, but it allows you to timeout after the channel
149 //! has been inactive for 5 seconds:
150 //!
151 //! ```no_run
152 //! use std::io::timer::Timer;
153 //!
154 //! let (tx, rx) = channel::<int>();
155 //! let mut timer = Timer::new().unwrap();
156 //!
157 //! loop {
158 //!     let timeout = timer.oneshot(5000);
159 //!
160 //!     select! {
161 //!         val = rx.recv() => println!("Received {}", val),
162 //!         () = timeout.recv() => {
163 //!             println!("timed out, no message received in 5 seconds")
164 //!             break;
165 //!         }
166 //!     }
167 //! }
168 //! ```
169
170 // A description of how Rust's channel implementation works
171 //
172 // Channels are supposed to be the basic building block for all other
173 // concurrent primitives that are used in Rust. As a result, the channel type
174 // needs to be highly optimized, flexible, and broad enough for use everywhere.
175 //
176 // The choice of implementation of all channels is to be built on lock-free data
177 // structures. The channels themselves are then consequently also lock-free data
178 // structures. As always with lock-free code, this is a very "here be dragons"
179 // territory, especially because I'm unaware of any academic papers which have
180 // gone into great length about channels of these flavors.
181 //
182 // ## Flavors of channels
183 //
184 // From the perspective of a consumer of this library, there is only one flavor
185 // of channel. This channel can be used as a stream and cloned to allow multiple
186 // senders. Under the hood, however, there are actually three flavors of
187 // channels in play.
188 //
189 // * Oneshots - these channels are highly optimized for the one-send use case.
190 //              They contain as few atomics as possible and involve one and
191 //              exactly one allocation.
192 // * Streams - these channels are optimized for the non-shared use case. They
193 //             use a different concurrent queue which is more tailored for this
194 //             use case. The initial allocation of this flavor of channel is not
195 //             optimized.
196 // * Shared - this is the most general form of channel that this module offers,
197 //            a channel with multiple senders. This type is as optimized as it
198 //            can be, but the previous two types mentioned are much faster for
199 //            their use-cases.
200 //
201 // ## Concurrent queues
202 //
203 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
204 // recv() obviously blocks. This means that under the hood there must be some
205 // shared and concurrent queue holding all of the actual data.
206 //
207 // With two flavors of channels, two flavors of queues are also used. We have
208 // chosen to use queues from a well-known author which are abbreviated as SPSC
209 // and MPSC (single producer, single consumer and multiple producer, single
210 // consumer). SPSC queues are used for streams while MPSC queues are used for
211 // shared channels.
212 //
213 // ### SPSC optimizations
214 //
215 // The SPSC queue found online is essentially a linked list of nodes where one
216 // half of the nodes are the "queue of data" and the other half of nodes are a
217 // cache of unused nodes. The unused nodes are used such that an allocation is
218 // not required on every push() and a free doesn't need to happen on every
219 // pop().
220 //
221 // As found online, however, the cache of nodes is of an infinite size. This
222 // means that if a channel at one point in its life had 50k items in the queue,
223 // then the queue will always have the capacity for 50k items. I believed that
224 // this was an unnecessary limitation of the implementation, so I have altered
225 // the queue to optionally have a bound on the cache size.
226 //
227 // By default, streams will have an unbounded SPSC queue with a small-ish cache
228 // size. The hope is that the cache is still large enough to have very fast
229 // send() operations while not too large such that millions of channels can
230 // coexist at once.
231 //
232 // ### MPSC optimizations
233 //
234 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
235 // a linked list under the hood to earn its unboundedness, but I have not put
236 // forth much effort into having a cache of nodes similar to the SPSC queue.
237 //
238 // For now, I believe that this is "ok" because shared channels are not the most
239 // common type, but soon we may wish to revisit this queue choice and determine
240 // another candidate for backend storage of shared channels.
241 //
242 // ## Overview of the Implementation
243 //
244 // Now that there's a little background on the concurrent queues used, it's
245 // worth going into much more detail about the channels themselves. The basic
246 // pseudocode for a send/recv are:
247 //
248 //
249 //      send(t)                             recv()
250 //        queue.push(t)                       return if queue.pop()
251 //        if increment() == -1                deschedule {
252 //          wakeup()                            if decrement() > 0
253 //                                                cancel_deschedule()
254 //                                            }
255 //                                            queue.pop()
256 //
257 // As mentioned before, there are no locks in this implementation, only atomic
258 // instructions are used.
259 //
260 // ### The internal atomic counter
261 //
262 // Every channel has a shared counter with each half to keep track of the size
263 // of the queue. This counter is used to abort descheduling by the receiver and
264 // to know when to wake up on the sending side.
265 //
266 // As seen in the pseudocode, senders will increment this count and receivers
267 // will decrement the count. The theory behind this is that if a sender sees a
268 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
269 // then it doesn't need to block.
270 //
271 // The recv() method has a beginning call to pop(), and if successful, it needs
272 // to decrement the count. It is a crucial implementation detail that this
273 // decrement does *not* happen to the shared counter. If this were the case,
274 // then it would be possible for the counter to be very negative when there were
275 // no receivers waiting, in which case the senders would have to determine when
276 // it was actually appropriate to wake up a receiver.
277 //
278 // Instead, the "steal count" is kept track of separately (not atomically
279 // because it's only used by receivers), and then the decrement() call when
280 // descheduling will lump in all of the recent steals into one large decrement.
281 //
282 // The implication of this is that if a sender sees a -1 count, then there's
283 // guaranteed to be a waiter waiting!
284 //
285 // ## Native Implementation
286 //
287 // A major goal of these channels is to work seamlessly on and off the runtime.
288 // All of the previous race conditions have been worded in terms of
289 // scheduler-isms (which is obviously not available without the runtime).
290 //
291 // For now, native usage of channels (off the runtime) will fall back onto
292 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
293 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
294 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
295 // condition variable.
296 //
297 // ## Select
298 //
299 // Being able to support selection over channels has greatly influenced this
300 // design, and not only does selection need to work inside the runtime, but also
301 // outside the runtime.
302 //
303 // The implementation is fairly straightforward. The goal of select() is not to
304 // return some data, but only to return which channel can receive data without
305 // blocking. The implementation is essentially the entire blocking procedure
306 // followed by an increment as soon as its woken up. The cancellation procedure
307 // involves an increment and swapping out of to_wake to acquire ownership of the
308 // task to unblock.
309 //
310 // Sadly this current implementation requires multiple allocations, so I have
311 // seen the throughput of select() be much worse than it should be. I do not
312 // believe that there is anything fundamental which needs to change about these
313 // channels, however, in order to support a more efficient select().
314 //
315 // # Conclusion
316 //
317 // And now that you've seen all the races that I found and attempted to fix,
318 // here's the code for you to find some more!
319
320 use core::prelude::*;
321
322 use alloc::arc::Arc;
323 use alloc::owned::Box;
324 use core::cell::Cell;
325 use core::kinds::marker;
326 use core::mem;
327 use core::ty::Unsafe;
328 use rustrt::local::Local;
329 use rustrt::task::{Task, BlockedTask};
330
331 pub use comm::select::{Select, Handle};
332 pub use comm::duplex::{DuplexStream, duplex};
333
334 macro_rules! test (
335     { fn $name:ident() $b:block $(#[$a:meta])*} => (
336         mod $name {
337             #![allow(unused_imports)]
338
339             use std::prelude::*;
340
341             use native;
342             use comm::*;
343             use super::*;
344             use super::super::*;
345             use std::task;
346
347             fn f() $b
348
349             $(#[$a])* #[test] fn uv() { f() }
350             $(#[$a])* #[test] fn native() {
351                 use native;
352                 let (tx, rx) = channel();
353                 native::task::spawn(proc() { tx.send(f()) });
354                 rx.recv();
355             }
356         }
357     )
358 )
359
360 mod duplex;
361 mod oneshot;
362 mod select;
363 mod shared;
364 mod stream;
365 mod sync;
366
367 // Use a power of 2 to allow LLVM to optimize to something that's not a
368 // division, this is hit pretty regularly.
369 static RESCHED_FREQ: int = 256;
370
371 /// The receiving-half of Rust's channel type. This half can only be owned by
372 /// one task
373 #[unstable]
374 pub struct Receiver<T> {
375     inner: Unsafe<Flavor<T>>,
376     receives: Cell<uint>,
377     // can't share in an arc
378     marker: marker::NoShare,
379 }
380
381 /// An iterator over messages on a receiver, this iterator will block
382 /// whenever `next` is called, waiting for a new message, and `None` will be
383 /// returned when the corresponding channel has hung up.
384 #[unstable]
385 pub struct Messages<'a, T> {
386     rx: &'a Receiver<T>
387 }
388
389 /// The sending-half of Rust's asynchronous channel type. This half can only be
390 /// owned by one task, but it can be cloned to send to other tasks.
391 #[unstable]
392 pub struct Sender<T> {
393     inner: Unsafe<Flavor<T>>,
394     sends: Cell<uint>,
395     // can't share in an arc
396     marker: marker::NoShare,
397 }
398
399 /// The sending-half of Rust's synchronous channel type. This half can only be
400 /// owned by one task, but it can be cloned to send to other tasks.
401 #[unstable = "this type may be renamed, but it will always exist"]
402 pub struct SyncSender<T> {
403     inner: Arc<Unsafe<sync::Packet<T>>>,
404     // can't share in an arc
405     marker: marker::NoShare,
406 }
407
408 /// This enumeration is the list of the possible reasons that try_recv could not
409 /// return data when called.
410 #[deriving(PartialEq, Clone, Show)]
411 #[experimental = "this is likely to be removed in changing try_recv()"]
412 pub enum TryRecvError {
413     /// This channel is currently empty, but the sender(s) have not yet
414     /// disconnected, so data may yet become available.
415     Empty,
416     /// This channel's sending half has become disconnected, and there will
417     /// never be any more data received on this channel
418     Disconnected,
419 }
420
421 /// This enumeration is the list of the possible error outcomes for the
422 /// `SyncSender::try_send` method.
423 #[deriving(PartialEq, Clone, Show)]
424 #[experimental = "this is likely to be removed in changing try_send()"]
425 pub enum TrySendError<T> {
426     /// The data could not be sent on the channel because it would require that
427     /// the callee block to send the data.
428     ///
429     /// If this is a buffered channel, then the buffer is full at this time. If
430     /// this is not a buffered channel, then there is no receiver available to
431     /// acquire the data.
432     Full(T),
433     /// This channel's receiving half has disconnected, so the data could not be
434     /// sent. The data is returned back to the callee in this case.
435     RecvDisconnected(T),
436 }
437
438 enum Flavor<T> {
439     Oneshot(Arc<Unsafe<oneshot::Packet<T>>>),
440     Stream(Arc<Unsafe<stream::Packet<T>>>),
441     Shared(Arc<Unsafe<shared::Packet<T>>>),
442     Sync(Arc<Unsafe<sync::Packet<T>>>),
443 }
444
445 #[doc(hidden)]
446 trait UnsafeFlavor<T> {
447     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>;
448     unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
449         &mut *self.inner_unsafe().get()
450     }
451     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
452         &*self.inner_unsafe().get()
453     }
454 }
455 impl<T> UnsafeFlavor<T> for Sender<T> {
456     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
457         &self.inner
458     }
459 }
460 impl<T> UnsafeFlavor<T> for Receiver<T> {
461     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
462         &self.inner
463     }
464 }
465
466 /// Creates a new asynchronous channel, returning the sender/receiver halves.
467 ///
468 /// All data sent on the sender will become available on the receiver, and no
469 /// send will block the calling task (this channel has an "infinite buffer").
470 ///
471 /// # Example
472 ///
473 /// ```
474 /// let (tx, rx) = channel();
475 ///
476 /// // Spawn off an expensive computation
477 /// spawn(proc() {
478 /// #   fn expensive_computation() {}
479 ///     tx.send(expensive_computation());
480 /// });
481 ///
482 /// // Do some useful work for awhile
483 ///
484 /// // Let's see what that answer was
485 /// println!("{}", rx.recv());
486 /// ```
487 #[unstable]
488 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
489     let a = Arc::new(Unsafe::new(oneshot::Packet::new()));
490     (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
491 }
492
493 /// Creates a new synchronous, bounded channel.
494 ///
495 /// Like asynchronous channels, the `Receiver` will block until a message
496 /// becomes available. These channels differ greatly in the semantics of the
497 /// sender from asynchronous channels, however.
498 ///
499 /// This channel has an internal buffer on which messages will be queued. When
500 /// the internal buffer becomes full, future sends will *block* waiting for the
501 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
502 /// becomes  "rendezvous channel" where each send will not return until a recv
503 /// is paired with it.
504 ///
505 /// As with asynchronous channels, all senders will fail in `send` if the
506 /// `Receiver` has been destroyed.
507 ///
508 /// # Example
509 ///
510 /// ```
511 /// let (tx, rx) = sync_channel(1);
512 ///
513 /// // this returns immediately
514 /// tx.send(1i);
515 ///
516 /// spawn(proc() {
517 ///     // this will block until the previous message has been received
518 ///     tx.send(2i);
519 /// });
520 ///
521 /// assert_eq!(rx.recv(), 1i);
522 /// assert_eq!(rx.recv(), 2i);
523 /// ```
524 #[unstable = "this function may be renamed to more accurately reflect the type \
525               of channel that is is creating"]
526 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
527     let a = Arc::new(Unsafe::new(sync::Packet::new(bound)));
528     (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
529 }
530
531 ////////////////////////////////////////////////////////////////////////////////
532 // Sender
533 ////////////////////////////////////////////////////////////////////////////////
534
535 impl<T: Send> Sender<T> {
536     fn new(inner: Flavor<T>) -> Sender<T> {
537         Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
538     }
539
540     /// Sends a value along this channel to be received by the corresponding
541     /// receiver.
542     ///
543     /// Rust channels are infinitely buffered so this method will never block.
544     ///
545     /// # Failure
546     ///
547     /// This function will fail if the other end of the channel has hung up.
548     /// This means that if the corresponding receiver has fallen out of scope,
549     /// this function will trigger a fail message saying that a message is
550     /// being sent on a closed channel.
551     ///
552     /// Note that if this function does *not* fail, it does not mean that the
553     /// data will be successfully received. All sends are placed into a queue,
554     /// so it is possible for a send to succeed (the other end is alive), but
555     /// then the other end could immediately disconnect.
556     ///
557     /// The purpose of this functionality is to propagate failure among tasks.
558     /// If failure is not desired, then consider using the `send_opt` method
559     #[experimental = "this function is being considered candidate for removal \
560                       to adhere to the general guidelines of rust"]
561     pub fn send(&self, t: T) {
562         if self.send_opt(t).is_err() {
563             fail!("sending on a closed channel");
564         }
565     }
566
567     /// Attempts to send a value on this channel, returning it back if it could
568     /// not be sent.
569     ///
570     /// A successful send occurs when it is determined that the other end of
571     /// the channel has not hung up already. An unsuccessful send would be one
572     /// where the corresponding receiver has already been deallocated. Note
573     /// that a return value of `Err` means that the data will never be
574     /// received, but a return value of `Ok` does *not* mean that the data
575     /// will be received.  It is possible for the corresponding receiver to
576     /// hang up immediately after this function returns `Ok`.
577     ///
578     /// Like `send`, this method will never block.
579     ///
580     /// # Failure
581     ///
582     /// This method will never fail, it will return the message back to the
583     /// caller if the other end is disconnected
584     ///
585     /// # Example
586     ///
587     /// ```
588     /// let (tx, rx) = channel();
589     ///
590     /// // This send is always successful
591     /// assert_eq!(tx.send_opt(1i), Ok(()));
592     ///
593     /// // This send will fail because the receiver is gone
594     /// drop(rx);
595     /// assert_eq!(tx.send_opt(1i), Err(1));
596     /// ```
597     #[unstable = "this function may be renamed to send() in the future"]
598     pub fn send_opt(&self, t: T) -> Result<(), T> {
599         // In order to prevent starvation of other tasks in situations where
600         // a task sends repeatedly without ever receiving, we occasionally
601         // yield instead of doing a send immediately.
602         //
603         // Don't unconditionally attempt to yield because the TLS overhead can
604         // be a bit much, and also use `try_take` instead of `take` because
605         // there's no reason that this send shouldn't be usable off the
606         // runtime.
607         let cnt = self.sends.get() + 1;
608         self.sends.set(cnt);
609         if cnt % (RESCHED_FREQ as uint) == 0 {
610             let task: Option<Box<Task>> = Local::try_take();
611             task.map(|t| t.maybe_yield());
612         }
613
614         let (new_inner, ret) = match *unsafe { self.inner() } {
615             Oneshot(ref p) => {
616                 unsafe {
617                     let p = p.get();
618                     if !(*p).sent() {
619                         return (*p).send(t);
620                     } else {
621                         let a = Arc::new(Unsafe::new(stream::Packet::new()));
622                         match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
623                             oneshot::UpSuccess => {
624                                 let ret = (*a.get()).send(t);
625                                 (a, ret)
626                             }
627                             oneshot::UpDisconnected => (a, Err(t)),
628                             oneshot::UpWoke(task) => {
629                                 // This send cannot fail because the task is
630                                 // asleep (we're looking at it), so the receiver
631                                 // can't go away.
632                                 (*a.get()).send(t).ok().unwrap();
633                                 task.wake().map(|t| t.reawaken());
634                                 (a, Ok(()))
635                             }
636                         }
637                     }
638                 }
639             }
640             Stream(ref p) => return unsafe { (*p.get()).send(t) },
641             Shared(ref p) => return unsafe { (*p.get()).send(t) },
642             Sync(..) => unreachable!(),
643         };
644
645         unsafe {
646             let tmp = Sender::new(Stream(new_inner));
647             mem::swap(self.mut_inner(), tmp.mut_inner());
648         }
649         return ret;
650     }
651 }
652
653 #[unstable]
654 impl<T: Send> Clone for Sender<T> {
655     fn clone(&self) -> Sender<T> {
656         let (packet, sleeper) = match *unsafe { self.inner() } {
657             Oneshot(ref p) => {
658                 let a = Arc::new(Unsafe::new(shared::Packet::new()));
659                 unsafe {
660                     (*a.get()).postinit_lock();
661                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
662                         oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
663                         oneshot::UpWoke(task) => (a, Some(task))
664                     }
665                 }
666             }
667             Stream(ref p) => {
668                 let a = Arc::new(Unsafe::new(shared::Packet::new()));
669                 unsafe {
670                     (*a.get()).postinit_lock();
671                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
672                         stream::UpSuccess | stream::UpDisconnected => (a, None),
673                         stream::UpWoke(task) => (a, Some(task)),
674                     }
675                 }
676             }
677             Shared(ref p) => {
678                 unsafe { (*p.get()).clone_chan(); }
679                 return Sender::new(Shared(p.clone()));
680             }
681             Sync(..) => unreachable!(),
682         };
683
684         unsafe {
685             (*packet.get()).inherit_blocker(sleeper);
686
687             let tmp = Sender::new(Shared(packet.clone()));
688             mem::swap(self.mut_inner(), tmp.mut_inner());
689         }
690         Sender::new(Shared(packet))
691     }
692 }
693
694 #[unsafe_destructor]
695 impl<T: Send> Drop for Sender<T> {
696     fn drop(&mut self) {
697         match *unsafe { self.mut_inner() } {
698             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
699             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
700             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
701             Sync(..) => unreachable!(),
702         }
703     }
704 }
705
706 ////////////////////////////////////////////////////////////////////////////////
707 // SyncSender
708 ////////////////////////////////////////////////////////////////////////////////
709
710 impl<T: Send> SyncSender<T> {
711     fn new(inner: Arc<Unsafe<sync::Packet<T>>>) -> SyncSender<T> {
712         SyncSender { inner: inner, marker: marker::NoShare }
713     }
714
715     /// Sends a value on this synchronous channel.
716     ///
717     /// This function will *block* until space in the internal buffer becomes
718     /// available or a receiver is available to hand off the message to.
719     ///
720     /// Note that a successful send does *not* guarantee that the receiver will
721     /// ever see the data if there is a buffer on this channel. Messages may be
722     /// enqueued in the internal buffer for the receiver to receive at a later
723     /// time. If the buffer size is 0, however, it can be guaranteed that the
724     /// receiver has indeed received the data if this function returns success.
725     ///
726     /// # Failure
727     ///
728     /// Similarly to `Sender::send`, this function will fail if the
729     /// corresponding `Receiver` for this channel has disconnected. This
730     /// behavior is used to propagate failure among tasks.
731     ///
732     /// If failure is not desired, you can achieve the same semantics with the
733     /// `SyncSender::send_opt` method which will not fail if the receiver
734     /// disconnects.
735     #[experimental = "this function is being considered candidate for removal \
736                       to adhere to the general guidelines of rust"]
737     pub fn send(&self, t: T) {
738         if self.send_opt(t).is_err() {
739             fail!("sending on a closed channel");
740         }
741     }
742
743     /// Send a value on a channel, returning it back if the receiver
744     /// disconnected
745     ///
746     /// This method will *block* to send the value `t` on the channel, but if
747     /// the value could not be sent due to the receiver disconnecting, the value
748     /// is returned back to the callee. This function is similar to `try_send`,
749     /// except that it will block if the channel is currently full.
750     ///
751     /// # Failure
752     ///
753     /// This function cannot fail.
754     #[unstable = "this function may be renamed to send() in the future"]
755     pub fn send_opt(&self, t: T) -> Result<(), T> {
756         unsafe { (*self.inner.get()).send(t) }
757     }
758
759     /// Attempts to send a value on this channel without blocking.
760     ///
761     /// This method differs from `send_opt` by returning immediately if the
762     /// channel's buffer is full or no receiver is waiting to acquire some
763     /// data. Compared with `send_opt`, this function has two failure cases
764     /// instead of one (one for disconnection, one for a full buffer).
765     ///
766     /// See `SyncSender::send` for notes about guarantees of whether the
767     /// receiver has received the data or not if this function is successful.
768     ///
769     /// # Failure
770     ///
771     /// This function cannot fail
772     #[unstable = "the return type of this function is candidate for \
773                   modification"]
774     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
775         unsafe { (*self.inner.get()).try_send(t) }
776     }
777 }
778
779 #[unstable]
780 impl<T: Send> Clone for SyncSender<T> {
781     fn clone(&self) -> SyncSender<T> {
782         unsafe { (*self.inner.get()).clone_chan(); }
783         return SyncSender::new(self.inner.clone());
784     }
785 }
786
787 #[unsafe_destructor]
788 impl<T: Send> Drop for SyncSender<T> {
789     fn drop(&mut self) {
790         unsafe { (*self.inner.get()).drop_chan(); }
791     }
792 }
793
794 ////////////////////////////////////////////////////////////////////////////////
795 // Receiver
796 ////////////////////////////////////////////////////////////////////////////////
797
798 impl<T: Send> Receiver<T> {
799     fn new(inner: Flavor<T>) -> Receiver<T> {
800         Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare }
801     }
802
803     /// Blocks waiting for a value on this receiver
804     ///
805     /// This function will block if necessary to wait for a corresponding send
806     /// on the channel from its paired `Sender` structure. This receiver will
807     /// be woken up when data is ready, and the data will be returned.
808     ///
809     /// # Failure
810     ///
811     /// Similar to channels, this method will trigger a task failure if the
812     /// other end of the channel has hung up (been deallocated). The purpose of
813     /// this is to propagate failure among tasks.
814     ///
815     /// If failure is not desired, then there are two options:
816     ///
817     /// * If blocking is still desired, the `recv_opt` method will return `None`
818     ///   when the other end hangs up
819     ///
820     /// * If blocking is not desired, then the `try_recv` method will attempt to
821     ///   peek at a value on this receiver.
822     #[experimental = "this function is being considered candidate for removal \
823                       to adhere to the general guidelines of rust"]
824     pub fn recv(&self) -> T {
825         match self.recv_opt() {
826             Ok(t) => t,
827             Err(()) => fail!("receiving on a closed channel"),
828         }
829     }
830
831     /// Attempts to return a pending value on this receiver without blocking
832     ///
833     /// This method will never block the caller in order to wait for data to
834     /// become available. Instead, this will always return immediately with a
835     /// possible option of pending data on the channel.
836     ///
837     /// This is useful for a flavor of "optimistic check" before deciding to
838     /// block on a receiver.
839     ///
840     /// This function cannot fail.
841     #[unstable = "the return type of this function may be altered"]
842     pub fn try_recv(&self) -> Result<T, TryRecvError> {
843         // If a thread is spinning in try_recv, we should take the opportunity
844         // to reschedule things occasionally. See notes above in scheduling on
845         // sends for why this doesn't always hit TLS, and also for why this uses
846         // `try_take` instead of `take`.
847         let cnt = self.receives.get() + 1;
848         self.receives.set(cnt);
849         if cnt % (RESCHED_FREQ as uint) == 0 {
850             let task: Option<Box<Task>> = Local::try_take();
851             task.map(|t| t.maybe_yield());
852         }
853
854         loop {
855             let new_port = match *unsafe { self.inner() } {
856                 Oneshot(ref p) => {
857                     match unsafe { (*p.get()).try_recv() } {
858                         Ok(t) => return Ok(t),
859                         Err(oneshot::Empty) => return Err(Empty),
860                         Err(oneshot::Disconnected) => return Err(Disconnected),
861                         Err(oneshot::Upgraded(rx)) => rx,
862                     }
863                 }
864                 Stream(ref p) => {
865                     match unsafe { (*p.get()).try_recv() } {
866                         Ok(t) => return Ok(t),
867                         Err(stream::Empty) => return Err(Empty),
868                         Err(stream::Disconnected) => return Err(Disconnected),
869                         Err(stream::Upgraded(rx)) => rx,
870                     }
871                 }
872                 Shared(ref p) => {
873                     match unsafe { (*p.get()).try_recv() } {
874                         Ok(t) => return Ok(t),
875                         Err(shared::Empty) => return Err(Empty),
876                         Err(shared::Disconnected) => return Err(Disconnected),
877                     }
878                 }
879                 Sync(ref p) => {
880                     match unsafe { (*p.get()).try_recv() } {
881                         Ok(t) => return Ok(t),
882                         Err(sync::Empty) => return Err(Empty),
883                         Err(sync::Disconnected) => return Err(Disconnected),
884                     }
885                 }
886             };
887             unsafe {
888                 mem::swap(self.mut_inner(),
889                           new_port.mut_inner());
890             }
891         }
892     }
893
894     /// Attempt to wait for a value on this receiver, but does not fail if the
895     /// corresponding channel has hung up.
896     ///
897     /// This implementation of iterators for ports will always block if there is
898     /// not data available on the receiver, but it will not fail in the case
899     /// that the channel has been deallocated.
900     ///
901     /// In other words, this function has the same semantics as the `recv`
902     /// method except for the failure aspect.
903     ///
904     /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
905     /// the value found on the receiver is returned.
906     #[unstable = "this function may be renamed to recv()"]
907     pub fn recv_opt(&self) -> Result<T, ()> {
908         loop {
909             let new_port = match *unsafe { self.inner() } {
910                 Oneshot(ref p) => {
911                     match unsafe { (*p.get()).recv() } {
912                         Ok(t) => return Ok(t),
913                         Err(oneshot::Empty) => return unreachable!(),
914                         Err(oneshot::Disconnected) => return Err(()),
915                         Err(oneshot::Upgraded(rx)) => rx,
916                     }
917                 }
918                 Stream(ref p) => {
919                     match unsafe { (*p.get()).recv() } {
920                         Ok(t) => return Ok(t),
921                         Err(stream::Empty) => return unreachable!(),
922                         Err(stream::Disconnected) => return Err(()),
923                         Err(stream::Upgraded(rx)) => rx,
924                     }
925                 }
926                 Shared(ref p) => {
927                     match unsafe { (*p.get()).recv() } {
928                         Ok(t) => return Ok(t),
929                         Err(shared::Empty) => return unreachable!(),
930                         Err(shared::Disconnected) => return Err(()),
931                     }
932                 }
933                 Sync(ref p) => return unsafe { (*p.get()).recv() }
934             };
935             unsafe {
936                 mem::swap(self.mut_inner(), new_port.mut_inner());
937             }
938         }
939     }
940
941     /// Returns an iterator which will block waiting for messages, but never
942     /// `fail!`. It will return `None` when the channel has hung up.
943     #[unstable]
944     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
945         Messages { rx: self }
946     }
947 }
948
949 impl<T: Send> select::Packet for Receiver<T> {
950     fn can_recv(&self) -> bool {
951         loop {
952             let new_port = match *unsafe { self.inner() } {
953                 Oneshot(ref p) => {
954                     match unsafe { (*p.get()).can_recv() } {
955                         Ok(ret) => return ret,
956                         Err(upgrade) => upgrade,
957                     }
958                 }
959                 Stream(ref p) => {
960                     match unsafe { (*p.get()).can_recv() } {
961                         Ok(ret) => return ret,
962                         Err(upgrade) => upgrade,
963                     }
964                 }
965                 Shared(ref p) => {
966                     return unsafe { (*p.get()).can_recv() };
967                 }
968                 Sync(ref p) => {
969                     return unsafe { (*p.get()).can_recv() };
970                 }
971             };
972             unsafe {
973                 mem::swap(self.mut_inner(),
974                           new_port.mut_inner());
975             }
976         }
977     }
978
979     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
980         loop {
981             let (t, new_port) = match *unsafe { self.inner() } {
982                 Oneshot(ref p) => {
983                     match unsafe { (*p.get()).start_selection(task) } {
984                         oneshot::SelSuccess => return Ok(()),
985                         oneshot::SelCanceled(task) => return Err(task),
986                         oneshot::SelUpgraded(t, rx) => (t, rx),
987                     }
988                 }
989                 Stream(ref p) => {
990                     match unsafe { (*p.get()).start_selection(task) } {
991                         stream::SelSuccess => return Ok(()),
992                         stream::SelCanceled(task) => return Err(task),
993                         stream::SelUpgraded(t, rx) => (t, rx),
994                     }
995                 }
996                 Shared(ref p) => {
997                     return unsafe { (*p.get()).start_selection(task) };
998                 }
999                 Sync(ref p) => {
1000                     return unsafe { (*p.get()).start_selection(task) };
1001                 }
1002             };
1003             task = t;
1004             unsafe {
1005                 mem::swap(self.mut_inner(),
1006                           new_port.mut_inner());
1007             }
1008         }
1009     }
1010
1011     fn abort_selection(&self) -> bool {
1012         let mut was_upgrade = false;
1013         loop {
1014             let result = match *unsafe { self.inner() } {
1015                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
1016                 Stream(ref p) => unsafe {
1017                     (*p.get()).abort_selection(was_upgrade)
1018                 },
1019                 Shared(ref p) => return unsafe {
1020                     (*p.get()).abort_selection(was_upgrade)
1021                 },
1022                 Sync(ref p) => return unsafe {
1023                     (*p.get()).abort_selection()
1024                 },
1025             };
1026             let new_port = match result { Ok(b) => return b, Err(p) => p };
1027             was_upgrade = true;
1028             unsafe {
1029                 mem::swap(self.mut_inner(),
1030                           new_port.mut_inner());
1031             }
1032         }
1033     }
1034 }
1035
1036 #[unstable]
1037 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
1038     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
1039 }
1040
1041 #[unsafe_destructor]
1042 impl<T: Send> Drop for Receiver<T> {
1043     fn drop(&mut self) {
1044         match *unsafe { self.mut_inner() } {
1045             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1046             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1047             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1048             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1049         }
1050     }
1051 }
1052
1053 #[cfg(test)]
1054 mod test {
1055     use std::prelude::*;
1056
1057     use native;
1058     use std::os;
1059     use super::*;
1060
1061     pub fn stress_factor() -> uint {
1062         match os::getenv("RUST_TEST_STRESS") {
1063             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1064             None => 1,
1065         }
1066     }
1067
1068     test!(fn smoke() {
1069         let (tx, rx) = channel::<int>();
1070         tx.send(1);
1071         assert_eq!(rx.recv(), 1);
1072     })
1073
1074     test!(fn drop_full() {
1075         let (tx, _rx) = channel();
1076         tx.send(box 1i);
1077     })
1078
1079     test!(fn drop_full_shared() {
1080         let (tx, _rx) = channel();
1081         drop(tx.clone());
1082         drop(tx.clone());
1083         tx.send(box 1i);
1084     })
1085
1086     test!(fn smoke_shared() {
1087         let (tx, rx) = channel::<int>();
1088         tx.send(1);
1089         assert_eq!(rx.recv(), 1);
1090         let tx = tx.clone();
1091         tx.send(1);
1092         assert_eq!(rx.recv(), 1);
1093     })
1094
1095     test!(fn smoke_threads() {
1096         let (tx, rx) = channel::<int>();
1097         spawn(proc() {
1098             tx.send(1);
1099         });
1100         assert_eq!(rx.recv(), 1);
1101     })
1102
1103     test!(fn smoke_port_gone() {
1104         let (tx, rx) = channel::<int>();
1105         drop(rx);
1106         tx.send(1);
1107     } #[should_fail])
1108
1109     test!(fn smoke_shared_port_gone() {
1110         let (tx, rx) = channel::<int>();
1111         drop(rx);
1112         tx.send(1);
1113     } #[should_fail])
1114
1115     test!(fn smoke_shared_port_gone2() {
1116         let (tx, rx) = channel::<int>();
1117         drop(rx);
1118         let tx2 = tx.clone();
1119         drop(tx);
1120         tx2.send(1);
1121     } #[should_fail])
1122
1123     test!(fn port_gone_concurrent() {
1124         let (tx, rx) = channel::<int>();
1125         spawn(proc() {
1126             rx.recv();
1127         });
1128         loop { tx.send(1) }
1129     } #[should_fail])
1130
1131     test!(fn port_gone_concurrent_shared() {
1132         let (tx, rx) = channel::<int>();
1133         let tx2 = tx.clone();
1134         spawn(proc() {
1135             rx.recv();
1136         });
1137         loop {
1138             tx.send(1);
1139             tx2.send(1);
1140         }
1141     } #[should_fail])
1142
1143     test!(fn smoke_chan_gone() {
1144         let (tx, rx) = channel::<int>();
1145         drop(tx);
1146         rx.recv();
1147     } #[should_fail])
1148
1149     test!(fn smoke_chan_gone_shared() {
1150         let (tx, rx) = channel::<()>();
1151         let tx2 = tx.clone();
1152         drop(tx);
1153         drop(tx2);
1154         rx.recv();
1155     } #[should_fail])
1156
1157     test!(fn chan_gone_concurrent() {
1158         let (tx, rx) = channel::<int>();
1159         spawn(proc() {
1160             tx.send(1);
1161             tx.send(1);
1162         });
1163         loop { rx.recv(); }
1164     } #[should_fail])
1165
1166     test!(fn stress() {
1167         let (tx, rx) = channel::<int>();
1168         spawn(proc() {
1169             for _ in range(0u, 10000) { tx.send(1i); }
1170         });
1171         for _ in range(0u, 10000) {
1172             assert_eq!(rx.recv(), 1);
1173         }
1174     })
1175
1176     test!(fn stress_shared() {
1177         static AMT: uint = 10000;
1178         static NTHREADS: uint = 8;
1179         let (tx, rx) = channel::<int>();
1180         let (dtx, drx) = channel::<()>();
1181
1182         spawn(proc() {
1183             for _ in range(0, AMT * NTHREADS) {
1184                 assert_eq!(rx.recv(), 1);
1185             }
1186             match rx.try_recv() {
1187                 Ok(..) => fail!(),
1188                 _ => {}
1189             }
1190             dtx.send(());
1191         });
1192
1193         for _ in range(0, NTHREADS) {
1194             let tx = tx.clone();
1195             spawn(proc() {
1196                 for _ in range(0, AMT) { tx.send(1); }
1197             });
1198         }
1199         drop(tx);
1200         drx.recv();
1201     })
1202
1203     #[test]
1204     fn send_from_outside_runtime() {
1205         let (tx1, rx1) = channel::<()>();
1206         let (tx2, rx2) = channel::<int>();
1207         let (tx3, rx3) = channel::<()>();
1208         let tx4 = tx3.clone();
1209         spawn(proc() {
1210             tx1.send(());
1211             for _ in range(0i, 40) {
1212                 assert_eq!(rx2.recv(), 1);
1213             }
1214             tx3.send(());
1215         });
1216         rx1.recv();
1217         native::task::spawn(proc() {
1218             for _ in range(0i, 40) {
1219                 tx2.send(1);
1220             }
1221             tx4.send(());
1222         });
1223         rx3.recv();
1224         rx3.recv();
1225     }
1226
1227     #[test]
1228     fn recv_from_outside_runtime() {
1229         let (tx, rx) = channel::<int>();
1230         let (dtx, drx) = channel();
1231         native::task::spawn(proc() {
1232             for _ in range(0i, 40) {
1233                 assert_eq!(rx.recv(), 1);
1234             }
1235             dtx.send(());
1236         });
1237         for _ in range(0u, 40) {
1238             tx.send(1);
1239         }
1240         drx.recv();
1241     }
1242
1243     #[test]
1244     fn no_runtime() {
1245         let (tx1, rx1) = channel::<int>();
1246         let (tx2, rx2) = channel::<int>();
1247         let (tx3, rx3) = channel::<()>();
1248         let tx4 = tx3.clone();
1249         native::task::spawn(proc() {
1250             assert_eq!(rx1.recv(), 1);
1251             tx2.send(2);
1252             tx4.send(());
1253         });
1254         native::task::spawn(proc() {
1255             tx1.send(1);
1256             assert_eq!(rx2.recv(), 2);
1257             tx3.send(());
1258         });
1259         rx3.recv();
1260         rx3.recv();
1261     }
1262
1263     test!(fn oneshot_single_thread_close_port_first() {
1264         // Simple test of closing without sending
1265         let (_tx, rx) = channel::<int>();
1266         drop(rx);
1267     })
1268
1269     test!(fn oneshot_single_thread_close_chan_first() {
1270         // Simple test of closing without sending
1271         let (tx, _rx) = channel::<int>();
1272         drop(tx);
1273     })
1274
1275     test!(fn oneshot_single_thread_send_port_close() {
1276         // Testing that the sender cleans up the payload if receiver is closed
1277         let (tx, rx) = channel::<Box<int>>();
1278         drop(rx);
1279         tx.send(box 0);
1280     } #[should_fail])
1281
1282     test!(fn oneshot_single_thread_recv_chan_close() {
1283         // Receiving on a closed chan will fail
1284         let res = task::try(proc() {
1285             let (tx, rx) = channel::<int>();
1286             drop(tx);
1287             rx.recv();
1288         });
1289         // What is our res?
1290         assert!(res.is_err());
1291     })
1292
1293     test!(fn oneshot_single_thread_send_then_recv() {
1294         let (tx, rx) = channel::<Box<int>>();
1295         tx.send(box 10);
1296         assert!(rx.recv() == box 10);
1297     })
1298
1299     test!(fn oneshot_single_thread_try_send_open() {
1300         let (tx, rx) = channel::<int>();
1301         assert!(tx.send_opt(10).is_ok());
1302         assert!(rx.recv() == 10);
1303     })
1304
1305     test!(fn oneshot_single_thread_try_send_closed() {
1306         let (tx, rx) = channel::<int>();
1307         drop(rx);
1308         assert!(tx.send_opt(10).is_err());
1309     })
1310
1311     test!(fn oneshot_single_thread_try_recv_open() {
1312         let (tx, rx) = channel::<int>();
1313         tx.send(10);
1314         assert!(rx.recv_opt() == Ok(10));
1315     })
1316
1317     test!(fn oneshot_single_thread_try_recv_closed() {
1318         let (tx, rx) = channel::<int>();
1319         drop(tx);
1320         assert!(rx.recv_opt() == Err(()));
1321     })
1322
1323     test!(fn oneshot_single_thread_peek_data() {
1324         let (tx, rx) = channel::<int>();
1325         assert_eq!(rx.try_recv(), Err(Empty))
1326         tx.send(10);
1327         assert_eq!(rx.try_recv(), Ok(10));
1328     })
1329
1330     test!(fn oneshot_single_thread_peek_close() {
1331         let (tx, rx) = channel::<int>();
1332         drop(tx);
1333         assert_eq!(rx.try_recv(), Err(Disconnected));
1334         assert_eq!(rx.try_recv(), Err(Disconnected));
1335     })
1336
1337     test!(fn oneshot_single_thread_peek_open() {
1338         let (_tx, rx) = channel::<int>();
1339         assert_eq!(rx.try_recv(), Err(Empty));
1340     })
1341
1342     test!(fn oneshot_multi_task_recv_then_send() {
1343         let (tx, rx) = channel::<Box<int>>();
1344         spawn(proc() {
1345             assert!(rx.recv() == box 10);
1346         });
1347
1348         tx.send(box 10);
1349     })
1350
1351     test!(fn oneshot_multi_task_recv_then_close() {
1352         let (tx, rx) = channel::<Box<int>>();
1353         spawn(proc() {
1354             drop(tx);
1355         });
1356         let res = task::try(proc() {
1357             assert!(rx.recv() == box 10);
1358         });
1359         assert!(res.is_err());
1360     })
1361
1362     test!(fn oneshot_multi_thread_close_stress() {
1363         for _ in range(0, stress_factor()) {
1364             let (tx, rx) = channel::<int>();
1365             spawn(proc() {
1366                 drop(rx);
1367             });
1368             drop(tx);
1369         }
1370     })
1371
1372     test!(fn oneshot_multi_thread_send_close_stress() {
1373         for _ in range(0, stress_factor()) {
1374             let (tx, rx) = channel::<int>();
1375             spawn(proc() {
1376                 drop(rx);
1377             });
1378             let _ = task::try(proc() {
1379                 tx.send(1);
1380             });
1381         }
1382     })
1383
1384     test!(fn oneshot_multi_thread_recv_close_stress() {
1385         for _ in range(0, stress_factor()) {
1386             let (tx, rx) = channel::<int>();
1387             spawn(proc() {
1388                 let res = task::try(proc() {
1389                     rx.recv();
1390                 });
1391                 assert!(res.is_err());
1392             });
1393             spawn(proc() {
1394                 spawn(proc() {
1395                     drop(tx);
1396                 });
1397             });
1398         }
1399     })
1400
1401     test!(fn oneshot_multi_thread_send_recv_stress() {
1402         for _ in range(0, stress_factor()) {
1403             let (tx, rx) = channel();
1404             spawn(proc() {
1405                 tx.send(box 10i);
1406             });
1407             spawn(proc() {
1408                 assert!(rx.recv() == box 10i);
1409             });
1410         }
1411     })
1412
1413     test!(fn stream_send_recv_stress() {
1414         for _ in range(0, stress_factor()) {
1415             let (tx, rx) = channel();
1416
1417             send(tx, 0);
1418             recv(rx, 0);
1419
1420             fn send(tx: Sender<Box<int>>, i: int) {
1421                 if i == 10 { return }
1422
1423                 spawn(proc() {
1424                     tx.send(box i);
1425                     send(tx, i + 1);
1426                 });
1427             }
1428
1429             fn recv(rx: Receiver<Box<int>>, i: int) {
1430                 if i == 10 { return }
1431
1432                 spawn(proc() {
1433                     assert!(rx.recv() == box i);
1434                     recv(rx, i + 1);
1435                 });
1436             }
1437         }
1438     })
1439
1440     test!(fn recv_a_lot() {
1441         // Regression test that we don't run out of stack in scheduler context
1442         let (tx, rx) = channel();
1443         for _ in range(0i, 10000) { tx.send(()); }
1444         for _ in range(0i, 10000) { rx.recv(); }
1445     })
1446
1447     test!(fn shared_chan_stress() {
1448         let (tx, rx) = channel();
1449         let total = stress_factor() + 100;
1450         for _ in range(0, total) {
1451             let tx = tx.clone();
1452             spawn(proc() {
1453                 tx.send(());
1454             });
1455         }
1456
1457         for _ in range(0, total) {
1458             rx.recv();
1459         }
1460     })
1461
1462     test!(fn test_nested_recv_iter() {
1463         let (tx, rx) = channel::<int>();
1464         let (total_tx, total_rx) = channel::<int>();
1465
1466         spawn(proc() {
1467             let mut acc = 0;
1468             for x in rx.iter() {
1469                 acc += x;
1470             }
1471             total_tx.send(acc);
1472         });
1473
1474         tx.send(3);
1475         tx.send(1);
1476         tx.send(2);
1477         drop(tx);
1478         assert_eq!(total_rx.recv(), 6);
1479     })
1480
1481     test!(fn test_recv_iter_break() {
1482         let (tx, rx) = channel::<int>();
1483         let (count_tx, count_rx) = channel();
1484
1485         spawn(proc() {
1486             let mut count = 0;
1487             for x in rx.iter() {
1488                 if count >= 3 {
1489                     break;
1490                 } else {
1491                     count += x;
1492                 }
1493             }
1494             count_tx.send(count);
1495         });
1496
1497         tx.send(2);
1498         tx.send(2);
1499         tx.send(2);
1500         let _ = tx.send_opt(2);
1501         drop(tx);
1502         assert_eq!(count_rx.recv(), 4);
1503     })
1504
1505     test!(fn try_recv_states() {
1506         let (tx1, rx1) = channel::<int>();
1507         let (tx2, rx2) = channel::<()>();
1508         let (tx3, rx3) = channel::<()>();
1509         spawn(proc() {
1510             rx2.recv();
1511             tx1.send(1);
1512             tx3.send(());
1513             rx2.recv();
1514             drop(tx1);
1515             tx3.send(());
1516         });
1517
1518         assert_eq!(rx1.try_recv(), Err(Empty));
1519         tx2.send(());
1520         rx3.recv();
1521         assert_eq!(rx1.try_recv(), Ok(1));
1522         assert_eq!(rx1.try_recv(), Err(Empty));
1523         tx2.send(());
1524         rx3.recv();
1525         assert_eq!(rx1.try_recv(), Err(Disconnected));
1526     })
1527
1528     // This bug used to end up in a livelock inside of the Receiver destructor
1529     // because the internal state of the Shared packet was corrupted
1530     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1531         let (tx, rx) = channel();
1532         let (tx2, rx2) = channel();
1533         spawn(proc() {
1534             rx.recv(); // wait on a oneshot
1535             drop(rx);  // destroy a shared
1536             tx2.send(());
1537         });
1538         // make sure the other task has gone to sleep
1539         for _ in range(0u, 5000) { task::deschedule(); }
1540
1541         // upgrade to a shared chan and send a message
1542         let t = tx.clone();
1543         drop(tx);
1544         t.send(());
1545
1546         // wait for the child task to exit before we exit
1547         rx2.recv();
1548     })
1549
1550     test!(fn sends_off_the_runtime() {
1551         use std::rt::thread::Thread;
1552
1553         let (tx, rx) = channel();
1554         let t = Thread::start(proc() {
1555             for _ in range(0u, 1000) {
1556                 tx.send(());
1557             }
1558         });
1559         for _ in range(0u, 1000) {
1560             rx.recv();
1561         }
1562         t.join();
1563     })
1564
1565     test!(fn try_recvs_off_the_runtime() {
1566         use std::rt::thread::Thread;
1567
1568         let (tx, rx) = channel();
1569         let (cdone, pdone) = channel();
1570         let t = Thread::start(proc() {
1571             let mut hits = 0u;
1572             while hits < 10 {
1573                 match rx.try_recv() {
1574                     Ok(()) => { hits += 1; }
1575                     Err(Empty) => { Thread::yield_now(); }
1576                     Err(Disconnected) => return,
1577                 }
1578             }
1579             cdone.send(());
1580         });
1581         for _ in range(0u, 10) {
1582             tx.send(());
1583         }
1584         t.join();
1585         pdone.recv();
1586     })
1587 }
1588
1589 #[cfg(test)]
1590 mod sync_tests {
1591     use std::prelude::*;
1592     use std::os;
1593
1594     pub fn stress_factor() -> uint {
1595         match os::getenv("RUST_TEST_STRESS") {
1596             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1597             None => 1,
1598         }
1599     }
1600
1601     test!(fn smoke() {
1602         let (tx, rx) = sync_channel::<int>(1);
1603         tx.send(1);
1604         assert_eq!(rx.recv(), 1);
1605     })
1606
1607     test!(fn drop_full() {
1608         let (tx, _rx) = sync_channel(1);
1609         tx.send(box 1i);
1610     })
1611
1612     test!(fn smoke_shared() {
1613         let (tx, rx) = sync_channel::<int>(1);
1614         tx.send(1);
1615         assert_eq!(rx.recv(), 1);
1616         let tx = tx.clone();
1617         tx.send(1);
1618         assert_eq!(rx.recv(), 1);
1619     })
1620
1621     test!(fn smoke_threads() {
1622         let (tx, rx) = sync_channel::<int>(0);
1623         spawn(proc() {
1624             tx.send(1);
1625         });
1626         assert_eq!(rx.recv(), 1);
1627     })
1628
1629     test!(fn smoke_port_gone() {
1630         let (tx, rx) = sync_channel::<int>(0);
1631         drop(rx);
1632         tx.send(1);
1633     } #[should_fail])
1634
1635     test!(fn smoke_shared_port_gone2() {
1636         let (tx, rx) = sync_channel::<int>(0);
1637         drop(rx);
1638         let tx2 = tx.clone();
1639         drop(tx);
1640         tx2.send(1);
1641     } #[should_fail])
1642
1643     test!(fn port_gone_concurrent() {
1644         let (tx, rx) = sync_channel::<int>(0);
1645         spawn(proc() {
1646             rx.recv();
1647         });
1648         loop { tx.send(1) }
1649     } #[should_fail])
1650
1651     test!(fn port_gone_concurrent_shared() {
1652         let (tx, rx) = sync_channel::<int>(0);
1653         let tx2 = tx.clone();
1654         spawn(proc() {
1655             rx.recv();
1656         });
1657         loop {
1658             tx.send(1);
1659             tx2.send(1);
1660         }
1661     } #[should_fail])
1662
1663     test!(fn smoke_chan_gone() {
1664         let (tx, rx) = sync_channel::<int>(0);
1665         drop(tx);
1666         rx.recv();
1667     } #[should_fail])
1668
1669     test!(fn smoke_chan_gone_shared() {
1670         let (tx, rx) = sync_channel::<()>(0);
1671         let tx2 = tx.clone();
1672         drop(tx);
1673         drop(tx2);
1674         rx.recv();
1675     } #[should_fail])
1676
1677     test!(fn chan_gone_concurrent() {
1678         let (tx, rx) = sync_channel::<int>(0);
1679         spawn(proc() {
1680             tx.send(1);
1681             tx.send(1);
1682         });
1683         loop { rx.recv(); }
1684     } #[should_fail])
1685
1686     test!(fn stress() {
1687         let (tx, rx) = sync_channel::<int>(0);
1688         spawn(proc() {
1689             for _ in range(0u, 10000) { tx.send(1); }
1690         });
1691         for _ in range(0u, 10000) {
1692             assert_eq!(rx.recv(), 1);
1693         }
1694     })
1695
1696     test!(fn stress_shared() {
1697         static AMT: uint = 1000;
1698         static NTHREADS: uint = 8;
1699         let (tx, rx) = sync_channel::<int>(0);
1700         let (dtx, drx) = sync_channel::<()>(0);
1701
1702         spawn(proc() {
1703             for _ in range(0, AMT * NTHREADS) {
1704                 assert_eq!(rx.recv(), 1);
1705             }
1706             match rx.try_recv() {
1707                 Ok(..) => fail!(),
1708                 _ => {}
1709             }
1710             dtx.send(());
1711         });
1712
1713         for _ in range(0, NTHREADS) {
1714             let tx = tx.clone();
1715             spawn(proc() {
1716                 for _ in range(0, AMT) { tx.send(1); }
1717             });
1718         }
1719         drop(tx);
1720         drx.recv();
1721     })
1722
1723     test!(fn oneshot_single_thread_close_port_first() {
1724         // Simple test of closing without sending
1725         let (_tx, rx) = sync_channel::<int>(0);
1726         drop(rx);
1727     })
1728
1729     test!(fn oneshot_single_thread_close_chan_first() {
1730         // Simple test of closing without sending
1731         let (tx, _rx) = sync_channel::<int>(0);
1732         drop(tx);
1733     })
1734
1735     test!(fn oneshot_single_thread_send_port_close() {
1736         // Testing that the sender cleans up the payload if receiver is closed
1737         let (tx, rx) = sync_channel::<Box<int>>(0);
1738         drop(rx);
1739         tx.send(box 0);
1740     } #[should_fail])
1741
1742     test!(fn oneshot_single_thread_recv_chan_close() {
1743         // Receiving on a closed chan will fail
1744         let res = task::try(proc() {
1745             let (tx, rx) = sync_channel::<int>(0);
1746             drop(tx);
1747             rx.recv();
1748         });
1749         // What is our res?
1750         assert!(res.is_err());
1751     })
1752
1753     test!(fn oneshot_single_thread_send_then_recv() {
1754         let (tx, rx) = sync_channel::<Box<int>>(1);
1755         tx.send(box 10);
1756         assert!(rx.recv() == box 10);
1757     })
1758
1759     test!(fn oneshot_single_thread_try_send_open() {
1760         let (tx, rx) = sync_channel::<int>(1);
1761         assert_eq!(tx.try_send(10), Ok(()));
1762         assert!(rx.recv() == 10);
1763     })
1764
1765     test!(fn oneshot_single_thread_try_send_closed() {
1766         let (tx, rx) = sync_channel::<int>(0);
1767         drop(rx);
1768         assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1769     })
1770
1771     test!(fn oneshot_single_thread_try_send_closed2() {
1772         let (tx, _rx) = sync_channel::<int>(0);
1773         assert_eq!(tx.try_send(10), Err(Full(10)));
1774     })
1775
1776     test!(fn oneshot_single_thread_try_recv_open() {
1777         let (tx, rx) = sync_channel::<int>(1);
1778         tx.send(10);
1779         assert!(rx.recv_opt() == Ok(10));
1780     })
1781
1782     test!(fn oneshot_single_thread_try_recv_closed() {
1783         let (tx, rx) = sync_channel::<int>(0);
1784         drop(tx);
1785         assert!(rx.recv_opt() == Err(()));
1786     })
1787
1788     test!(fn oneshot_single_thread_peek_data() {
1789         let (tx, rx) = sync_channel::<int>(1);
1790         assert_eq!(rx.try_recv(), Err(Empty))
1791         tx.send(10);
1792         assert_eq!(rx.try_recv(), Ok(10));
1793     })
1794
1795     test!(fn oneshot_single_thread_peek_close() {
1796         let (tx, rx) = sync_channel::<int>(0);
1797         drop(tx);
1798         assert_eq!(rx.try_recv(), Err(Disconnected));
1799         assert_eq!(rx.try_recv(), Err(Disconnected));
1800     })
1801
1802     test!(fn oneshot_single_thread_peek_open() {
1803         let (_tx, rx) = sync_channel::<int>(0);
1804         assert_eq!(rx.try_recv(), Err(Empty));
1805     })
1806
1807     test!(fn oneshot_multi_task_recv_then_send() {
1808         let (tx, rx) = sync_channel::<Box<int>>(0);
1809         spawn(proc() {
1810             assert!(rx.recv() == box 10);
1811         });
1812
1813         tx.send(box 10);
1814     })
1815
1816     test!(fn oneshot_multi_task_recv_then_close() {
1817         let (tx, rx) = sync_channel::<Box<int>>(0);
1818         spawn(proc() {
1819             drop(tx);
1820         });
1821         let res = task::try(proc() {
1822             assert!(rx.recv() == box 10);
1823         });
1824         assert!(res.is_err());
1825     })
1826
1827     test!(fn oneshot_multi_thread_close_stress() {
1828         for _ in range(0, stress_factor()) {
1829             let (tx, rx) = sync_channel::<int>(0);
1830             spawn(proc() {
1831                 drop(rx);
1832             });
1833             drop(tx);
1834         }
1835     })
1836
1837     test!(fn oneshot_multi_thread_send_close_stress() {
1838         for _ in range(0, stress_factor()) {
1839             let (tx, rx) = sync_channel::<int>(0);
1840             spawn(proc() {
1841                 drop(rx);
1842             });
1843             let _ = task::try(proc() {
1844                 tx.send(1);
1845             });
1846         }
1847     })
1848
1849     test!(fn oneshot_multi_thread_recv_close_stress() {
1850         for _ in range(0, stress_factor()) {
1851             let (tx, rx) = sync_channel::<int>(0);
1852             spawn(proc() {
1853                 let res = task::try(proc() {
1854                     rx.recv();
1855                 });
1856                 assert!(res.is_err());
1857             });
1858             spawn(proc() {
1859                 spawn(proc() {
1860                     drop(tx);
1861                 });
1862             });
1863         }
1864     })
1865
1866     test!(fn oneshot_multi_thread_send_recv_stress() {
1867         for _ in range(0, stress_factor()) {
1868             let (tx, rx) = sync_channel::<Box<int>>(0);
1869             spawn(proc() {
1870                 tx.send(box 10i);
1871             });
1872             spawn(proc() {
1873                 assert!(rx.recv() == box 10i);
1874             });
1875         }
1876     })
1877
1878     test!(fn stream_send_recv_stress() {
1879         for _ in range(0, stress_factor()) {
1880             let (tx, rx) = sync_channel::<Box<int>>(0);
1881
1882             send(tx, 0);
1883             recv(rx, 0);
1884
1885             fn send(tx: SyncSender<Box<int>>, i: int) {
1886                 if i == 10 { return }
1887
1888                 spawn(proc() {
1889                     tx.send(box i);
1890                     send(tx, i + 1);
1891                 });
1892             }
1893
1894             fn recv(rx: Receiver<Box<int>>, i: int) {
1895                 if i == 10 { return }
1896
1897                 spawn(proc() {
1898                     assert!(rx.recv() == box i);
1899                     recv(rx, i + 1);
1900                 });
1901             }
1902         }
1903     })
1904
1905     test!(fn recv_a_lot() {
1906         // Regression test that we don't run out of stack in scheduler context
1907         let (tx, rx) = sync_channel(10000);
1908         for _ in range(0u, 10000) { tx.send(()); }
1909         for _ in range(0u, 10000) { rx.recv(); }
1910     })
1911
1912     test!(fn shared_chan_stress() {
1913         let (tx, rx) = sync_channel(0);
1914         let total = stress_factor() + 100;
1915         for _ in range(0, total) {
1916             let tx = tx.clone();
1917             spawn(proc() {
1918                 tx.send(());
1919             });
1920         }
1921
1922         for _ in range(0, total) {
1923             rx.recv();
1924         }
1925     })
1926
1927     test!(fn test_nested_recv_iter() {
1928         let (tx, rx) = sync_channel::<int>(0);
1929         let (total_tx, total_rx) = sync_channel::<int>(0);
1930
1931         spawn(proc() {
1932             let mut acc = 0;
1933             for x in rx.iter() {
1934                 acc += x;
1935             }
1936             total_tx.send(acc);
1937         });
1938
1939         tx.send(3);
1940         tx.send(1);
1941         tx.send(2);
1942         drop(tx);
1943         assert_eq!(total_rx.recv(), 6);
1944     })
1945
1946     test!(fn test_recv_iter_break() {
1947         let (tx, rx) = sync_channel::<int>(0);
1948         let (count_tx, count_rx) = sync_channel(0);
1949
1950         spawn(proc() {
1951             let mut count = 0;
1952             for x in rx.iter() {
1953                 if count >= 3 {
1954                     break;
1955                 } else {
1956                     count += x;
1957                 }
1958             }
1959             count_tx.send(count);
1960         });
1961
1962         tx.send(2);
1963         tx.send(2);
1964         tx.send(2);
1965         let _ = tx.try_send(2);
1966         drop(tx);
1967         assert_eq!(count_rx.recv(), 4);
1968     })
1969
1970     test!(fn try_recv_states() {
1971         let (tx1, rx1) = sync_channel::<int>(1);
1972         let (tx2, rx2) = sync_channel::<()>(1);
1973         let (tx3, rx3) = sync_channel::<()>(1);
1974         spawn(proc() {
1975             rx2.recv();
1976             tx1.send(1);
1977             tx3.send(());
1978             rx2.recv();
1979             drop(tx1);
1980             tx3.send(());
1981         });
1982
1983         assert_eq!(rx1.try_recv(), Err(Empty));
1984         tx2.send(());
1985         rx3.recv();
1986         assert_eq!(rx1.try_recv(), Ok(1));
1987         assert_eq!(rx1.try_recv(), Err(Empty));
1988         tx2.send(());
1989         rx3.recv();
1990         assert_eq!(rx1.try_recv(), Err(Disconnected));
1991     })
1992
1993     // This bug used to end up in a livelock inside of the Receiver destructor
1994     // because the internal state of the Shared packet was corrupted
1995     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1996         let (tx, rx) = sync_channel::<()>(0);
1997         let (tx2, rx2) = sync_channel::<()>(0);
1998         spawn(proc() {
1999             rx.recv(); // wait on a oneshot
2000             drop(rx);  // destroy a shared
2001             tx2.send(());
2002         });
2003         // make sure the other task has gone to sleep
2004         for _ in range(0u, 5000) { task::deschedule(); }
2005
2006         // upgrade to a shared chan and send a message
2007         let t = tx.clone();
2008         drop(tx);
2009         t.send(());
2010
2011         // wait for the child task to exit before we exit
2012         rx2.recv();
2013     })
2014
2015     test!(fn try_recvs_off_the_runtime() {
2016         use std::rt::thread::Thread;
2017
2018         let (tx, rx) = sync_channel::<()>(0);
2019         let (cdone, pdone) = channel();
2020         let t = Thread::start(proc() {
2021             let mut hits = 0u;
2022             while hits < 10 {
2023                 match rx.try_recv() {
2024                     Ok(()) => { hits += 1; }
2025                     Err(Empty) => { Thread::yield_now(); }
2026                     Err(Disconnected) => return,
2027                 }
2028             }
2029             cdone.send(());
2030         });
2031         for _ in range(0u, 10) {
2032             tx.send(());
2033         }
2034         t.join();
2035         pdone.recv();
2036     })
2037
2038     test!(fn send_opt1() {
2039         let (tx, rx) = sync_channel::<int>(0);
2040         spawn(proc() { rx.recv(); });
2041         assert_eq!(tx.send_opt(1), Ok(()));
2042     })
2043
2044     test!(fn send_opt2() {
2045         let (tx, rx) = sync_channel::<int>(0);
2046         spawn(proc() { drop(rx); });
2047         assert_eq!(tx.send_opt(1), Err(1));
2048     })
2049
2050     test!(fn send_opt3() {
2051         let (tx, rx) = sync_channel::<int>(1);
2052         assert_eq!(tx.send_opt(1), Ok(()));
2053         spawn(proc() { drop(rx); });
2054         assert_eq!(tx.send_opt(1), Err(1));
2055     })
2056
2057     test!(fn send_opt4() {
2058         let (tx, rx) = sync_channel::<int>(0);
2059         let tx2 = tx.clone();
2060         let (done, donerx) = channel();
2061         let done2 = done.clone();
2062         spawn(proc() {
2063             assert_eq!(tx.send_opt(1), Err(1));
2064             done.send(());
2065         });
2066         spawn(proc() {
2067             assert_eq!(tx2.send_opt(2), Err(2));
2068             done2.send(());
2069         });
2070         drop(rx);
2071         donerx.recv();
2072         donerx.recv();
2073     })
2074
2075     test!(fn try_send1() {
2076         let (tx, _rx) = sync_channel::<int>(0);
2077         assert_eq!(tx.try_send(1), Err(Full(1)));
2078     })
2079
2080     test!(fn try_send2() {
2081         let (tx, _rx) = sync_channel::<int>(1);
2082         assert_eq!(tx.try_send(1), Ok(()));
2083         assert_eq!(tx.try_send(1), Err(Full(1)));
2084     })
2085
2086     test!(fn try_send3() {
2087         let (tx, rx) = sync_channel::<int>(1);
2088         assert_eq!(tx.try_send(1), Ok(()));
2089         drop(rx);
2090         assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2091     })
2092
2093     test!(fn try_send4() {
2094         let (tx, rx) = sync_channel::<int>(0);
2095         spawn(proc() {
2096             for _ in range(0u, 1000) { task::deschedule(); }
2097             assert_eq!(tx.try_send(1), Ok(()));
2098         });
2099         assert_eq!(rx.recv(), 1);
2100     } #[ignore(reason = "flaky on libnative")])
2101 }