]> git.lizzy.rs Git - rust.git/blob - src/libsync/comm/mod.rs
auto merge of #16345 : EduardoBautista/rust/fix-error-message-in-guide, r=steveklabnik
[rust.git] / src / libsync / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver.  These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //!    will return a `(Sender, Receiver)` tuple where all sends will be
35 //!    **asynchronous** (they never block). The channel conceptually has an
36 //!    infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //!    is a pre-allocated buffer of a fixed size. All sends will be
41 //!    **synchronous** by blocking until there is buffer space available. Note
42 //!    that a bound of 0 is allowed, causing the channel to become a
43 //!    "rendezvous" channel where each sender atomically hands off a message to
44 //!    a receiver.
45 //!
46 //! ## Failure Propagation
47 //!
48 //! In addition to being a core primitive for communicating in rust, channels
49 //! are the points at which failure is propagated among tasks.  Whenever the one
50 //! half of channel is closed, the other half will have its next operation
51 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! There are methods on both of senders and receivers to perform their
55 //! respective operations without failing, however.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example an FFI callback), then the
64 //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
65 //! the `try_send` method on a `SyncSender`, but no other operations are
66 //! guaranteed to be safe.
67 //!
68 //! Additionally, channels can interoperate between runtimes. If one task in a
69 //! program is running on libnative and another is running on libgreen, they can
70 //! still communicate with one another using channels.
71 //!
72 //! # Example
73 //!
74 //! Simple usage:
75 //!
76 //! ```
77 //! // Create a simple streaming channel
78 //! let (tx, rx) = channel();
79 //! spawn(proc() {
80 //!     tx.send(10i);
81 //! });
82 //! assert_eq!(rx.recv(), 10i);
83 //! ```
84 //!
85 //! Shared usage:
86 //!
87 //! ```
88 //! // Create a shared channel which can be sent along from many tasks
89 //! let (tx, rx) = channel();
90 //! for i in range(0i, 10i) {
91 //!     let tx = tx.clone();
92 //!     spawn(proc() {
93 //!         tx.send(i);
94 //!     })
95 //! }
96 //!
97 //! for _ in range(0i, 10i) {
98 //!     let j = rx.recv();
99 //!     assert!(0 <= j && j < 10);
100 //! }
101 //! ```
102 //!
103 //! Propagating failure:
104 //!
105 //! ```should_fail
106 //! // The call to recv() will fail!() because the channel has already hung
107 //! // up (or been deallocated)
108 //! let (tx, rx) = channel::<int>();
109 //! drop(tx);
110 //! rx.recv();
111 //! ```
112 //!
113 //! Synchronous channels:
114 //!
115 //! ```
116 //! let (tx, rx) = sync_channel::<int>(0);
117 //! spawn(proc() {
118 //!     // This will wait for the parent task to start receiving
119 //!     tx.send(53);
120 //! });
121 //! rx.recv();
122 //! ```
123 //!
124 //! Reading from a channel with a timeout requires to use a Timer together
125 //! with the channel. You can use the select! macro to select either and
126 //! handle the timeout case. This first example will break out of the loop
127 //! after 10 seconds no matter what:
128 //!
129 //! ```no_run
130 //! use std::io::timer::Timer;
131 //! use std::time::Duration;
132 //!
133 //! let (tx, rx) = channel::<int>();
134 //! let mut timer = Timer::new().unwrap();
135 //! let timeout = timer.oneshot(Duration::seconds(10));
136 //!
137 //! loop {
138 //!     select! {
139 //!         val = rx.recv() => println!("Received {}", val),
140 //!         () = timeout.recv() => {
141 //!             println!("timed out, total time was more than 10 seconds")
142 //!             break;
143 //!         }
144 //!     }
145 //! }
146 //! ```
147 //!
148 //! This second example is more costly since it allocates a new timer every
149 //! time a message is received, but it allows you to timeout after the channel
150 //! has been inactive for 5 seconds:
151 //!
152 //! ```no_run
153 //! use std::io::timer::Timer;
154 //! use std::time::Duration;
155 //!
156 //! let (tx, rx) = channel::<int>();
157 //! let mut timer = Timer::new().unwrap();
158 //!
159 //! loop {
160 //!     let timeout = timer.oneshot(Duration::seconds(5));
161 //!
162 //!     select! {
163 //!         val = rx.recv() => println!("Received {}", val),
164 //!         () = timeout.recv() => {
165 //!             println!("timed out, no message received in 5 seconds")
166 //!             break;
167 //!         }
168 //!     }
169 //! }
170 //! ```
171
172 // A description of how Rust's channel implementation works
173 //
174 // Channels are supposed to be the basic building block for all other
175 // concurrent primitives that are used in Rust. As a result, the channel type
176 // needs to be highly optimized, flexible, and broad enough for use everywhere.
177 //
178 // The choice of implementation of all channels is to be built on lock-free data
179 // structures. The channels themselves are then consequently also lock-free data
180 // structures. As always with lock-free code, this is a very "here be dragons"
181 // territory, especially because I'm unaware of any academic papers which have
182 // gone into great length about channels of these flavors.
183 //
184 // ## Flavors of channels
185 //
186 // From the perspective of a consumer of this library, there is only one flavor
187 // of channel. This channel can be used as a stream and cloned to allow multiple
188 // senders. Under the hood, however, there are actually three flavors of
189 // channels in play.
190 //
191 // * Oneshots - these channels are highly optimized for the one-send use case.
192 //              They contain as few atomics as possible and involve one and
193 //              exactly one allocation.
194 // * Streams - these channels are optimized for the non-shared use case. They
195 //             use a different concurrent queue which is more tailored for this
196 //             use case. The initial allocation of this flavor of channel is not
197 //             optimized.
198 // * Shared - this is the most general form of channel that this module offers,
199 //            a channel with multiple senders. This type is as optimized as it
200 //            can be, but the previous two types mentioned are much faster for
201 //            their use-cases.
202 //
203 // ## Concurrent queues
204 //
205 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
206 // recv() obviously blocks. This means that under the hood there must be some
207 // shared and concurrent queue holding all of the actual data.
208 //
209 // With two flavors of channels, two flavors of queues are also used. We have
210 // chosen to use queues from a well-known author which are abbreviated as SPSC
211 // and MPSC (single producer, single consumer and multiple producer, single
212 // consumer). SPSC queues are used for streams while MPSC queues are used for
213 // shared channels.
214 //
215 // ### SPSC optimizations
216 //
217 // The SPSC queue found online is essentially a linked list of nodes where one
218 // half of the nodes are the "queue of data" and the other half of nodes are a
219 // cache of unused nodes. The unused nodes are used such that an allocation is
220 // not required on every push() and a free doesn't need to happen on every
221 // pop().
222 //
223 // As found online, however, the cache of nodes is of an infinite size. This
224 // means that if a channel at one point in its life had 50k items in the queue,
225 // then the queue will always have the capacity for 50k items. I believed that
226 // this was an unnecessary limitation of the implementation, so I have altered
227 // the queue to optionally have a bound on the cache size.
228 //
229 // By default, streams will have an unbounded SPSC queue with a small-ish cache
230 // size. The hope is that the cache is still large enough to have very fast
231 // send() operations while not too large such that millions of channels can
232 // coexist at once.
233 //
234 // ### MPSC optimizations
235 //
236 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
237 // a linked list under the hood to earn its unboundedness, but I have not put
238 // forth much effort into having a cache of nodes similar to the SPSC queue.
239 //
240 // For now, I believe that this is "ok" because shared channels are not the most
241 // common type, but soon we may wish to revisit this queue choice and determine
242 // another candidate for backend storage of shared channels.
243 //
244 // ## Overview of the Implementation
245 //
246 // Now that there's a little background on the concurrent queues used, it's
247 // worth going into much more detail about the channels themselves. The basic
248 // pseudocode for a send/recv are:
249 //
250 //
251 //      send(t)                             recv()
252 //        queue.push(t)                       return if queue.pop()
253 //        if increment() == -1                deschedule {
254 //          wakeup()                            if decrement() > 0
255 //                                                cancel_deschedule()
256 //                                            }
257 //                                            queue.pop()
258 //
259 // As mentioned before, there are no locks in this implementation, only atomic
260 // instructions are used.
261 //
262 // ### The internal atomic counter
263 //
264 // Every channel has a shared counter with each half to keep track of the size
265 // of the queue. This counter is used to abort descheduling by the receiver and
266 // to know when to wake up on the sending side.
267 //
268 // As seen in the pseudocode, senders will increment this count and receivers
269 // will decrement the count. The theory behind this is that if a sender sees a
270 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
271 // then it doesn't need to block.
272 //
273 // The recv() method has a beginning call to pop(), and if successful, it needs
274 // to decrement the count. It is a crucial implementation detail that this
275 // decrement does *not* happen to the shared counter. If this were the case,
276 // then it would be possible for the counter to be very negative when there were
277 // no receivers waiting, in which case the senders would have to determine when
278 // it was actually appropriate to wake up a receiver.
279 //
280 // Instead, the "steal count" is kept track of separately (not atomically
281 // because it's only used by receivers), and then the decrement() call when
282 // descheduling will lump in all of the recent steals into one large decrement.
283 //
284 // The implication of this is that if a sender sees a -1 count, then there's
285 // guaranteed to be a waiter waiting!
286 //
287 // ## Native Implementation
288 //
289 // A major goal of these channels is to work seamlessly on and off the runtime.
290 // All of the previous race conditions have been worded in terms of
291 // scheduler-isms (which is obviously not available without the runtime).
292 //
293 // For now, native usage of channels (off the runtime) will fall back onto
294 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
295 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
296 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
297 // condition variable.
298 //
299 // ## Select
300 //
301 // Being able to support selection over channels has greatly influenced this
302 // design, and not only does selection need to work inside the runtime, but also
303 // outside the runtime.
304 //
305 // The implementation is fairly straightforward. The goal of select() is not to
306 // return some data, but only to return which channel can receive data without
307 // blocking. The implementation is essentially the entire blocking procedure
308 // followed by an increment as soon as its woken up. The cancellation procedure
309 // involves an increment and swapping out of to_wake to acquire ownership of the
310 // task to unblock.
311 //
312 // Sadly this current implementation requires multiple allocations, so I have
313 // seen the throughput of select() be much worse than it should be. I do not
314 // believe that there is anything fundamental which needs to change about these
315 // channels, however, in order to support a more efficient select().
316 //
317 // # Conclusion
318 //
319 // And now that you've seen all the races that I found and attempted to fix,
320 // here's the code for you to find some more!
321
322 use core::prelude::*;
323
324 use alloc::arc::Arc;
325 use alloc::boxed::Box;
326 use core::cell::Cell;
327 use core::kinds::marker;
328 use core::mem;
329 use core::cell::UnsafeCell;
330 use rustrt::local::Local;
331 use rustrt::task::{Task, BlockedTask};
332
333 pub use comm::select::{Select, Handle};
334 pub use comm::duplex::{DuplexStream, duplex};
335
336 macro_rules! test (
337     { fn $name:ident() $b:block $(#[$a:meta])*} => (
338         mod $name {
339             #![allow(unused_imports)]
340
341             use std::prelude::*;
342
343             use native;
344             use comm::*;
345             use super::*;
346             use super::super::*;
347             use std::task;
348
349             fn f() $b
350
351             $(#[$a])* #[test] fn uv() { f() }
352             $(#[$a])* #[test] fn native() {
353                 use native;
354                 let (tx, rx) = channel();
355                 native::task::spawn(proc() { tx.send(f()) });
356                 rx.recv();
357             }
358         }
359     )
360 )
361
362 mod duplex;
363 mod oneshot;
364 mod select;
365 mod shared;
366 mod stream;
367 mod sync;
368
369 // Use a power of 2 to allow LLVM to optimize to something that's not a
370 // division, this is hit pretty regularly.
371 static RESCHED_FREQ: int = 256;
372
373 /// The receiving-half of Rust's channel type. This half can only be owned by
374 /// one task
375 #[unstable]
376 pub struct Receiver<T> {
377     inner: UnsafeCell<Flavor<T>>,
378     receives: Cell<uint>,
379     // can't share in an arc
380     marker: marker::NoSync,
381 }
382
383 /// An iterator over messages on a receiver, this iterator will block
384 /// whenever `next` is called, waiting for a new message, and `None` will be
385 /// returned when the corresponding channel has hung up.
386 #[unstable]
387 pub struct Messages<'a, T> {
388     rx: &'a Receiver<T>
389 }
390
391 /// The sending-half of Rust's asynchronous channel type. This half can only be
392 /// owned by one task, but it can be cloned to send to other tasks.
393 #[unstable]
394 pub struct Sender<T> {
395     inner: UnsafeCell<Flavor<T>>,
396     sends: Cell<uint>,
397     // can't share in an arc
398     marker: marker::NoSync,
399 }
400
401 /// The sending-half of Rust's synchronous channel type. This half can only be
402 /// owned by one task, but it can be cloned to send to other tasks.
403 #[unstable = "this type may be renamed, but it will always exist"]
404 pub struct SyncSender<T> {
405     inner: Arc<UnsafeCell<sync::Packet<T>>>,
406     // can't share in an arc
407     marker: marker::NoSync,
408 }
409
410 /// This enumeration is the list of the possible reasons that try_recv could not
411 /// return data when called.
412 #[deriving(PartialEq, Clone, Show)]
413 #[experimental = "this is likely to be removed in changing try_recv()"]
414 pub enum TryRecvError {
415     /// This channel is currently empty, but the sender(s) have not yet
416     /// disconnected, so data may yet become available.
417     Empty,
418     /// This channel's sending half has become disconnected, and there will
419     /// never be any more data received on this channel
420     Disconnected,
421 }
422
423 /// This enumeration is the list of the possible error outcomes for the
424 /// `SyncSender::try_send` method.
425 #[deriving(PartialEq, Clone, Show)]
426 #[experimental = "this is likely to be removed in changing try_send()"]
427 pub enum TrySendError<T> {
428     /// The data could not be sent on the channel because it would require that
429     /// the callee block to send the data.
430     ///
431     /// If this is a buffered channel, then the buffer is full at this time. If
432     /// this is not a buffered channel, then there is no receiver available to
433     /// acquire the data.
434     Full(T),
435     /// This channel's receiving half has disconnected, so the data could not be
436     /// sent. The data is returned back to the callee in this case.
437     RecvDisconnected(T),
438 }
439
440 enum Flavor<T> {
441     Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
442     Stream(Arc<UnsafeCell<stream::Packet<T>>>),
443     Shared(Arc<UnsafeCell<shared::Packet<T>>>),
444     Sync(Arc<UnsafeCell<sync::Packet<T>>>),
445 }
446
447 #[doc(hidden)]
448 trait UnsafeFlavor<T> {
449     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
450     unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
451         &mut *self.inner_unsafe().get()
452     }
453     unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
454         &*self.inner_unsafe().get()
455     }
456 }
457 impl<T> UnsafeFlavor<T> for Sender<T> {
458     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
459         &self.inner
460     }
461 }
462 impl<T> UnsafeFlavor<T> for Receiver<T> {
463     fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
464         &self.inner
465     }
466 }
467
468 /// Creates a new asynchronous channel, returning the sender/receiver halves.
469 ///
470 /// All data sent on the sender will become available on the receiver, and no
471 /// send will block the calling task (this channel has an "infinite buffer").
472 ///
473 /// # Example
474 ///
475 /// ```
476 /// let (tx, rx) = channel();
477 ///
478 /// // Spawn off an expensive computation
479 /// spawn(proc() {
480 /// #   fn expensive_computation() {}
481 ///     tx.send(expensive_computation());
482 /// });
483 ///
484 /// // Do some useful work for awhile
485 ///
486 /// // Let's see what that answer was
487 /// println!("{}", rx.recv());
488 /// ```
489 #[unstable]
490 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
491     let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
492     (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
493 }
494
495 /// Creates a new synchronous, bounded channel.
496 ///
497 /// Like asynchronous channels, the `Receiver` will block until a message
498 /// becomes available. These channels differ greatly in the semantics of the
499 /// sender from asynchronous channels, however.
500 ///
501 /// This channel has an internal buffer on which messages will be queued. When
502 /// the internal buffer becomes full, future sends will *block* waiting for the
503 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
504 /// becomes  "rendezvous channel" where each send will not return until a recv
505 /// is paired with it.
506 ///
507 /// As with asynchronous channels, all senders will fail in `send` if the
508 /// `Receiver` has been destroyed.
509 ///
510 /// # Example
511 ///
512 /// ```
513 /// let (tx, rx) = sync_channel(1);
514 ///
515 /// // this returns immediately
516 /// tx.send(1i);
517 ///
518 /// spawn(proc() {
519 ///     // this will block until the previous message has been received
520 ///     tx.send(2i);
521 /// });
522 ///
523 /// assert_eq!(rx.recv(), 1i);
524 /// assert_eq!(rx.recv(), 2i);
525 /// ```
526 #[unstable = "this function may be renamed to more accurately reflect the type \
527               of channel that is is creating"]
528 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
529     let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
530     (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
531 }
532
533 ////////////////////////////////////////////////////////////////////////////////
534 // Sender
535 ////////////////////////////////////////////////////////////////////////////////
536
537 impl<T: Send> Sender<T> {
538     fn new(inner: Flavor<T>) -> Sender<T> {
539         Sender {
540             inner: UnsafeCell::new(inner),
541             sends: Cell::new(0),
542             marker: marker::NoSync,
543         }
544     }
545
546     /// Sends a value along this channel to be received by the corresponding
547     /// receiver.
548     ///
549     /// Rust channels are infinitely buffered so this method will never block.
550     ///
551     /// # Failure
552     ///
553     /// This function will fail if the other end of the channel has hung up.
554     /// This means that if the corresponding receiver has fallen out of scope,
555     /// this function will trigger a fail message saying that a message is
556     /// being sent on a closed channel.
557     ///
558     /// Note that if this function does *not* fail, it does not mean that the
559     /// data will be successfully received. All sends are placed into a queue,
560     /// so it is possible for a send to succeed (the other end is alive), but
561     /// then the other end could immediately disconnect.
562     ///
563     /// The purpose of this functionality is to propagate failure among tasks.
564     /// If failure is not desired, then consider using the `send_opt` method
565     #[experimental = "this function is being considered candidate for removal \
566                       to adhere to the general guidelines of rust"]
567     pub fn send(&self, t: T) {
568         if self.send_opt(t).is_err() {
569             fail!("sending on a closed channel");
570         }
571     }
572
573     /// Attempts to send a value on this channel, returning it back if it could
574     /// not be sent.
575     ///
576     /// A successful send occurs when it is determined that the other end of
577     /// the channel has not hung up already. An unsuccessful send would be one
578     /// where the corresponding receiver has already been deallocated. Note
579     /// that a return value of `Err` means that the data will never be
580     /// received, but a return value of `Ok` does *not* mean that the data
581     /// will be received.  It is possible for the corresponding receiver to
582     /// hang up immediately after this function returns `Ok`.
583     ///
584     /// Like `send`, this method will never block.
585     ///
586     /// # Failure
587     ///
588     /// This method will never fail, it will return the message back to the
589     /// caller if the other end is disconnected
590     ///
591     /// # Example
592     ///
593     /// ```
594     /// let (tx, rx) = channel();
595     ///
596     /// // This send is always successful
597     /// assert_eq!(tx.send_opt(1i), Ok(()));
598     ///
599     /// // This send will fail because the receiver is gone
600     /// drop(rx);
601     /// assert_eq!(tx.send_opt(1i), Err(1));
602     /// ```
603     #[unstable = "this function may be renamed to send() in the future"]
604     pub fn send_opt(&self, t: T) -> Result<(), T> {
605         // In order to prevent starvation of other tasks in situations where
606         // a task sends repeatedly without ever receiving, we occasionally
607         // yield instead of doing a send immediately.
608         //
609         // Don't unconditionally attempt to yield because the TLS overhead can
610         // be a bit much, and also use `try_take` instead of `take` because
611         // there's no reason that this send shouldn't be usable off the
612         // runtime.
613         let cnt = self.sends.get() + 1;
614         self.sends.set(cnt);
615         if cnt % (RESCHED_FREQ as uint) == 0 {
616             let task: Option<Box<Task>> = Local::try_take();
617             task.map(|t| t.maybe_yield());
618         }
619
620         let (new_inner, ret) = match *unsafe { self.inner() } {
621             Oneshot(ref p) => {
622                 unsafe {
623                     let p = p.get();
624                     if !(*p).sent() {
625                         return (*p).send(t);
626                     } else {
627                         let a = Arc::new(UnsafeCell::new(stream::Packet::new()));
628                         match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
629                             oneshot::UpSuccess => {
630                                 let ret = (*a.get()).send(t);
631                                 (a, ret)
632                             }
633                             oneshot::UpDisconnected => (a, Err(t)),
634                             oneshot::UpWoke(task) => {
635                                 // This send cannot fail because the task is
636                                 // asleep (we're looking at it), so the receiver
637                                 // can't go away.
638                                 (*a.get()).send(t).ok().unwrap();
639                                 task.wake().map(|t| t.reawaken());
640                                 (a, Ok(()))
641                             }
642                         }
643                     }
644                 }
645             }
646             Stream(ref p) => return unsafe { (*p.get()).send(t) },
647             Shared(ref p) => return unsafe { (*p.get()).send(t) },
648             Sync(..) => unreachable!(),
649         };
650
651         unsafe {
652             let tmp = Sender::new(Stream(new_inner));
653             mem::swap(self.mut_inner(), tmp.mut_inner());
654         }
655         return ret;
656     }
657 }
658
659 #[unstable]
660 impl<T: Send> Clone for Sender<T> {
661     fn clone(&self) -> Sender<T> {
662         let (packet, sleeper) = match *unsafe { self.inner() } {
663             Oneshot(ref p) => {
664                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
665                 unsafe {
666                     (*a.get()).postinit_lock();
667                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
668                         oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
669                         oneshot::UpWoke(task) => (a, Some(task))
670                     }
671                 }
672             }
673             Stream(ref p) => {
674                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
675                 unsafe {
676                     (*a.get()).postinit_lock();
677                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
678                         stream::UpSuccess | stream::UpDisconnected => (a, None),
679                         stream::UpWoke(task) => (a, Some(task)),
680                     }
681                 }
682             }
683             Shared(ref p) => {
684                 unsafe { (*p.get()).clone_chan(); }
685                 return Sender::new(Shared(p.clone()));
686             }
687             Sync(..) => unreachable!(),
688         };
689
690         unsafe {
691             (*packet.get()).inherit_blocker(sleeper);
692
693             let tmp = Sender::new(Shared(packet.clone()));
694             mem::swap(self.mut_inner(), tmp.mut_inner());
695         }
696         Sender::new(Shared(packet))
697     }
698 }
699
700 #[unsafe_destructor]
701 impl<T: Send> Drop for Sender<T> {
702     fn drop(&mut self) {
703         match *unsafe { self.mut_inner() } {
704             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
705             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
706             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
707             Sync(..) => unreachable!(),
708         }
709     }
710 }
711
712 ////////////////////////////////////////////////////////////////////////////////
713 // SyncSender
714 ////////////////////////////////////////////////////////////////////////////////
715
716 impl<T: Send> SyncSender<T> {
717     fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
718         SyncSender { inner: inner, marker: marker::NoSync }
719     }
720
721     /// Sends a value on this synchronous channel.
722     ///
723     /// This function will *block* until space in the internal buffer becomes
724     /// available or a receiver is available to hand off the message to.
725     ///
726     /// Note that a successful send does *not* guarantee that the receiver will
727     /// ever see the data if there is a buffer on this channel. Messages may be
728     /// enqueued in the internal buffer for the receiver to receive at a later
729     /// time. If the buffer size is 0, however, it can be guaranteed that the
730     /// receiver has indeed received the data if this function returns success.
731     ///
732     /// # Failure
733     ///
734     /// Similarly to `Sender::send`, this function will fail if the
735     /// corresponding `Receiver` for this channel has disconnected. This
736     /// behavior is used to propagate failure among tasks.
737     ///
738     /// If failure is not desired, you can achieve the same semantics with the
739     /// `SyncSender::send_opt` method which will not fail if the receiver
740     /// disconnects.
741     #[experimental = "this function is being considered candidate for removal \
742                       to adhere to the general guidelines of rust"]
743     pub fn send(&self, t: T) {
744         if self.send_opt(t).is_err() {
745             fail!("sending on a closed channel");
746         }
747     }
748
749     /// Send a value on a channel, returning it back if the receiver
750     /// disconnected
751     ///
752     /// This method will *block* to send the value `t` on the channel, but if
753     /// the value could not be sent due to the receiver disconnecting, the value
754     /// is returned back to the callee. This function is similar to `try_send`,
755     /// except that it will block if the channel is currently full.
756     ///
757     /// # Failure
758     ///
759     /// This function cannot fail.
760     #[unstable = "this function may be renamed to send() in the future"]
761     pub fn send_opt(&self, t: T) -> Result<(), T> {
762         unsafe { (*self.inner.get()).send(t) }
763     }
764
765     /// Attempts to send a value on this channel without blocking.
766     ///
767     /// This method differs from `send_opt` by returning immediately if the
768     /// channel's buffer is full or no receiver is waiting to acquire some
769     /// data. Compared with `send_opt`, this function has two failure cases
770     /// instead of one (one for disconnection, one for a full buffer).
771     ///
772     /// See `SyncSender::send` for notes about guarantees of whether the
773     /// receiver has received the data or not if this function is successful.
774     ///
775     /// # Failure
776     ///
777     /// This function cannot fail
778     #[unstable = "the return type of this function is candidate for \
779                   modification"]
780     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
781         unsafe { (*self.inner.get()).try_send(t) }
782     }
783 }
784
785 #[unstable]
786 impl<T: Send> Clone for SyncSender<T> {
787     fn clone(&self) -> SyncSender<T> {
788         unsafe { (*self.inner.get()).clone_chan(); }
789         return SyncSender::new(self.inner.clone());
790     }
791 }
792
793 #[unsafe_destructor]
794 impl<T: Send> Drop for SyncSender<T> {
795     fn drop(&mut self) {
796         unsafe { (*self.inner.get()).drop_chan(); }
797     }
798 }
799
800 ////////////////////////////////////////////////////////////////////////////////
801 // Receiver
802 ////////////////////////////////////////////////////////////////////////////////
803
804 impl<T: Send> Receiver<T> {
805     fn new(inner: Flavor<T>) -> Receiver<T> {
806         Receiver { inner: UnsafeCell::new(inner), receives: Cell::new(0), marker: marker::NoSync }
807     }
808
809     /// Blocks waiting for a value on this receiver
810     ///
811     /// This function will block if necessary to wait for a corresponding send
812     /// on the channel from its paired `Sender` structure. This receiver will
813     /// be woken up when data is ready, and the data will be returned.
814     ///
815     /// # Failure
816     ///
817     /// Similar to channels, this method will trigger a task failure if the
818     /// other end of the channel has hung up (been deallocated). The purpose of
819     /// this is to propagate failure among tasks.
820     ///
821     /// If failure is not desired, then there are two options:
822     ///
823     /// * If blocking is still desired, the `recv_opt` method will return `None`
824     ///   when the other end hangs up
825     ///
826     /// * If blocking is not desired, then the `try_recv` method will attempt to
827     ///   peek at a value on this receiver.
828     #[experimental = "this function is being considered candidate for removal \
829                       to adhere to the general guidelines of rust"]
830     pub fn recv(&self) -> T {
831         match self.recv_opt() {
832             Ok(t) => t,
833             Err(()) => fail!("receiving on a closed channel"),
834         }
835     }
836
837     /// Attempts to return a pending value on this receiver without blocking
838     ///
839     /// This method will never block the caller in order to wait for data to
840     /// become available. Instead, this will always return immediately with a
841     /// possible option of pending data on the channel.
842     ///
843     /// This is useful for a flavor of "optimistic check" before deciding to
844     /// block on a receiver.
845     ///
846     /// This function cannot fail.
847     #[unstable = "the return type of this function may be altered"]
848     pub fn try_recv(&self) -> Result<T, TryRecvError> {
849         // If a thread is spinning in try_recv, we should take the opportunity
850         // to reschedule things occasionally. See notes above in scheduling on
851         // sends for why this doesn't always hit TLS, and also for why this uses
852         // `try_take` instead of `take`.
853         let cnt = self.receives.get() + 1;
854         self.receives.set(cnt);
855         if cnt % (RESCHED_FREQ as uint) == 0 {
856             let task: Option<Box<Task>> = Local::try_take();
857             task.map(|t| t.maybe_yield());
858         }
859
860         loop {
861             let new_port = match *unsafe { self.inner() } {
862                 Oneshot(ref p) => {
863                     match unsafe { (*p.get()).try_recv() } {
864                         Ok(t) => return Ok(t),
865                         Err(oneshot::Empty) => return Err(Empty),
866                         Err(oneshot::Disconnected) => return Err(Disconnected),
867                         Err(oneshot::Upgraded(rx)) => rx,
868                     }
869                 }
870                 Stream(ref p) => {
871                     match unsafe { (*p.get()).try_recv() } {
872                         Ok(t) => return Ok(t),
873                         Err(stream::Empty) => return Err(Empty),
874                         Err(stream::Disconnected) => return Err(Disconnected),
875                         Err(stream::Upgraded(rx)) => rx,
876                     }
877                 }
878                 Shared(ref p) => {
879                     match unsafe { (*p.get()).try_recv() } {
880                         Ok(t) => return Ok(t),
881                         Err(shared::Empty) => return Err(Empty),
882                         Err(shared::Disconnected) => return Err(Disconnected),
883                     }
884                 }
885                 Sync(ref p) => {
886                     match unsafe { (*p.get()).try_recv() } {
887                         Ok(t) => return Ok(t),
888                         Err(sync::Empty) => return Err(Empty),
889                         Err(sync::Disconnected) => return Err(Disconnected),
890                     }
891                 }
892             };
893             unsafe {
894                 mem::swap(self.mut_inner(),
895                           new_port.mut_inner());
896             }
897         }
898     }
899
900     /// Attempt to wait for a value on this receiver, but does not fail if the
901     /// corresponding channel has hung up.
902     ///
903     /// This implementation of iterators for ports will always block if there is
904     /// not data available on the receiver, but it will not fail in the case
905     /// that the channel has been deallocated.
906     ///
907     /// In other words, this function has the same semantics as the `recv`
908     /// method except for the failure aspect.
909     ///
910     /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
911     /// the value found on the receiver is returned.
912     #[unstable = "this function may be renamed to recv()"]
913     pub fn recv_opt(&self) -> Result<T, ()> {
914         loop {
915             let new_port = match *unsafe { self.inner() } {
916                 Oneshot(ref p) => {
917                     match unsafe { (*p.get()).recv() } {
918                         Ok(t) => return Ok(t),
919                         Err(oneshot::Empty) => return unreachable!(),
920                         Err(oneshot::Disconnected) => return Err(()),
921                         Err(oneshot::Upgraded(rx)) => rx,
922                     }
923                 }
924                 Stream(ref p) => {
925                     match unsafe { (*p.get()).recv() } {
926                         Ok(t) => return Ok(t),
927                         Err(stream::Empty) => return unreachable!(),
928                         Err(stream::Disconnected) => return Err(()),
929                         Err(stream::Upgraded(rx)) => rx,
930                     }
931                 }
932                 Shared(ref p) => {
933                     match unsafe { (*p.get()).recv() } {
934                         Ok(t) => return Ok(t),
935                         Err(shared::Empty) => return unreachable!(),
936                         Err(shared::Disconnected) => return Err(()),
937                     }
938                 }
939                 Sync(ref p) => return unsafe { (*p.get()).recv() }
940             };
941             unsafe {
942                 mem::swap(self.mut_inner(), new_port.mut_inner());
943             }
944         }
945     }
946
947     /// Returns an iterator which will block waiting for messages, but never
948     /// `fail!`. It will return `None` when the channel has hung up.
949     #[unstable]
950     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
951         Messages { rx: self }
952     }
953 }
954
955 impl<T: Send> select::Packet for Receiver<T> {
956     fn can_recv(&self) -> bool {
957         loop {
958             let new_port = match *unsafe { self.inner() } {
959                 Oneshot(ref p) => {
960                     match unsafe { (*p.get()).can_recv() } {
961                         Ok(ret) => return ret,
962                         Err(upgrade) => upgrade,
963                     }
964                 }
965                 Stream(ref p) => {
966                     match unsafe { (*p.get()).can_recv() } {
967                         Ok(ret) => return ret,
968                         Err(upgrade) => upgrade,
969                     }
970                 }
971                 Shared(ref p) => {
972                     return unsafe { (*p.get()).can_recv() };
973                 }
974                 Sync(ref p) => {
975                     return unsafe { (*p.get()).can_recv() };
976                 }
977             };
978             unsafe {
979                 mem::swap(self.mut_inner(),
980                           new_port.mut_inner());
981             }
982         }
983     }
984
985     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
986         loop {
987             let (t, new_port) = match *unsafe { self.inner() } {
988                 Oneshot(ref p) => {
989                     match unsafe { (*p.get()).start_selection(task) } {
990                         oneshot::SelSuccess => return Ok(()),
991                         oneshot::SelCanceled(task) => return Err(task),
992                         oneshot::SelUpgraded(t, rx) => (t, rx),
993                     }
994                 }
995                 Stream(ref p) => {
996                     match unsafe { (*p.get()).start_selection(task) } {
997                         stream::SelSuccess => return Ok(()),
998                         stream::SelCanceled(task) => return Err(task),
999                         stream::SelUpgraded(t, rx) => (t, rx),
1000                     }
1001                 }
1002                 Shared(ref p) => {
1003                     return unsafe { (*p.get()).start_selection(task) };
1004                 }
1005                 Sync(ref p) => {
1006                     return unsafe { (*p.get()).start_selection(task) };
1007                 }
1008             };
1009             task = t;
1010             unsafe {
1011                 mem::swap(self.mut_inner(),
1012                           new_port.mut_inner());
1013             }
1014         }
1015     }
1016
1017     fn abort_selection(&self) -> bool {
1018         let mut was_upgrade = false;
1019         loop {
1020             let result = match *unsafe { self.inner() } {
1021                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
1022                 Stream(ref p) => unsafe {
1023                     (*p.get()).abort_selection(was_upgrade)
1024                 },
1025                 Shared(ref p) => return unsafe {
1026                     (*p.get()).abort_selection(was_upgrade)
1027                 },
1028                 Sync(ref p) => return unsafe {
1029                     (*p.get()).abort_selection()
1030                 },
1031             };
1032             let new_port = match result { Ok(b) => return b, Err(p) => p };
1033             was_upgrade = true;
1034             unsafe {
1035                 mem::swap(self.mut_inner(),
1036                           new_port.mut_inner());
1037             }
1038         }
1039     }
1040 }
1041
1042 #[unstable]
1043 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
1044     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
1045 }
1046
1047 #[unsafe_destructor]
1048 impl<T: Send> Drop for Receiver<T> {
1049     fn drop(&mut self) {
1050         match *unsafe { self.mut_inner() } {
1051             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1052             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1053             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1054             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1055         }
1056     }
1057 }
1058
1059 #[cfg(test)]
1060 mod test {
1061     use std::prelude::*;
1062
1063     use native;
1064     use std::os;
1065     use super::*;
1066
1067     pub fn stress_factor() -> uint {
1068         match os::getenv("RUST_TEST_STRESS") {
1069             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1070             None => 1,
1071         }
1072     }
1073
1074     test!(fn smoke() {
1075         let (tx, rx) = channel::<int>();
1076         tx.send(1);
1077         assert_eq!(rx.recv(), 1);
1078     })
1079
1080     test!(fn drop_full() {
1081         let (tx, _rx) = channel();
1082         tx.send(box 1i);
1083     })
1084
1085     test!(fn drop_full_shared() {
1086         let (tx, _rx) = channel();
1087         drop(tx.clone());
1088         drop(tx.clone());
1089         tx.send(box 1i);
1090     })
1091
1092     test!(fn smoke_shared() {
1093         let (tx, rx) = channel::<int>();
1094         tx.send(1);
1095         assert_eq!(rx.recv(), 1);
1096         let tx = tx.clone();
1097         tx.send(1);
1098         assert_eq!(rx.recv(), 1);
1099     })
1100
1101     test!(fn smoke_threads() {
1102         let (tx, rx) = channel::<int>();
1103         spawn(proc() {
1104             tx.send(1);
1105         });
1106         assert_eq!(rx.recv(), 1);
1107     })
1108
1109     test!(fn smoke_port_gone() {
1110         let (tx, rx) = channel::<int>();
1111         drop(rx);
1112         tx.send(1);
1113     } #[should_fail])
1114
1115     test!(fn smoke_shared_port_gone() {
1116         let (tx, rx) = channel::<int>();
1117         drop(rx);
1118         tx.send(1);
1119     } #[should_fail])
1120
1121     test!(fn smoke_shared_port_gone2() {
1122         let (tx, rx) = channel::<int>();
1123         drop(rx);
1124         let tx2 = tx.clone();
1125         drop(tx);
1126         tx2.send(1);
1127     } #[should_fail])
1128
1129     test!(fn port_gone_concurrent() {
1130         let (tx, rx) = channel::<int>();
1131         spawn(proc() {
1132             rx.recv();
1133         });
1134         loop { tx.send(1) }
1135     } #[should_fail])
1136
1137     test!(fn port_gone_concurrent_shared() {
1138         let (tx, rx) = channel::<int>();
1139         let tx2 = tx.clone();
1140         spawn(proc() {
1141             rx.recv();
1142         });
1143         loop {
1144             tx.send(1);
1145             tx2.send(1);
1146         }
1147     } #[should_fail])
1148
1149     test!(fn smoke_chan_gone() {
1150         let (tx, rx) = channel::<int>();
1151         drop(tx);
1152         rx.recv();
1153     } #[should_fail])
1154
1155     test!(fn smoke_chan_gone_shared() {
1156         let (tx, rx) = channel::<()>();
1157         let tx2 = tx.clone();
1158         drop(tx);
1159         drop(tx2);
1160         rx.recv();
1161     } #[should_fail])
1162
1163     test!(fn chan_gone_concurrent() {
1164         let (tx, rx) = channel::<int>();
1165         spawn(proc() {
1166             tx.send(1);
1167             tx.send(1);
1168         });
1169         loop { rx.recv(); }
1170     } #[should_fail])
1171
1172     test!(fn stress() {
1173         let (tx, rx) = channel::<int>();
1174         spawn(proc() {
1175             for _ in range(0u, 10000) { tx.send(1i); }
1176         });
1177         for _ in range(0u, 10000) {
1178             assert_eq!(rx.recv(), 1);
1179         }
1180     })
1181
1182     test!(fn stress_shared() {
1183         static AMT: uint = 10000;
1184         static NTHREADS: uint = 8;
1185         let (tx, rx) = channel::<int>();
1186         let (dtx, drx) = channel::<()>();
1187
1188         spawn(proc() {
1189             for _ in range(0, AMT * NTHREADS) {
1190                 assert_eq!(rx.recv(), 1);
1191             }
1192             match rx.try_recv() {
1193                 Ok(..) => fail!(),
1194                 _ => {}
1195             }
1196             dtx.send(());
1197         });
1198
1199         for _ in range(0, NTHREADS) {
1200             let tx = tx.clone();
1201             spawn(proc() {
1202                 for _ in range(0, AMT) { tx.send(1); }
1203             });
1204         }
1205         drop(tx);
1206         drx.recv();
1207     })
1208
1209     #[test]
1210     fn send_from_outside_runtime() {
1211         let (tx1, rx1) = channel::<()>();
1212         let (tx2, rx2) = channel::<int>();
1213         let (tx3, rx3) = channel::<()>();
1214         let tx4 = tx3.clone();
1215         spawn(proc() {
1216             tx1.send(());
1217             for _ in range(0i, 40) {
1218                 assert_eq!(rx2.recv(), 1);
1219             }
1220             tx3.send(());
1221         });
1222         rx1.recv();
1223         native::task::spawn(proc() {
1224             for _ in range(0i, 40) {
1225                 tx2.send(1);
1226             }
1227             tx4.send(());
1228         });
1229         rx3.recv();
1230         rx3.recv();
1231     }
1232
1233     #[test]
1234     fn recv_from_outside_runtime() {
1235         let (tx, rx) = channel::<int>();
1236         let (dtx, drx) = channel();
1237         native::task::spawn(proc() {
1238             for _ in range(0i, 40) {
1239                 assert_eq!(rx.recv(), 1);
1240             }
1241             dtx.send(());
1242         });
1243         for _ in range(0u, 40) {
1244             tx.send(1);
1245         }
1246         drx.recv();
1247     }
1248
1249     #[test]
1250     fn no_runtime() {
1251         let (tx1, rx1) = channel::<int>();
1252         let (tx2, rx2) = channel::<int>();
1253         let (tx3, rx3) = channel::<()>();
1254         let tx4 = tx3.clone();
1255         native::task::spawn(proc() {
1256             assert_eq!(rx1.recv(), 1);
1257             tx2.send(2);
1258             tx4.send(());
1259         });
1260         native::task::spawn(proc() {
1261             tx1.send(1);
1262             assert_eq!(rx2.recv(), 2);
1263             tx3.send(());
1264         });
1265         rx3.recv();
1266         rx3.recv();
1267     }
1268
1269     test!(fn oneshot_single_thread_close_port_first() {
1270         // Simple test of closing without sending
1271         let (_tx, rx) = channel::<int>();
1272         drop(rx);
1273     })
1274
1275     test!(fn oneshot_single_thread_close_chan_first() {
1276         // Simple test of closing without sending
1277         let (tx, _rx) = channel::<int>();
1278         drop(tx);
1279     })
1280
1281     test!(fn oneshot_single_thread_send_port_close() {
1282         // Testing that the sender cleans up the payload if receiver is closed
1283         let (tx, rx) = channel::<Box<int>>();
1284         drop(rx);
1285         tx.send(box 0);
1286     } #[should_fail])
1287
1288     test!(fn oneshot_single_thread_recv_chan_close() {
1289         // Receiving on a closed chan will fail
1290         let res = task::try(proc() {
1291             let (tx, rx) = channel::<int>();
1292             drop(tx);
1293             rx.recv();
1294         });
1295         // What is our res?
1296         assert!(res.is_err());
1297     })
1298
1299     test!(fn oneshot_single_thread_send_then_recv() {
1300         let (tx, rx) = channel::<Box<int>>();
1301         tx.send(box 10);
1302         assert!(rx.recv() == box 10);
1303     })
1304
1305     test!(fn oneshot_single_thread_try_send_open() {
1306         let (tx, rx) = channel::<int>();
1307         assert!(tx.send_opt(10).is_ok());
1308         assert!(rx.recv() == 10);
1309     })
1310
1311     test!(fn oneshot_single_thread_try_send_closed() {
1312         let (tx, rx) = channel::<int>();
1313         drop(rx);
1314         assert!(tx.send_opt(10).is_err());
1315     })
1316
1317     test!(fn oneshot_single_thread_try_recv_open() {
1318         let (tx, rx) = channel::<int>();
1319         tx.send(10);
1320         assert!(rx.recv_opt() == Ok(10));
1321     })
1322
1323     test!(fn oneshot_single_thread_try_recv_closed() {
1324         let (tx, rx) = channel::<int>();
1325         drop(tx);
1326         assert!(rx.recv_opt() == Err(()));
1327     })
1328
1329     test!(fn oneshot_single_thread_peek_data() {
1330         let (tx, rx) = channel::<int>();
1331         assert_eq!(rx.try_recv(), Err(Empty))
1332         tx.send(10);
1333         assert_eq!(rx.try_recv(), Ok(10));
1334     })
1335
1336     test!(fn oneshot_single_thread_peek_close() {
1337         let (tx, rx) = channel::<int>();
1338         drop(tx);
1339         assert_eq!(rx.try_recv(), Err(Disconnected));
1340         assert_eq!(rx.try_recv(), Err(Disconnected));
1341     })
1342
1343     test!(fn oneshot_single_thread_peek_open() {
1344         let (_tx, rx) = channel::<int>();
1345         assert_eq!(rx.try_recv(), Err(Empty));
1346     })
1347
1348     test!(fn oneshot_multi_task_recv_then_send() {
1349         let (tx, rx) = channel::<Box<int>>();
1350         spawn(proc() {
1351             assert!(rx.recv() == box 10);
1352         });
1353
1354         tx.send(box 10);
1355     })
1356
1357     test!(fn oneshot_multi_task_recv_then_close() {
1358         let (tx, rx) = channel::<Box<int>>();
1359         spawn(proc() {
1360             drop(tx);
1361         });
1362         let res = task::try(proc() {
1363             assert!(rx.recv() == box 10);
1364         });
1365         assert!(res.is_err());
1366     })
1367
1368     test!(fn oneshot_multi_thread_close_stress() {
1369         for _ in range(0, stress_factor()) {
1370             let (tx, rx) = channel::<int>();
1371             spawn(proc() {
1372                 drop(rx);
1373             });
1374             drop(tx);
1375         }
1376     })
1377
1378     test!(fn oneshot_multi_thread_send_close_stress() {
1379         for _ in range(0, stress_factor()) {
1380             let (tx, rx) = channel::<int>();
1381             spawn(proc() {
1382                 drop(rx);
1383             });
1384             let _ = task::try(proc() {
1385                 tx.send(1);
1386             });
1387         }
1388     })
1389
1390     test!(fn oneshot_multi_thread_recv_close_stress() {
1391         for _ in range(0, stress_factor()) {
1392             let (tx, rx) = channel::<int>();
1393             spawn(proc() {
1394                 let res = task::try(proc() {
1395                     rx.recv();
1396                 });
1397                 assert!(res.is_err());
1398             });
1399             spawn(proc() {
1400                 spawn(proc() {
1401                     drop(tx);
1402                 });
1403             });
1404         }
1405     })
1406
1407     test!(fn oneshot_multi_thread_send_recv_stress() {
1408         for _ in range(0, stress_factor()) {
1409             let (tx, rx) = channel();
1410             spawn(proc() {
1411                 tx.send(box 10i);
1412             });
1413             spawn(proc() {
1414                 assert!(rx.recv() == box 10i);
1415             });
1416         }
1417     })
1418
1419     test!(fn stream_send_recv_stress() {
1420         for _ in range(0, stress_factor()) {
1421             let (tx, rx) = channel();
1422
1423             send(tx, 0);
1424             recv(rx, 0);
1425
1426             fn send(tx: Sender<Box<int>>, i: int) {
1427                 if i == 10 { return }
1428
1429                 spawn(proc() {
1430                     tx.send(box i);
1431                     send(tx, i + 1);
1432                 });
1433             }
1434
1435             fn recv(rx: Receiver<Box<int>>, i: int) {
1436                 if i == 10 { return }
1437
1438                 spawn(proc() {
1439                     assert!(rx.recv() == box i);
1440                     recv(rx, i + 1);
1441                 });
1442             }
1443         }
1444     })
1445
1446     test!(fn recv_a_lot() {
1447         // Regression test that we don't run out of stack in scheduler context
1448         let (tx, rx) = channel();
1449         for _ in range(0i, 10000) { tx.send(()); }
1450         for _ in range(0i, 10000) { rx.recv(); }
1451     })
1452
1453     test!(fn shared_chan_stress() {
1454         let (tx, rx) = channel();
1455         let total = stress_factor() + 100;
1456         for _ in range(0, total) {
1457             let tx = tx.clone();
1458             spawn(proc() {
1459                 tx.send(());
1460             });
1461         }
1462
1463         for _ in range(0, total) {
1464             rx.recv();
1465         }
1466     })
1467
1468     test!(fn test_nested_recv_iter() {
1469         let (tx, rx) = channel::<int>();
1470         let (total_tx, total_rx) = channel::<int>();
1471
1472         spawn(proc() {
1473             let mut acc = 0;
1474             for x in rx.iter() {
1475                 acc += x;
1476             }
1477             total_tx.send(acc);
1478         });
1479
1480         tx.send(3);
1481         tx.send(1);
1482         tx.send(2);
1483         drop(tx);
1484         assert_eq!(total_rx.recv(), 6);
1485     })
1486
1487     test!(fn test_recv_iter_break() {
1488         let (tx, rx) = channel::<int>();
1489         let (count_tx, count_rx) = channel();
1490
1491         spawn(proc() {
1492             let mut count = 0;
1493             for x in rx.iter() {
1494                 if count >= 3 {
1495                     break;
1496                 } else {
1497                     count += x;
1498                 }
1499             }
1500             count_tx.send(count);
1501         });
1502
1503         tx.send(2);
1504         tx.send(2);
1505         tx.send(2);
1506         let _ = tx.send_opt(2);
1507         drop(tx);
1508         assert_eq!(count_rx.recv(), 4);
1509     })
1510
1511     test!(fn try_recv_states() {
1512         let (tx1, rx1) = channel::<int>();
1513         let (tx2, rx2) = channel::<()>();
1514         let (tx3, rx3) = channel::<()>();
1515         spawn(proc() {
1516             rx2.recv();
1517             tx1.send(1);
1518             tx3.send(());
1519             rx2.recv();
1520             drop(tx1);
1521             tx3.send(());
1522         });
1523
1524         assert_eq!(rx1.try_recv(), Err(Empty));
1525         tx2.send(());
1526         rx3.recv();
1527         assert_eq!(rx1.try_recv(), Ok(1));
1528         assert_eq!(rx1.try_recv(), Err(Empty));
1529         tx2.send(());
1530         rx3.recv();
1531         assert_eq!(rx1.try_recv(), Err(Disconnected));
1532     })
1533
1534     // This bug used to end up in a livelock inside of the Receiver destructor
1535     // because the internal state of the Shared packet was corrupted
1536     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1537         let (tx, rx) = channel();
1538         let (tx2, rx2) = channel();
1539         spawn(proc() {
1540             rx.recv(); // wait on a oneshot
1541             drop(rx);  // destroy a shared
1542             tx2.send(());
1543         });
1544         // make sure the other task has gone to sleep
1545         for _ in range(0u, 5000) { task::deschedule(); }
1546
1547         // upgrade to a shared chan and send a message
1548         let t = tx.clone();
1549         drop(tx);
1550         t.send(());
1551
1552         // wait for the child task to exit before we exit
1553         rx2.recv();
1554     })
1555
1556     test!(fn sends_off_the_runtime() {
1557         use std::rt::thread::Thread;
1558
1559         let (tx, rx) = channel();
1560         let t = Thread::start(proc() {
1561             for _ in range(0u, 1000) {
1562                 tx.send(());
1563             }
1564         });
1565         for _ in range(0u, 1000) {
1566             rx.recv();
1567         }
1568         t.join();
1569     })
1570
1571     test!(fn try_recvs_off_the_runtime() {
1572         use std::rt::thread::Thread;
1573
1574         let (tx, rx) = channel();
1575         let (cdone, pdone) = channel();
1576         let t = Thread::start(proc() {
1577             let mut hits = 0u;
1578             while hits < 10 {
1579                 match rx.try_recv() {
1580                     Ok(()) => { hits += 1; }
1581                     Err(Empty) => { Thread::yield_now(); }
1582                     Err(Disconnected) => return,
1583                 }
1584             }
1585             cdone.send(());
1586         });
1587         for _ in range(0u, 10) {
1588             tx.send(());
1589         }
1590         t.join();
1591         pdone.recv();
1592     })
1593 }
1594
1595 #[cfg(test)]
1596 mod sync_tests {
1597     use std::prelude::*;
1598     use std::os;
1599
1600     pub fn stress_factor() -> uint {
1601         match os::getenv("RUST_TEST_STRESS") {
1602             Some(val) => from_str::<uint>(val.as_slice()).unwrap(),
1603             None => 1,
1604         }
1605     }
1606
1607     test!(fn smoke() {
1608         let (tx, rx) = sync_channel::<int>(1);
1609         tx.send(1);
1610         assert_eq!(rx.recv(), 1);
1611     })
1612
1613     test!(fn drop_full() {
1614         let (tx, _rx) = sync_channel(1);
1615         tx.send(box 1i);
1616     })
1617
1618     test!(fn smoke_shared() {
1619         let (tx, rx) = sync_channel::<int>(1);
1620         tx.send(1);
1621         assert_eq!(rx.recv(), 1);
1622         let tx = tx.clone();
1623         tx.send(1);
1624         assert_eq!(rx.recv(), 1);
1625     })
1626
1627     test!(fn smoke_threads() {
1628         let (tx, rx) = sync_channel::<int>(0);
1629         spawn(proc() {
1630             tx.send(1);
1631         });
1632         assert_eq!(rx.recv(), 1);
1633     })
1634
1635     test!(fn smoke_port_gone() {
1636         let (tx, rx) = sync_channel::<int>(0);
1637         drop(rx);
1638         tx.send(1);
1639     } #[should_fail])
1640
1641     test!(fn smoke_shared_port_gone2() {
1642         let (tx, rx) = sync_channel::<int>(0);
1643         drop(rx);
1644         let tx2 = tx.clone();
1645         drop(tx);
1646         tx2.send(1);
1647     } #[should_fail])
1648
1649     test!(fn port_gone_concurrent() {
1650         let (tx, rx) = sync_channel::<int>(0);
1651         spawn(proc() {
1652             rx.recv();
1653         });
1654         loop { tx.send(1) }
1655     } #[should_fail])
1656
1657     test!(fn port_gone_concurrent_shared() {
1658         let (tx, rx) = sync_channel::<int>(0);
1659         let tx2 = tx.clone();
1660         spawn(proc() {
1661             rx.recv();
1662         });
1663         loop {
1664             tx.send(1);
1665             tx2.send(1);
1666         }
1667     } #[should_fail])
1668
1669     test!(fn smoke_chan_gone() {
1670         let (tx, rx) = sync_channel::<int>(0);
1671         drop(tx);
1672         rx.recv();
1673     } #[should_fail])
1674
1675     test!(fn smoke_chan_gone_shared() {
1676         let (tx, rx) = sync_channel::<()>(0);
1677         let tx2 = tx.clone();
1678         drop(tx);
1679         drop(tx2);
1680         rx.recv();
1681     } #[should_fail])
1682
1683     test!(fn chan_gone_concurrent() {
1684         let (tx, rx) = sync_channel::<int>(0);
1685         spawn(proc() {
1686             tx.send(1);
1687             tx.send(1);
1688         });
1689         loop { rx.recv(); }
1690     } #[should_fail])
1691
1692     test!(fn stress() {
1693         let (tx, rx) = sync_channel::<int>(0);
1694         spawn(proc() {
1695             for _ in range(0u, 10000) { tx.send(1); }
1696         });
1697         for _ in range(0u, 10000) {
1698             assert_eq!(rx.recv(), 1);
1699         }
1700     })
1701
1702     test!(fn stress_shared() {
1703         static AMT: uint = 1000;
1704         static NTHREADS: uint = 8;
1705         let (tx, rx) = sync_channel::<int>(0);
1706         let (dtx, drx) = sync_channel::<()>(0);
1707
1708         spawn(proc() {
1709             for _ in range(0, AMT * NTHREADS) {
1710                 assert_eq!(rx.recv(), 1);
1711             }
1712             match rx.try_recv() {
1713                 Ok(..) => fail!(),
1714                 _ => {}
1715             }
1716             dtx.send(());
1717         });
1718
1719         for _ in range(0, NTHREADS) {
1720             let tx = tx.clone();
1721             spawn(proc() {
1722                 for _ in range(0, AMT) { tx.send(1); }
1723             });
1724         }
1725         drop(tx);
1726         drx.recv();
1727     })
1728
1729     test!(fn oneshot_single_thread_close_port_first() {
1730         // Simple test of closing without sending
1731         let (_tx, rx) = sync_channel::<int>(0);
1732         drop(rx);
1733     })
1734
1735     test!(fn oneshot_single_thread_close_chan_first() {
1736         // Simple test of closing without sending
1737         let (tx, _rx) = sync_channel::<int>(0);
1738         drop(tx);
1739     })
1740
1741     test!(fn oneshot_single_thread_send_port_close() {
1742         // Testing that the sender cleans up the payload if receiver is closed
1743         let (tx, rx) = sync_channel::<Box<int>>(0);
1744         drop(rx);
1745         tx.send(box 0);
1746     } #[should_fail])
1747
1748     test!(fn oneshot_single_thread_recv_chan_close() {
1749         // Receiving on a closed chan will fail
1750         let res = task::try(proc() {
1751             let (tx, rx) = sync_channel::<int>(0);
1752             drop(tx);
1753             rx.recv();
1754         });
1755         // What is our res?
1756         assert!(res.is_err());
1757     })
1758
1759     test!(fn oneshot_single_thread_send_then_recv() {
1760         let (tx, rx) = sync_channel::<Box<int>>(1);
1761         tx.send(box 10);
1762         assert!(rx.recv() == box 10);
1763     })
1764
1765     test!(fn oneshot_single_thread_try_send_open() {
1766         let (tx, rx) = sync_channel::<int>(1);
1767         assert_eq!(tx.try_send(10), Ok(()));
1768         assert!(rx.recv() == 10);
1769     })
1770
1771     test!(fn oneshot_single_thread_try_send_closed() {
1772         let (tx, rx) = sync_channel::<int>(0);
1773         drop(rx);
1774         assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1775     })
1776
1777     test!(fn oneshot_single_thread_try_send_closed2() {
1778         let (tx, _rx) = sync_channel::<int>(0);
1779         assert_eq!(tx.try_send(10), Err(Full(10)));
1780     })
1781
1782     test!(fn oneshot_single_thread_try_recv_open() {
1783         let (tx, rx) = sync_channel::<int>(1);
1784         tx.send(10);
1785         assert!(rx.recv_opt() == Ok(10));
1786     })
1787
1788     test!(fn oneshot_single_thread_try_recv_closed() {
1789         let (tx, rx) = sync_channel::<int>(0);
1790         drop(tx);
1791         assert!(rx.recv_opt() == Err(()));
1792     })
1793
1794     test!(fn oneshot_single_thread_peek_data() {
1795         let (tx, rx) = sync_channel::<int>(1);
1796         assert_eq!(rx.try_recv(), Err(Empty))
1797         tx.send(10);
1798         assert_eq!(rx.try_recv(), Ok(10));
1799     })
1800
1801     test!(fn oneshot_single_thread_peek_close() {
1802         let (tx, rx) = sync_channel::<int>(0);
1803         drop(tx);
1804         assert_eq!(rx.try_recv(), Err(Disconnected));
1805         assert_eq!(rx.try_recv(), Err(Disconnected));
1806     })
1807
1808     test!(fn oneshot_single_thread_peek_open() {
1809         let (_tx, rx) = sync_channel::<int>(0);
1810         assert_eq!(rx.try_recv(), Err(Empty));
1811     })
1812
1813     test!(fn oneshot_multi_task_recv_then_send() {
1814         let (tx, rx) = sync_channel::<Box<int>>(0);
1815         spawn(proc() {
1816             assert!(rx.recv() == box 10);
1817         });
1818
1819         tx.send(box 10);
1820     })
1821
1822     test!(fn oneshot_multi_task_recv_then_close() {
1823         let (tx, rx) = sync_channel::<Box<int>>(0);
1824         spawn(proc() {
1825             drop(tx);
1826         });
1827         let res = task::try(proc() {
1828             assert!(rx.recv() == box 10);
1829         });
1830         assert!(res.is_err());
1831     })
1832
1833     test!(fn oneshot_multi_thread_close_stress() {
1834         for _ in range(0, stress_factor()) {
1835             let (tx, rx) = sync_channel::<int>(0);
1836             spawn(proc() {
1837                 drop(rx);
1838             });
1839             drop(tx);
1840         }
1841     })
1842
1843     test!(fn oneshot_multi_thread_send_close_stress() {
1844         for _ in range(0, stress_factor()) {
1845             let (tx, rx) = sync_channel::<int>(0);
1846             spawn(proc() {
1847                 drop(rx);
1848             });
1849             let _ = task::try(proc() {
1850                 tx.send(1);
1851             });
1852         }
1853     })
1854
1855     test!(fn oneshot_multi_thread_recv_close_stress() {
1856         for _ in range(0, stress_factor()) {
1857             let (tx, rx) = sync_channel::<int>(0);
1858             spawn(proc() {
1859                 let res = task::try(proc() {
1860                     rx.recv();
1861                 });
1862                 assert!(res.is_err());
1863             });
1864             spawn(proc() {
1865                 spawn(proc() {
1866                     drop(tx);
1867                 });
1868             });
1869         }
1870     })
1871
1872     test!(fn oneshot_multi_thread_send_recv_stress() {
1873         for _ in range(0, stress_factor()) {
1874             let (tx, rx) = sync_channel::<Box<int>>(0);
1875             spawn(proc() {
1876                 tx.send(box 10i);
1877             });
1878             spawn(proc() {
1879                 assert!(rx.recv() == box 10i);
1880             });
1881         }
1882     })
1883
1884     test!(fn stream_send_recv_stress() {
1885         for _ in range(0, stress_factor()) {
1886             let (tx, rx) = sync_channel::<Box<int>>(0);
1887
1888             send(tx, 0);
1889             recv(rx, 0);
1890
1891             fn send(tx: SyncSender<Box<int>>, i: int) {
1892                 if i == 10 { return }
1893
1894                 spawn(proc() {
1895                     tx.send(box i);
1896                     send(tx, i + 1);
1897                 });
1898             }
1899
1900             fn recv(rx: Receiver<Box<int>>, i: int) {
1901                 if i == 10 { return }
1902
1903                 spawn(proc() {
1904                     assert!(rx.recv() == box i);
1905                     recv(rx, i + 1);
1906                 });
1907             }
1908         }
1909     })
1910
1911     test!(fn recv_a_lot() {
1912         // Regression test that we don't run out of stack in scheduler context
1913         let (tx, rx) = sync_channel(10000);
1914         for _ in range(0u, 10000) { tx.send(()); }
1915         for _ in range(0u, 10000) { rx.recv(); }
1916     })
1917
1918     test!(fn shared_chan_stress() {
1919         let (tx, rx) = sync_channel(0);
1920         let total = stress_factor() + 100;
1921         for _ in range(0, total) {
1922             let tx = tx.clone();
1923             spawn(proc() {
1924                 tx.send(());
1925             });
1926         }
1927
1928         for _ in range(0, total) {
1929             rx.recv();
1930         }
1931     })
1932
1933     test!(fn test_nested_recv_iter() {
1934         let (tx, rx) = sync_channel::<int>(0);
1935         let (total_tx, total_rx) = sync_channel::<int>(0);
1936
1937         spawn(proc() {
1938             let mut acc = 0;
1939             for x in rx.iter() {
1940                 acc += x;
1941             }
1942             total_tx.send(acc);
1943         });
1944
1945         tx.send(3);
1946         tx.send(1);
1947         tx.send(2);
1948         drop(tx);
1949         assert_eq!(total_rx.recv(), 6);
1950     })
1951
1952     test!(fn test_recv_iter_break() {
1953         let (tx, rx) = sync_channel::<int>(0);
1954         let (count_tx, count_rx) = sync_channel(0);
1955
1956         spawn(proc() {
1957             let mut count = 0;
1958             for x in rx.iter() {
1959                 if count >= 3 {
1960                     break;
1961                 } else {
1962                     count += x;
1963                 }
1964             }
1965             count_tx.send(count);
1966         });
1967
1968         tx.send(2);
1969         tx.send(2);
1970         tx.send(2);
1971         let _ = tx.try_send(2);
1972         drop(tx);
1973         assert_eq!(count_rx.recv(), 4);
1974     })
1975
1976     test!(fn try_recv_states() {
1977         let (tx1, rx1) = sync_channel::<int>(1);
1978         let (tx2, rx2) = sync_channel::<()>(1);
1979         let (tx3, rx3) = sync_channel::<()>(1);
1980         spawn(proc() {
1981             rx2.recv();
1982             tx1.send(1);
1983             tx3.send(());
1984             rx2.recv();
1985             drop(tx1);
1986             tx3.send(());
1987         });
1988
1989         assert_eq!(rx1.try_recv(), Err(Empty));
1990         tx2.send(());
1991         rx3.recv();
1992         assert_eq!(rx1.try_recv(), Ok(1));
1993         assert_eq!(rx1.try_recv(), Err(Empty));
1994         tx2.send(());
1995         rx3.recv();
1996         assert_eq!(rx1.try_recv(), Err(Disconnected));
1997     })
1998
1999     // This bug used to end up in a livelock inside of the Receiver destructor
2000     // because the internal state of the Shared packet was corrupted
2001     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
2002         let (tx, rx) = sync_channel::<()>(0);
2003         let (tx2, rx2) = sync_channel::<()>(0);
2004         spawn(proc() {
2005             rx.recv(); // wait on a oneshot
2006             drop(rx);  // destroy a shared
2007             tx2.send(());
2008         });
2009         // make sure the other task has gone to sleep
2010         for _ in range(0u, 5000) { task::deschedule(); }
2011
2012         // upgrade to a shared chan and send a message
2013         let t = tx.clone();
2014         drop(tx);
2015         t.send(());
2016
2017         // wait for the child task to exit before we exit
2018         rx2.recv();
2019     })
2020
2021     test!(fn try_recvs_off_the_runtime() {
2022         use std::rt::thread::Thread;
2023
2024         let (tx, rx) = sync_channel::<()>(0);
2025         let (cdone, pdone) = channel();
2026         let t = Thread::start(proc() {
2027             let mut hits = 0u;
2028             while hits < 10 {
2029                 match rx.try_recv() {
2030                     Ok(()) => { hits += 1; }
2031                     Err(Empty) => { Thread::yield_now(); }
2032                     Err(Disconnected) => return,
2033                 }
2034             }
2035             cdone.send(());
2036         });
2037         for _ in range(0u, 10) {
2038             tx.send(());
2039         }
2040         t.join();
2041         pdone.recv();
2042     })
2043
2044     test!(fn send_opt1() {
2045         let (tx, rx) = sync_channel::<int>(0);
2046         spawn(proc() { rx.recv(); });
2047         assert_eq!(tx.send_opt(1), Ok(()));
2048     })
2049
2050     test!(fn send_opt2() {
2051         let (tx, rx) = sync_channel::<int>(0);
2052         spawn(proc() { drop(rx); });
2053         assert_eq!(tx.send_opt(1), Err(1));
2054     })
2055
2056     test!(fn send_opt3() {
2057         let (tx, rx) = sync_channel::<int>(1);
2058         assert_eq!(tx.send_opt(1), Ok(()));
2059         spawn(proc() { drop(rx); });
2060         assert_eq!(tx.send_opt(1), Err(1));
2061     })
2062
2063     test!(fn send_opt4() {
2064         let (tx, rx) = sync_channel::<int>(0);
2065         let tx2 = tx.clone();
2066         let (done, donerx) = channel();
2067         let done2 = done.clone();
2068         spawn(proc() {
2069             assert_eq!(tx.send_opt(1), Err(1));
2070             done.send(());
2071         });
2072         spawn(proc() {
2073             assert_eq!(tx2.send_opt(2), Err(2));
2074             done2.send(());
2075         });
2076         drop(rx);
2077         donerx.recv();
2078         donerx.recv();
2079     })
2080
2081     test!(fn try_send1() {
2082         let (tx, _rx) = sync_channel::<int>(0);
2083         assert_eq!(tx.try_send(1), Err(Full(1)));
2084     })
2085
2086     test!(fn try_send2() {
2087         let (tx, _rx) = sync_channel::<int>(1);
2088         assert_eq!(tx.try_send(1), Ok(()));
2089         assert_eq!(tx.try_send(1), Err(Full(1)));
2090     })
2091
2092     test!(fn try_send3() {
2093         let (tx, rx) = sync_channel::<int>(1);
2094         assert_eq!(tx.try_send(1), Ok(()));
2095         drop(rx);
2096         assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2097     })
2098
2099     test!(fn try_send4() {
2100         let (tx, rx) = sync_channel::<int>(0);
2101         spawn(proc() {
2102             for _ in range(0u, 1000) { task::deschedule(); }
2103             assert_eq!(tx.try_send(1), Ok(()));
2104         });
2105         assert_eq!(rx.recv(), 1);
2106     } #[ignore(reason = "flaky on libnative")])
2107
2108     test!(fn issue_15761() {
2109         fn repro() {
2110             let (tx1, rx1) = sync_channel::<()>(3);
2111             let (tx2, rx2) = sync_channel::<()>(3);
2112
2113             spawn(proc() {
2114                 rx1.recv();
2115                 tx2.try_send(()).unwrap();
2116             });
2117
2118             tx1.try_send(()).unwrap();
2119             rx2.recv();
2120         }
2121
2122         for _ in range(0u, 100) {
2123             repro()
2124         }
2125     })
2126 }