]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/mod.rs
comm: Implement synchronous channels
[rust.git] / src / libstd / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined as two types:
20 //!
21 //! * `Sender`
22 //! * `Receiver`
23 //!
24 //! A `Sender` is used to send data to a `Receiver`. A `Sender` is clone-able
25 //! such that many tasks can send simultaneously to one receiver. These
26 //! channels are *task blocking*, not *thread blocking*. This means that if one
27 //! task is blocked on a channel, other tasks can continue to make progress.
28 //!
29 //! Rust channels can be used as if they have an infinite internal buffer. What
30 //! this means is that the `send` operation will never block. `Receiver`s, on
31 //! the other hand, will block the task if there is no data to be received.
32 //!
33 //! ## Failure Propagation
34 //!
35 //! In addition to being a core primitive for communicating in rust, channels
36 //! are the points at which failure is propagated among tasks.  Whenever the one
37 //! half of channel is closed, the other half will have its next operation
38 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
39 //! that are linked to one another via channels.
40 //!
41 //! There are methods on both of `Sender` and `Receiver` to perform their
42 //! respective operations without failing, however.
43 //!
44 //! ## Outside the Runtime
45 //!
46 //! All channels and ports work seamlessly inside and outside of the rust
47 //! runtime. This means that code may use channels to communicate information
48 //! inside and outside of the runtime. For example, if rust were embedded as an
49 //! FFI module in another application, the rust runtime would probably be
50 //! running in its own external thread pool. Channels created can communicate
51 //! from the native application threads to the rust threads through the use of
52 //! native mutexes and condition variables.
53 //!
54 //! What this means is that if a native thread is using a channel, execution
55 //! will be blocked accordingly by blocking the OS thread.
56 //!
57 //! # Example
58 //!
59 //! ```rust,should_fail
60 //! // Create a simple streaming channel
61 //! let (tx, rx) = channel();
62 //! spawn(proc() {
63 //!     tx.send(10);
64 //! });
65 //! assert_eq!(rx.recv(), 10);
66 //!
67 //! // Create a shared channel which can be sent along from many tasks
68 //! let (tx, rx) = channel();
69 //! for i in range(0, 10) {
70 //!     let tx = tx.clone();
71 //!     spawn(proc() {
72 //!         tx.send(i);
73 //!     })
74 //! }
75 //!
76 //! for _ in range(0, 10) {
77 //!     let j = rx.recv();
78 //!     assert!(0 <= j && j < 10);
79 //! }
80 //!
81 //! // The call to recv() will fail!() because the channel has already hung
82 //! // up (or been deallocated)
83 //! let (tx, rx) = channel::<int>();
84 //! drop(tx);
85 //! rx.recv();
86 //! ```
87
88 // A description of how Rust's channel implementation works
89 //
90 // Channels are supposed to be the basic building block for all other
91 // concurrent primitives that are used in Rust. As a result, the channel type
92 // needs to be highly optimized, flexible, and broad enough for use everywhere.
93 //
94 // The choice of implementation of all channels is to be built on lock-free data
95 // structures. The channels themselves are then consequently also lock-free data
96 // structures. As always with lock-free code, this is a very "here be dragons"
97 // territory, especially because I'm unaware of any academic papers which have
98 // gone into great length about channels of these flavors.
99 //
100 // ## Flavors of channels
101 //
102 // From the perspective of a consumer of this library, there is only one flavor
103 // of channel. This channel can be used as a stream and cloned to allow multiple
104 // senders. Under the hood, however, there are actually three flavors of
105 // channels in play.
106 //
107 // * Oneshots - these channels are highly optimized for the one-send use case.
108 //              They contain as few atomics as possible and involve one and
109 //              exactly one allocation.
110 // * Streams - these channels are optimized for the non-shared use case. They
111 //             use a different concurrent queue which is more tailored for this
112 //             use case. The initial allocation of this flavor of channel is not
113 //             optimized.
114 // * Shared - this is the most general form of channel that this module offers,
115 //            a channel with multiple senders. This type is as optimized as it
116 //            can be, but the previous two types mentioned are much faster for
117 //            their use-cases.
118 //
119 // ## Concurrent queues
120 //
121 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
122 // recv() obviously blocks. This means that under the hood there must be some
123 // shared and concurrent queue holding all of the actual data.
124 //
125 // With two flavors of channels, two flavors of queues are also used. We have
126 // chosen to use queues from a well-known author which are abbreviated as SPSC
127 // and MPSC (single producer, single consumer and multiple producer, single
128 // consumer). SPSC queues are used for streams while MPSC queues are used for
129 // shared channels.
130 //
131 // ### SPSC optimizations
132 //
133 // The SPSC queue found online is essentially a linked list of nodes where one
134 // half of the nodes are the "queue of data" and the other half of nodes are a
135 // cache of unused nodes. The unused nodes are used such that an allocation is
136 // not required on every push() and a free doesn't need to happen on every
137 // pop().
138 //
139 // As found online, however, the cache of nodes is of an infinite size. This
140 // means that if a channel at one point in its life had 50k items in the queue,
141 // then the queue will always have the capacity for 50k items. I believed that
142 // this was an unnecessary limitation of the implementation, so I have altered
143 // the queue to optionally have a bound on the cache size.
144 //
145 // By default, streams will have an unbounded SPSC queue with a small-ish cache
146 // size. The hope is that the cache is still large enough to have very fast
147 // send() operations while not too large such that millions of channels can
148 // coexist at once.
149 //
150 // ### MPSC optimizations
151 //
152 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
153 // a linked list under the hood to earn its unboundedness, but I have not put
154 // forth much effort into having a cache of nodes similar to the SPSC queue.
155 //
156 // For now, I believe that this is "ok" because shared channels are not the most
157 // common type, but soon we may wish to revisit this queue choice and determine
158 // another candidate for backend storage of shared channels.
159 //
160 // ## Overview of the Implementation
161 //
162 // Now that there's a little background on the concurrent queues used, it's
163 // worth going into much more detail about the channels themselves. The basic
164 // pseudocode for a send/recv are:
165 //
166 //
167 //      send(t)                             recv()
168 //        queue.push(t)                       return if queue.pop()
169 //        if increment() == -1                deschedule {
170 //          wakeup()                            if decrement() > 0
171 //                                                cancel_deschedule()
172 //                                            }
173 //                                            queue.pop()
174 //
175 // As mentioned before, there are no locks in this implementation, only atomic
176 // instructions are used.
177 //
178 // ### The internal atomic counter
179 //
180 // Every channel has a shared counter with each half to keep track of the size
181 // of the queue. This counter is used to abort descheduling by the receiver and
182 // to know when to wake up on the sending side.
183 //
184 // As seen in the pseudocode, senders will increment this count and receivers
185 // will decrement the count. The theory behind this is that if a sender sees a
186 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
187 // then it doesn't need to block.
188 //
189 // The recv() method has a beginning call to pop(), and if successful, it needs
190 // to decrement the count. It is a crucial implementation detail that this
191 // decrement does *not* happen to the shared counter. If this were the case,
192 // then it would be possible for the counter to be very negative when there were
193 // no receivers waiting, in which case the senders would have to determine when
194 // it was actually appropriate to wake up a receiver.
195 //
196 // Instead, the "steal count" is kept track of separately (not atomically
197 // because it's only used by receivers), and then the decrement() call when
198 // descheduling will lump in all of the recent steals into one large decrement.
199 //
200 // The implication of this is that if a sender sees a -1 count, then there's
201 // guaranteed to be a waiter waiting!
202 //
203 // ## Native Implementation
204 //
205 // A major goal of these channels is to work seamlessly on and off the runtime.
206 // All of the previous race conditions have been worded in terms of
207 // scheduler-isms (which is obviously not available without the runtime).
208 //
209 // For now, native usage of channels (off the runtime) will fall back onto
210 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
211 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
212 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
213 // condition variable.
214 //
215 // ## Select
216 //
217 // Being able to support selection over channels has greatly influenced this
218 // design, and not only does selection need to work inside the runtime, but also
219 // outside the runtime.
220 //
221 // The implementation is fairly straightforward. The goal of select() is not to
222 // return some data, but only to return which channel can receive data without
223 // blocking. The implementation is essentially the entire blocking procedure
224 // followed by an increment as soon as its woken up. The cancellation procedure
225 // involves an increment and swapping out of to_wake to acquire ownership of the
226 // task to unblock.
227 //
228 // Sadly this current implementation requires multiple allocations, so I have
229 // seen the throughput of select() be much worse than it should be. I do not
230 // believe that there is anything fundamental which needs to change about these
231 // channels, however, in order to support a more efficient select().
232 //
233 // # Conclusion
234 //
235 // And now that you've seen all the races that I found and attempted to fix,
236 // here's the code for you to find some more!
237
238 use cast;
239 use cell::Cell;
240 use clone::Clone;
241 use iter::Iterator;
242 use kinds::Send;
243 use kinds::marker;
244 use mem;
245 use ops::Drop;
246 use option::{Some, None, Option};
247 use result::{Ok, Err, Result};
248 use rt::local::Local;
249 use rt::task::{Task, BlockedTask};
250 use sync::arc::UnsafeArc;
251
252 pub use comm::select::{Select, Handle};
253
254 macro_rules! test (
255     { fn $name:ident() $b:block $($a:attr)*} => (
256         mod $name {
257             #[allow(unused_imports)];
258
259             use native;
260             use comm::*;
261             use prelude::*;
262             use super::*;
263             use super::super::*;
264             use task;
265
266             fn f() $b
267
268             $($a)* #[test] fn uv() { f() }
269             $($a)* #[test] fn native() {
270                 use native;
271                 let (tx, rx) = channel();
272                 native::task::spawn(proc() { tx.send(f()) });
273                 rx.recv();
274             }
275         }
276     )
277 )
278
279 mod select;
280 mod oneshot;
281 mod stream;
282 mod shared;
283 mod sync;
284
285 // Use a power of 2 to allow LLVM to optimize to something that's not a
286 // division, this is hit pretty regularly.
287 static RESCHED_FREQ: int = 256;
288
289 /// The receiving-half of Rust's channel type. This half can only be owned by
290 /// one task
291 pub struct Receiver<T> {
292     priv inner: Flavor<T>,
293     priv receives: Cell<uint>,
294     // can't share in an arc
295     priv marker: marker::NoShare,
296 }
297
298 /// An iterator over messages on a receiver, this iterator will block
299 /// whenever `next` is called, waiting for a new message, and `None` will be
300 /// returned when the corresponding channel has hung up.
301 pub struct Messages<'a, T> {
302     priv rx: &'a Receiver<T>
303 }
304
305 /// The sending-half of Rust's asynchronous channel type. This half can only be
306 /// owned by one task, but it can be cloned to send to other tasks.
307 pub struct Sender<T> {
308     priv inner: Flavor<T>,
309     priv sends: Cell<uint>,
310     // can't share in an arc
311     priv marker: marker::NoShare,
312 }
313
314 /// The sending-half of Rust's synchronous channel type. This half can only be
315 /// owned by one task, but it can be cloned to send to other tasks.
316 pub struct SyncSender<T> {
317     priv inner: UnsafeArc<sync::Packet<T>>,
318     // can't share in an arc
319     priv marker: marker::NoShare,
320 }
321
322 /// This enumeration is the list of the possible reasons that try_recv could not
323 /// return data when called.
324 #[deriving(Eq, Clone, Show)]
325 pub enum TryRecvResult<T> {
326     /// This channel is currently empty, but the sender(s) have not yet
327     /// disconnected, so data may yet become available.
328     Empty,
329     /// This channel's sending half has become disconnected, and there will
330     /// never be any more data received on this channel
331     Disconnected,
332     /// The channel had some data and we successfully popped it
333     Data(T),
334 }
335
336 /// This enumeration is the list of the possible outcomes for the
337 /// `SyncSender::try_send` method.
338 #[deriving(Eq, Clone, Show)]
339 pub enum TrySendResult<T> {
340     /// The data was successfully sent along the channel. This either means that
341     /// it was buffered in the channel, or handed off to a receiver. In either
342     /// case, the callee no longer has ownership of the data.
343     Sent,
344     /// The data could not be sent on the channel because it would require that
345     /// the callee block to send the data.
346     ///
347     /// If this is a buffered channel, then the buffer is full at this time. If
348     /// this is not a buffered channel, then there is no receiver available to
349     /// acquire the data.
350     Full(T),
351     /// This channel's receiving half has disconnected, so the data could not be
352     /// sent. The data is returned back to the callee in this case.
353     RecvDisconnected(T),
354 }
355
356 enum Flavor<T> {
357     Oneshot(UnsafeArc<oneshot::Packet<T>>),
358     Stream(UnsafeArc<stream::Packet<T>>),
359     Shared(UnsafeArc<shared::Packet<T>>),
360     Sync(UnsafeArc<sync::Packet<T>>),
361 }
362
363 /// Creates a new channel, returning the sender/receiver halves. All data sent
364 /// on the sender will become available on the receiver. See the documentation
365 /// of `Receiver` and `Sender` to see what's possible with them.
366 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
367     let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
368     (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
369 }
370
371 /// Creates a new synchronous, bounded channel.
372 ///
373 /// Like asynchronous channels, the `Receiver` will block until a message
374 /// becomes available. These channels differ greatly in the semantics of the
375 /// sender from asynchronous channels, however.
376 ///
377 /// This channel has an internal buffer on which messages will be queued. When
378 /// the internal buffer becomes full, future sends will *block* waiting for the
379 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
380 /// becomes  "rendezvous channel" where each send will not return until a recv
381 /// is paired with it.
382 ///
383 /// As with asynchronous channels, all senders will fail in `send` if the
384 /// `Receiver` has been destroyed.
385 ///
386 /// # Example
387 ///
388 /// ```
389 /// let (tx, rx) = sync_channel(1);
390 ///
391 /// // this returns immediately
392 /// tx.send(1);
393 ///
394 /// spawn(proc() {
395 ///     // this will block until the previous message has been received
396 ///     tx.send(2);
397 /// });
398 ///
399 /// assert_eq!(rx.recv(), 1);
400 /// assert_eq!(rx.recv(), 2);
401 /// ```
402 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
403     let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
404     (SyncSender::new(a), Receiver::my_new(Sync(b)))
405 }
406
407 ////////////////////////////////////////////////////////////////////////////////
408 // Sender
409 ////////////////////////////////////////////////////////////////////////////////
410
411 impl<T: Send> Sender<T> {
412     fn my_new(inner: Flavor<T>) -> Sender<T> {
413         Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
414     }
415
416     /// Sends a value along this channel to be received by the corresponding
417     /// receiver.
418     ///
419     /// Rust channels are infinitely buffered so this method will never block.
420     ///
421     /// # Failure
422     ///
423     /// This function will fail if the other end of the channel has hung up.
424     /// This means that if the corresponding receiver has fallen out of scope,
425     /// this function will trigger a fail message saying that a message is
426     /// being sent on a closed channel.
427     ///
428     /// Note that if this function does *not* fail, it does not mean that the
429     /// data will be successfully received. All sends are placed into a queue,
430     /// so it is possible for a send to succeed (the other end is alive), but
431     /// then the other end could immediately disconnect.
432     ///
433     /// The purpose of this functionality is to propagate failure among tasks.
434     /// If failure is not desired, then consider using the `try_send` method
435     pub fn send(&self, t: T) {
436         if !self.try_send(t) {
437             fail!("sending on a closed channel");
438         }
439     }
440
441     /// Attempts to send a value on this channel, returning whether it was
442     /// successfully sent.
443     ///
444     /// A successful send occurs when it is determined that the other end of
445     /// the channel has not hung up already. An unsuccessful send would be one
446     /// where the corresponding receiver has already been deallocated. Note
447     /// that a return value of `false` means that the data will never be
448     /// received, but a return value of `true` does *not* mean that the data
449     /// will be received.  It is possible for the corresponding receiver to
450     /// hang up immediately after this function returns `true`.
451     ///
452     /// Like `send`, this method will never block. If the failure of send cannot
453     /// be tolerated, then this method should be used instead.
454     pub fn try_send(&self, t: T) -> bool {
455         // In order to prevent starvation of other tasks in situations where
456         // a task sends repeatedly without ever receiving, we occassionally
457         // yield instead of doing a send immediately.
458         //
459         // Don't unconditionally attempt to yield because the TLS overhead can
460         // be a bit much, and also use `try_take` instead of `take` because
461         // there's no reason that this send shouldn't be usable off the
462         // runtime.
463         let cnt = self.sends.get() + 1;
464         self.sends.set(cnt);
465         if cnt % (RESCHED_FREQ as uint) == 0 {
466             let task: Option<~Task> = Local::try_take();
467             task.map(|t| t.maybe_yield());
468         }
469
470         let (new_inner, ret) = match self.inner {
471             Oneshot(ref p) => {
472                 let p = p.get();
473                 unsafe {
474                     if !(*p).sent() {
475                         return (*p).send(t);
476                     } else {
477                         let (a, b) = UnsafeArc::new2(stream::Packet::new());
478                         match (*p).upgrade(Receiver::my_new(Stream(b))) {
479                             oneshot::UpSuccess => {
480                                 (*a.get()).send(t);
481                                 (a, true)
482                             }
483                             oneshot::UpDisconnected => (a, false),
484                             oneshot::UpWoke(task) => {
485                                 (*a.get()).send(t);
486                                 task.wake().map(|t| t.reawaken());
487                                 (a, true)
488                             }
489                         }
490                     }
491                 }
492             }
493             Stream(ref p) => return unsafe { (*p.get()).send(t) },
494             Shared(ref p) => return unsafe { (*p.get()).send(t) },
495             Sync(..) => unreachable!(),
496         };
497
498         unsafe {
499             let mut tmp = Sender::my_new(Stream(new_inner));
500             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
501         }
502         return ret;
503     }
504 }
505
506 impl<T: Send> Clone for Sender<T> {
507     fn clone(&self) -> Sender<T> {
508         let (packet, sleeper) = match self.inner {
509             Oneshot(ref p) => {
510                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
511                 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
512                     oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
513                     oneshot::UpWoke(task) => (b, Some(task))
514                 }
515             }
516             Stream(ref p) => {
517                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
518                 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
519                     stream::UpSuccess | stream::UpDisconnected => (b, None),
520                     stream::UpWoke(task) => (b, Some(task)),
521                 }
522             }
523             Shared(ref p) => {
524                 unsafe { (*p.get()).clone_chan(); }
525                 return Sender::my_new(Shared(p.clone()));
526             }
527             Sync(..) => unreachable!(),
528         };
529
530         unsafe {
531             (*packet.get()).inherit_blocker(sleeper);
532
533             let mut tmp = Sender::my_new(Shared(packet.clone()));
534             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
535         }
536         Sender::my_new(Shared(packet))
537     }
538 }
539
540 #[unsafe_destructor]
541 impl<T: Send> Drop for Sender<T> {
542     fn drop(&mut self) {
543         match self.inner {
544             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
545             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
546             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
547             Sync(..) => unreachable!(),
548         }
549     }
550 }
551
552 ////////////////////////////////////////////////////////////////////////////////
553 // SyncSender
554 ////////////////////////////////////////////////////////////////////////////////
555
556 impl<T: Send> SyncSender<T> {
557     fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
558         SyncSender { inner: inner, marker: marker::NoShare }
559     }
560
561     /// Sends a value on this synchronous channel.
562     ///
563     /// This function will *block* until space in the internal buffer becomes
564     /// available or a receiver is available to hand off the message to.
565     ///
566     /// Note that a successful send does *not* guarantee that the receiver will
567     /// ever see the data if there is a buffer on this channel. Messages may be
568     /// enqueued in the internal buffer for the receiver to receive at a later
569     /// time. If the buffer size is 0, however, it can be guaranteed that the
570     /// receiver has indeed received the data if this function returns success.
571     ///
572     /// # Failure
573     ///
574     /// Similarly to `Sender::send`, this function will fail if the
575     /// corresponding `Receiver` for this channel has disconnected. This
576     /// behavior is used to propagate failure among tasks.
577     ///
578     /// If failure is not desired, you can achieve the same semantics with the
579     /// `SyncSender::send_opt` method which will not fail if the receiver
580     /// disconnects.
581     pub fn send(&self, t: T) {
582         if self.send_opt(t).is_some() {
583             fail!("sending on a closed channel");
584         }
585     }
586
587     /// Send a value on a channel, returning it back if the receiver
588     /// disconnected
589     ///
590     /// This method will *block* to send the value `t` on the channel, but if
591     /// the value could not be sent due to the receiver disconnecting, the value
592     /// is returned back to the callee. This function is similar to `try_send`,
593     /// except that it will block if the channel is currently full.
594     ///
595     /// # Failure
596     ///
597     /// This function cannot fail.
598     pub fn send_opt(&self, t: T) -> Option<T> {
599         match unsafe { (*self.inner.get()).send(t) } {
600             Ok(()) => None,
601             Err(t) => Some(t),
602         }
603     }
604
605     /// Attempts to send a value on this channel without blocking.
606     ///
607     /// This method semantically differs from `Sender::try_send` because it can
608     /// fail if the receiver has not disconnected yet. If the buffer on this
609     /// channel is full, this function will immediately return the data back to
610     /// the callee.
611     ///
612     /// See `SyncSender::send` for notes about guarantees of whether the
613     /// receiver has received the data or not if this function is successful.
614     ///
615     /// # Failure
616     ///
617     /// This function cannot fail
618     pub fn try_send(&self, t: T) -> TrySendResult<T> {
619         unsafe { (*self.inner.get()).try_send(t) }
620     }
621 }
622
623 impl<T: Send> Clone for SyncSender<T> {
624     fn clone(&self) -> SyncSender<T> {
625         unsafe { (*self.inner.get()).clone_chan(); }
626         return SyncSender::new(self.inner.clone());
627     }
628 }
629
630 #[unsafe_destructor]
631 impl<T: Send> Drop for SyncSender<T> {
632     fn drop(&mut self) {
633         unsafe { (*self.inner.get()).drop_chan(); }
634     }
635 }
636
637 ////////////////////////////////////////////////////////////////////////////////
638 // Receiver
639 ////////////////////////////////////////////////////////////////////////////////
640
641 impl<T: Send> Receiver<T> {
642     fn my_new(inner: Flavor<T>) -> Receiver<T> {
643         Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
644     }
645
646     /// Blocks waiting for a value on this receiver
647     ///
648     /// This function will block if necessary to wait for a corresponding send
649     /// on the channel from its paired `Sender` structure. This receiver will
650     /// be woken up when data is ready, and the data will be returned.
651     ///
652     /// # Failure
653     ///
654     /// Similar to channels, this method will trigger a task failure if the
655     /// other end of the channel has hung up (been deallocated). The purpose of
656     /// this is to propagate failure among tasks.
657     ///
658     /// If failure is not desired, then there are two options:
659     ///
660     /// * If blocking is still desired, the `recv_opt` method will return `None`
661     ///   when the other end hangs up
662     ///
663     /// * If blocking is not desired, then the `try_recv` method will attempt to
664     ///   peek at a value on this receiver.
665     pub fn recv(&self) -> T {
666         match self.recv_opt() {
667             Some(t) => t,
668             None => fail!("receiving on a closed channel"),
669         }
670     }
671
672     /// Attempts to return a pending value on this receiver without blocking
673     ///
674     /// This method will never block the caller in order to wait for data to
675     /// become available. Instead, this will always return immediately with a
676     /// possible option of pending data on the channel.
677     ///
678     /// This is useful for a flavor of "optimistic check" before deciding to
679     /// block on a receiver.
680     ///
681     /// This function cannot fail.
682     pub fn try_recv(&self) -> TryRecvResult<T> {
683         // If a thread is spinning in try_recv, we should take the opportunity
684         // to reschedule things occasionally. See notes above in scheduling on
685         // sends for why this doesn't always hit TLS, and also for why this uses
686         // `try_take` instead of `take`.
687         let cnt = self.receives.get() + 1;
688         self.receives.set(cnt);
689         if cnt % (RESCHED_FREQ as uint) == 0 {
690             let task: Option<~Task> = Local::try_take();
691             task.map(|t| t.maybe_yield());
692         }
693
694         loop {
695             let mut new_port = match self.inner {
696                 Oneshot(ref p) => {
697                     match unsafe { (*p.get()).try_recv() } {
698                         Ok(t) => return Data(t),
699                         Err(oneshot::Empty) => return Empty,
700                         Err(oneshot::Disconnected) => return Disconnected,
701                         Err(oneshot::Upgraded(rx)) => rx,
702                     }
703                 }
704                 Stream(ref p) => {
705                     match unsafe { (*p.get()).try_recv() } {
706                         Ok(t) => return Data(t),
707                         Err(stream::Empty) => return Empty,
708                         Err(stream::Disconnected) => return Disconnected,
709                         Err(stream::Upgraded(rx)) => rx,
710                     }
711                 }
712                 Shared(ref p) => {
713                     match unsafe { (*p.get()).try_recv() } {
714                         Ok(t) => return Data(t),
715                         Err(shared::Empty) => return Empty,
716                         Err(shared::Disconnected) => return Disconnected,
717                     }
718                 }
719                 Sync(ref p) => {
720                     match unsafe { (*p.get()).try_recv() } {
721                         Ok(t) => return Data(t),
722                         Err(sync::Empty) => return Empty,
723                         Err(sync::Disconnected) => return Disconnected,
724                     }
725                 }
726             };
727             unsafe {
728                 mem::swap(&mut cast::transmute_mut(self).inner,
729                           &mut new_port.inner);
730             }
731         }
732     }
733
734     /// Attempt to wait for a value on this receiver, but does not fail if the
735     /// corresponding channel has hung up.
736     ///
737     /// This implementation of iterators for ports will always block if there is
738     /// not data available on the receiver, but it will not fail in the case
739     /// that the channel has been deallocated.
740     ///
741     /// In other words, this function has the same semantics as the `recv`
742     /// method except for the failure aspect.
743     ///
744     /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
745     /// the value found on the receiver is returned.
746     pub fn recv_opt(&self) -> Option<T> {
747         loop {
748             let mut new_port = match self.inner {
749                 Oneshot(ref p) => {
750                     match unsafe { (*p.get()).recv() } {
751                         Ok(t) => return Some(t),
752                         Err(oneshot::Empty) => return unreachable!(),
753                         Err(oneshot::Disconnected) => return None,
754                         Err(oneshot::Upgraded(rx)) => rx,
755                     }
756                 }
757                 Stream(ref p) => {
758                     match unsafe { (*p.get()).recv() } {
759                         Ok(t) => return Some(t),
760                         Err(stream::Empty) => return unreachable!(),
761                         Err(stream::Disconnected) => return None,
762                         Err(stream::Upgraded(rx)) => rx,
763                     }
764                 }
765                 Shared(ref p) => {
766                     match unsafe { (*p.get()).recv() } {
767                         Ok(t) => return Some(t),
768                         Err(shared::Empty) => return unreachable!(),
769                         Err(shared::Disconnected) => return None,
770                     }
771                 }
772                 Sync(ref p) => return unsafe { (*p.get()).recv() }
773             };
774             unsafe {
775                 mem::swap(&mut cast::transmute_mut(self).inner,
776                           &mut new_port.inner);
777             }
778         }
779     }
780
781     /// Returns an iterator which will block waiting for messages, but never
782     /// `fail!`. It will return `None` when the channel has hung up.
783     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
784         Messages { rx: self }
785     }
786 }
787
788 impl<T: Send> select::Packet for Receiver<T> {
789     fn can_recv(&self) -> bool {
790         loop {
791             let mut new_port = match self.inner {
792                 Oneshot(ref p) => {
793                     match unsafe { (*p.get()).can_recv() } {
794                         Ok(ret) => return ret,
795                         Err(upgrade) => upgrade,
796                     }
797                 }
798                 Stream(ref p) => {
799                     match unsafe { (*p.get()).can_recv() } {
800                         Ok(ret) => return ret,
801                         Err(upgrade) => upgrade,
802                     }
803                 }
804                 Shared(ref p) => {
805                     return unsafe { (*p.get()).can_recv() };
806                 }
807                 Sync(ref p) => {
808                     return unsafe { (*p.get()).can_recv() };
809                 }
810             };
811             unsafe {
812                 mem::swap(&mut cast::transmute_mut(self).inner,
813                           &mut new_port.inner);
814             }
815         }
816     }
817
818     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
819         loop {
820             let (t, mut new_port) = match self.inner {
821                 Oneshot(ref p) => {
822                     match unsafe { (*p.get()).start_selection(task) } {
823                         oneshot::SelSuccess => return Ok(()),
824                         oneshot::SelCanceled(task) => return Err(task),
825                         oneshot::SelUpgraded(t, rx) => (t, rx),
826                     }
827                 }
828                 Stream(ref p) => {
829                     match unsafe { (*p.get()).start_selection(task) } {
830                         stream::SelSuccess => return Ok(()),
831                         stream::SelCanceled(task) => return Err(task),
832                         stream::SelUpgraded(t, rx) => (t, rx),
833                     }
834                 }
835                 Shared(ref p) => {
836                     return unsafe { (*p.get()).start_selection(task) };
837                 }
838                 Sync(ref p) => {
839                     return unsafe { (*p.get()).start_selection(task) };
840                 }
841             };
842             task = t;
843             unsafe {
844                 mem::swap(&mut cast::transmute_mut(self).inner,
845                           &mut new_port.inner);
846             }
847         }
848     }
849
850     fn abort_selection(&self) -> bool {
851         let mut was_upgrade = false;
852         loop {
853             let result = match self.inner {
854                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
855                 Stream(ref p) => unsafe {
856                     (*p.get()).abort_selection(was_upgrade)
857                 },
858                 Shared(ref p) => return unsafe {
859                     (*p.get()).abort_selection(was_upgrade)
860                 },
861                 Sync(ref p) => return unsafe {
862                     (*p.get()).abort_selection()
863                 },
864             };
865             let mut new_port = match result { Ok(b) => return b, Err(p) => p };
866             was_upgrade = true;
867             unsafe {
868                 mem::swap(&mut cast::transmute_mut(self).inner,
869                           &mut new_port.inner);
870             }
871         }
872     }
873 }
874
875 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
876     fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
877 }
878
879 #[unsafe_destructor]
880 impl<T: Send> Drop for Receiver<T> {
881     fn drop(&mut self) {
882         match self.inner {
883             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
884             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
885             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
886             Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
887         }
888     }
889 }
890
891 #[cfg(test)]
892 mod test {
893     use prelude::*;
894
895     use native;
896     use os;
897     use super::*;
898
899     pub fn stress_factor() -> uint {
900         match os::getenv("RUST_TEST_STRESS") {
901             Some(val) => from_str::<uint>(val).unwrap(),
902             None => 1,
903         }
904     }
905
906     test!(fn smoke() {
907         let (tx, rx) = channel();
908         tx.send(1);
909         assert_eq!(rx.recv(), 1);
910     })
911
912     test!(fn drop_full() {
913         let (tx, _rx) = channel();
914         tx.send(~1);
915     })
916
917     test!(fn drop_full_shared() {
918         let (tx, _rx) = channel();
919         drop(tx.clone());
920         drop(tx.clone());
921         tx.send(~1);
922     })
923
924     test!(fn smoke_shared() {
925         let (tx, rx) = channel();
926         tx.send(1);
927         assert_eq!(rx.recv(), 1);
928         let tx = tx.clone();
929         tx.send(1);
930         assert_eq!(rx.recv(), 1);
931     })
932
933     test!(fn smoke_threads() {
934         let (tx, rx) = channel();
935         spawn(proc() {
936             tx.send(1);
937         });
938         assert_eq!(rx.recv(), 1);
939     })
940
941     test!(fn smoke_port_gone() {
942         let (tx, rx) = channel();
943         drop(rx);
944         tx.send(1);
945     } #[should_fail])
946
947     test!(fn smoke_shared_port_gone() {
948         let (tx, rx) = channel();
949         drop(rx);
950         tx.send(1);
951     } #[should_fail])
952
953     test!(fn smoke_shared_port_gone2() {
954         let (tx, rx) = channel();
955         drop(rx);
956         let tx2 = tx.clone();
957         drop(tx);
958         tx2.send(1);
959     } #[should_fail])
960
961     test!(fn port_gone_concurrent() {
962         let (tx, rx) = channel();
963         spawn(proc() {
964             rx.recv();
965         });
966         loop { tx.send(1) }
967     } #[should_fail])
968
969     test!(fn port_gone_concurrent_shared() {
970         let (tx, rx) = channel();
971         let tx2 = tx.clone();
972         spawn(proc() {
973             rx.recv();
974         });
975         loop {
976             tx.send(1);
977             tx2.send(1);
978         }
979     } #[should_fail])
980
981     test!(fn smoke_chan_gone() {
982         let (tx, rx) = channel::<int>();
983         drop(tx);
984         rx.recv();
985     } #[should_fail])
986
987     test!(fn smoke_chan_gone_shared() {
988         let (tx, rx) = channel::<()>();
989         let tx2 = tx.clone();
990         drop(tx);
991         drop(tx2);
992         rx.recv();
993     } #[should_fail])
994
995     test!(fn chan_gone_concurrent() {
996         let (tx, rx) = channel();
997         spawn(proc() {
998             tx.send(1);
999             tx.send(1);
1000         });
1001         loop { rx.recv(); }
1002     } #[should_fail])
1003
1004     test!(fn stress() {
1005         let (tx, rx) = channel();
1006         spawn(proc() {
1007             for _ in range(0, 10000) { tx.send(1); }
1008         });
1009         for _ in range(0, 10000) {
1010             assert_eq!(rx.recv(), 1);
1011         }
1012     })
1013
1014     test!(fn stress_shared() {
1015         static AMT: uint = 10000;
1016         static NTHREADS: uint = 8;
1017         let (tx, rx) = channel::<int>();
1018         let (dtx, drx) = channel::<()>();
1019
1020         spawn(proc() {
1021             for _ in range(0, AMT * NTHREADS) {
1022                 assert_eq!(rx.recv(), 1);
1023             }
1024             match rx.try_recv() {
1025                 Data(..) => fail!(),
1026                 _ => {}
1027             }
1028             dtx.send(());
1029         });
1030
1031         for _ in range(0, NTHREADS) {
1032             let tx = tx.clone();
1033             spawn(proc() {
1034                 for _ in range(0, AMT) { tx.send(1); }
1035             });
1036         }
1037         drop(tx);
1038         drx.recv();
1039     })
1040
1041     #[test]
1042     fn send_from_outside_runtime() {
1043         let (tx1, rx1) = channel::<()>();
1044         let (tx2, rx2) = channel::<int>();
1045         let (tx3, rx3) = channel::<()>();
1046         let tx4 = tx3.clone();
1047         spawn(proc() {
1048             tx1.send(());
1049             for _ in range(0, 40) {
1050                 assert_eq!(rx2.recv(), 1);
1051             }
1052             tx3.send(());
1053         });
1054         rx1.recv();
1055         native::task::spawn(proc() {
1056             for _ in range(0, 40) {
1057                 tx2.send(1);
1058             }
1059             tx4.send(());
1060         });
1061         rx3.recv();
1062         rx3.recv();
1063     }
1064
1065     #[test]
1066     fn recv_from_outside_runtime() {
1067         let (tx, rx) = channel::<int>();
1068         let (dtx, drx) = channel();
1069         native::task::spawn(proc() {
1070             for _ in range(0, 40) {
1071                 assert_eq!(rx.recv(), 1);
1072             }
1073             dtx.send(());
1074         });
1075         for _ in range(0, 40) {
1076             tx.send(1);
1077         }
1078         drx.recv();
1079     }
1080
1081     #[test]
1082     fn no_runtime() {
1083         let (tx1, rx1) = channel::<int>();
1084         let (tx2, rx2) = channel::<int>();
1085         let (tx3, rx3) = channel::<()>();
1086         let tx4 = tx3.clone();
1087         native::task::spawn(proc() {
1088             assert_eq!(rx1.recv(), 1);
1089             tx2.send(2);
1090             tx4.send(());
1091         });
1092         native::task::spawn(proc() {
1093             tx1.send(1);
1094             assert_eq!(rx2.recv(), 2);
1095             tx3.send(());
1096         });
1097         rx3.recv();
1098         rx3.recv();
1099     }
1100
1101     test!(fn oneshot_single_thread_close_port_first() {
1102         // Simple test of closing without sending
1103         let (_tx, rx) = channel::<int>();
1104         drop(rx);
1105     })
1106
1107     test!(fn oneshot_single_thread_close_chan_first() {
1108         // Simple test of closing without sending
1109         let (tx, _rx) = channel::<int>();
1110         drop(tx);
1111     })
1112
1113     test!(fn oneshot_single_thread_send_port_close() {
1114         // Testing that the sender cleans up the payload if receiver is closed
1115         let (tx, rx) = channel::<~int>();
1116         drop(rx);
1117         tx.send(~0);
1118     } #[should_fail])
1119
1120     test!(fn oneshot_single_thread_recv_chan_close() {
1121         // Receiving on a closed chan will fail
1122         let res = task::try(proc() {
1123             let (tx, rx) = channel::<int>();
1124             drop(tx);
1125             rx.recv();
1126         });
1127         // What is our res?
1128         assert!(res.is_err());
1129     })
1130
1131     test!(fn oneshot_single_thread_send_then_recv() {
1132         let (tx, rx) = channel::<~int>();
1133         tx.send(~10);
1134         assert!(rx.recv() == ~10);
1135     })
1136
1137     test!(fn oneshot_single_thread_try_send_open() {
1138         let (tx, rx) = channel::<int>();
1139         assert!(tx.try_send(10));
1140         assert!(rx.recv() == 10);
1141     })
1142
1143     test!(fn oneshot_single_thread_try_send_closed() {
1144         let (tx, rx) = channel::<int>();
1145         drop(rx);
1146         assert!(!tx.try_send(10));
1147     })
1148
1149     test!(fn oneshot_single_thread_try_recv_open() {
1150         let (tx, rx) = channel::<int>();
1151         tx.send(10);
1152         assert!(rx.recv_opt() == Some(10));
1153     })
1154
1155     test!(fn oneshot_single_thread_try_recv_closed() {
1156         let (tx, rx) = channel::<int>();
1157         drop(tx);
1158         assert!(rx.recv_opt() == None);
1159     })
1160
1161     test!(fn oneshot_single_thread_peek_data() {
1162         let (tx, rx) = channel::<int>();
1163         assert_eq!(rx.try_recv(), Empty)
1164         tx.send(10);
1165         assert_eq!(rx.try_recv(), Data(10));
1166     })
1167
1168     test!(fn oneshot_single_thread_peek_close() {
1169         let (tx, rx) = channel::<int>();
1170         drop(tx);
1171         assert_eq!(rx.try_recv(), Disconnected);
1172         assert_eq!(rx.try_recv(), Disconnected);
1173     })
1174
1175     test!(fn oneshot_single_thread_peek_open() {
1176         let (_tx, rx) = channel::<int>();
1177         assert_eq!(rx.try_recv(), Empty);
1178     })
1179
1180     test!(fn oneshot_multi_task_recv_then_send() {
1181         let (tx, rx) = channel::<~int>();
1182         spawn(proc() {
1183             assert!(rx.recv() == ~10);
1184         });
1185
1186         tx.send(~10);
1187     })
1188
1189     test!(fn oneshot_multi_task_recv_then_close() {
1190         let (tx, rx) = channel::<~int>();
1191         spawn(proc() {
1192             drop(tx);
1193         });
1194         let res = task::try(proc() {
1195             assert!(rx.recv() == ~10);
1196         });
1197         assert!(res.is_err());
1198     })
1199
1200     test!(fn oneshot_multi_thread_close_stress() {
1201         for _ in range(0, stress_factor()) {
1202             let (tx, rx) = channel::<int>();
1203             spawn(proc() {
1204                 drop(rx);
1205             });
1206             drop(tx);
1207         }
1208     })
1209
1210     test!(fn oneshot_multi_thread_send_close_stress() {
1211         for _ in range(0, stress_factor()) {
1212             let (tx, rx) = channel::<int>();
1213             spawn(proc() {
1214                 drop(rx);
1215             });
1216             let _ = task::try(proc() {
1217                 tx.send(1);
1218             });
1219         }
1220     })
1221
1222     test!(fn oneshot_multi_thread_recv_close_stress() {
1223         for _ in range(0, stress_factor()) {
1224             let (tx, rx) = channel::<int>();
1225             spawn(proc() {
1226                 let res = task::try(proc() {
1227                     rx.recv();
1228                 });
1229                 assert!(res.is_err());
1230             });
1231             spawn(proc() {
1232                 spawn(proc() {
1233                     drop(tx);
1234                 });
1235             });
1236         }
1237     })
1238
1239     test!(fn oneshot_multi_thread_send_recv_stress() {
1240         for _ in range(0, stress_factor()) {
1241             let (tx, rx) = channel();
1242             spawn(proc() {
1243                 tx.send(~10);
1244             });
1245             spawn(proc() {
1246                 assert!(rx.recv() == ~10);
1247             });
1248         }
1249     })
1250
1251     test!(fn stream_send_recv_stress() {
1252         for _ in range(0, stress_factor()) {
1253             let (tx, rx) = channel();
1254
1255             send(tx, 0);
1256             recv(rx, 0);
1257
1258             fn send(tx: Sender<~int>, i: int) {
1259                 if i == 10 { return }
1260
1261                 spawn(proc() {
1262                     tx.send(~i);
1263                     send(tx, i + 1);
1264                 });
1265             }
1266
1267             fn recv(rx: Receiver<~int>, i: int) {
1268                 if i == 10 { return }
1269
1270                 spawn(proc() {
1271                     assert!(rx.recv() == ~i);
1272                     recv(rx, i + 1);
1273                 });
1274             }
1275         }
1276     })
1277
1278     test!(fn recv_a_lot() {
1279         // Regression test that we don't run out of stack in scheduler context
1280         let (tx, rx) = channel();
1281         for _ in range(0, 10000) { tx.send(()); }
1282         for _ in range(0, 10000) { rx.recv(); }
1283     })
1284
1285     test!(fn shared_chan_stress() {
1286         let (tx, rx) = channel();
1287         let total = stress_factor() + 100;
1288         for _ in range(0, total) {
1289             let tx = tx.clone();
1290             spawn(proc() {
1291                 tx.send(());
1292             });
1293         }
1294
1295         for _ in range(0, total) {
1296             rx.recv();
1297         }
1298     })
1299
1300     test!(fn test_nested_recv_iter() {
1301         let (tx, rx) = channel::<int>();
1302         let (total_tx, total_rx) = channel::<int>();
1303
1304         spawn(proc() {
1305             let mut acc = 0;
1306             for x in rx.iter() {
1307                 acc += x;
1308             }
1309             total_tx.send(acc);
1310         });
1311
1312         tx.send(3);
1313         tx.send(1);
1314         tx.send(2);
1315         drop(tx);
1316         assert_eq!(total_rx.recv(), 6);
1317     })
1318
1319     test!(fn test_recv_iter_break() {
1320         let (tx, rx) = channel::<int>();
1321         let (count_tx, count_rx) = channel();
1322
1323         spawn(proc() {
1324             let mut count = 0;
1325             for x in rx.iter() {
1326                 if count >= 3 {
1327                     break;
1328                 } else {
1329                     count += x;
1330                 }
1331             }
1332             count_tx.send(count);
1333         });
1334
1335         tx.send(2);
1336         tx.send(2);
1337         tx.send(2);
1338         tx.try_send(2);
1339         drop(tx);
1340         assert_eq!(count_rx.recv(), 4);
1341     })
1342
1343     test!(fn try_recv_states() {
1344         let (tx1, rx1) = channel::<int>();
1345         let (tx2, rx2) = channel::<()>();
1346         let (tx3, rx3) = channel::<()>();
1347         spawn(proc() {
1348             rx2.recv();
1349             tx1.send(1);
1350             tx3.send(());
1351             rx2.recv();
1352             drop(tx1);
1353             tx3.send(());
1354         });
1355
1356         assert_eq!(rx1.try_recv(), Empty);
1357         tx2.send(());
1358         rx3.recv();
1359         assert_eq!(rx1.try_recv(), Data(1));
1360         assert_eq!(rx1.try_recv(), Empty);
1361         tx2.send(());
1362         rx3.recv();
1363         assert_eq!(rx1.try_recv(), Disconnected);
1364     })
1365
1366     // This bug used to end up in a livelock inside of the Receiver destructor
1367     // because the internal state of the Shared packet was corrupted
1368     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1369         let (tx, rx) = channel();
1370         let (tx2, rx2) = channel();
1371         spawn(proc() {
1372             rx.recv(); // wait on a oneshot
1373             drop(rx);  // destroy a shared
1374             tx2.send(());
1375         });
1376         // make sure the other task has gone to sleep
1377         for _ in range(0, 5000) { task::deschedule(); }
1378
1379         // upgrade to a shared chan and send a message
1380         let t = tx.clone();
1381         drop(tx);
1382         t.send(());
1383
1384         // wait for the child task to exit before we exit
1385         rx2.recv();
1386     })
1387
1388     test!(fn sends_off_the_runtime() {
1389         use rt::thread::Thread;
1390
1391         let (tx, rx) = channel();
1392         let t = Thread::start(proc() {
1393             for _ in range(0, 1000) {
1394                 tx.send(());
1395             }
1396         });
1397         for _ in range(0, 1000) {
1398             rx.recv();
1399         }
1400         t.join();
1401     })
1402
1403     test!(fn try_recvs_off_the_runtime() {
1404         use rt::thread::Thread;
1405
1406         let (tx, rx) = channel();
1407         let (cdone, pdone) = channel();
1408         let t = Thread::start(proc() {
1409             let mut hits = 0;
1410             while hits < 10 {
1411                 match rx.try_recv() {
1412                     Data(()) => { hits += 1; }
1413                     Empty => { Thread::yield_now(); }
1414                     Disconnected => return,
1415                 }
1416             }
1417             cdone.send(());
1418         });
1419         for _ in range(0, 10) {
1420             tx.send(());
1421         }
1422         t.join();
1423         pdone.recv();
1424     })
1425 }
1426
1427 #[cfg(test)]
1428 mod sync_tests {
1429     use prelude::*;
1430     use os;
1431
1432     pub fn stress_factor() -> uint {
1433         match os::getenv("RUST_TEST_STRESS") {
1434             Some(val) => from_str::<uint>(val).unwrap(),
1435             None => 1,
1436         }
1437     }
1438
1439     test!(fn smoke() {
1440         let (tx, rx) = sync_channel(1);
1441         tx.send(1);
1442         assert_eq!(rx.recv(), 1);
1443     })
1444
1445     test!(fn drop_full() {
1446         let (tx, _rx) = sync_channel(1);
1447         tx.send(~1);
1448     })
1449
1450     test!(fn smoke_shared() {
1451         let (tx, rx) = sync_channel(1);
1452         tx.send(1);
1453         assert_eq!(rx.recv(), 1);
1454         let tx = tx.clone();
1455         tx.send(1);
1456         assert_eq!(rx.recv(), 1);
1457     })
1458
1459     test!(fn smoke_threads() {
1460         let (tx, rx) = sync_channel(0);
1461         spawn(proc() {
1462             tx.send(1);
1463         });
1464         assert_eq!(rx.recv(), 1);
1465     })
1466
1467     test!(fn smoke_port_gone() {
1468         let (tx, rx) = sync_channel(0);
1469         drop(rx);
1470         tx.send(1);
1471     } #[should_fail])
1472
1473     test!(fn smoke_shared_port_gone2() {
1474         let (tx, rx) = sync_channel(0);
1475         drop(rx);
1476         let tx2 = tx.clone();
1477         drop(tx);
1478         tx2.send(1);
1479     } #[should_fail])
1480
1481     test!(fn port_gone_concurrent() {
1482         let (tx, rx) = sync_channel(0);
1483         spawn(proc() {
1484             rx.recv();
1485         });
1486         loop { tx.send(1) }
1487     } #[should_fail])
1488
1489     test!(fn port_gone_concurrent_shared() {
1490         let (tx, rx) = sync_channel(0);
1491         let tx2 = tx.clone();
1492         spawn(proc() {
1493             rx.recv();
1494         });
1495         loop {
1496             tx.send(1);
1497             tx2.send(1);
1498         }
1499     } #[should_fail])
1500
1501     test!(fn smoke_chan_gone() {
1502         let (tx, rx) = sync_channel::<int>(0);
1503         drop(tx);
1504         rx.recv();
1505     } #[should_fail])
1506
1507     test!(fn smoke_chan_gone_shared() {
1508         let (tx, rx) = sync_channel::<()>(0);
1509         let tx2 = tx.clone();
1510         drop(tx);
1511         drop(tx2);
1512         rx.recv();
1513     } #[should_fail])
1514
1515     test!(fn chan_gone_concurrent() {
1516         let (tx, rx) = sync_channel(0);
1517         spawn(proc() {
1518             tx.send(1);
1519             tx.send(1);
1520         });
1521         loop { rx.recv(); }
1522     } #[should_fail])
1523
1524     test!(fn stress() {
1525         let (tx, rx) = sync_channel(0);
1526         spawn(proc() {
1527             for _ in range(0, 10000) { tx.send(1); }
1528         });
1529         for _ in range(0, 10000) {
1530             assert_eq!(rx.recv(), 1);
1531         }
1532     })
1533
1534     test!(fn stress_shared() {
1535         static AMT: uint = 1000;
1536         static NTHREADS: uint = 8;
1537         let (tx, rx) = sync_channel::<int>(0);
1538         let (dtx, drx) = sync_channel::<()>(0);
1539
1540         spawn(proc() {
1541             for _ in range(0, AMT * NTHREADS) {
1542                 assert_eq!(rx.recv(), 1);
1543             }
1544             match rx.try_recv() {
1545                 Data(..) => fail!(),
1546                 _ => {}
1547             }
1548             dtx.send(());
1549         });
1550
1551         for _ in range(0, NTHREADS) {
1552             let tx = tx.clone();
1553             spawn(proc() {
1554                 for _ in range(0, AMT) { tx.send(1); }
1555             });
1556         }
1557         drop(tx);
1558         drx.recv();
1559     })
1560
1561     test!(fn oneshot_single_thread_close_port_first() {
1562         // Simple test of closing without sending
1563         let (_tx, rx) = sync_channel::<int>(0);
1564         drop(rx);
1565     })
1566
1567     test!(fn oneshot_single_thread_close_chan_first() {
1568         // Simple test of closing without sending
1569         let (tx, _rx) = sync_channel::<int>(0);
1570         drop(tx);
1571     })
1572
1573     test!(fn oneshot_single_thread_send_port_close() {
1574         // Testing that the sender cleans up the payload if receiver is closed
1575         let (tx, rx) = sync_channel::<~int>(0);
1576         drop(rx);
1577         tx.send(~0);
1578     } #[should_fail])
1579
1580     test!(fn oneshot_single_thread_recv_chan_close() {
1581         // Receiving on a closed chan will fail
1582         let res = task::try(proc() {
1583             let (tx, rx) = sync_channel::<int>(0);
1584             drop(tx);
1585             rx.recv();
1586         });
1587         // What is our res?
1588         assert!(res.is_err());
1589     })
1590
1591     test!(fn oneshot_single_thread_send_then_recv() {
1592         let (tx, rx) = sync_channel::<~int>(1);
1593         tx.send(~10);
1594         assert!(rx.recv() == ~10);
1595     })
1596
1597     test!(fn oneshot_single_thread_try_send_open() {
1598         let (tx, rx) = sync_channel::<int>(1);
1599         assert_eq!(tx.try_send(10), Sent);
1600         assert!(rx.recv() == 10);
1601     })
1602
1603     test!(fn oneshot_single_thread_try_send_closed() {
1604         let (tx, rx) = sync_channel::<int>(0);
1605         drop(rx);
1606         assert_eq!(tx.try_send(10), RecvDisconnected(10));
1607     })
1608
1609     test!(fn oneshot_single_thread_try_send_closed2() {
1610         let (tx, _rx) = sync_channel::<int>(0);
1611         assert_eq!(tx.try_send(10), Full(10));
1612     })
1613
1614     test!(fn oneshot_single_thread_try_recv_open() {
1615         let (tx, rx) = sync_channel::<int>(1);
1616         tx.send(10);
1617         assert!(rx.recv_opt() == Some(10));
1618     })
1619
1620     test!(fn oneshot_single_thread_try_recv_closed() {
1621         let (tx, rx) = sync_channel::<int>(0);
1622         drop(tx);
1623         assert!(rx.recv_opt() == None);
1624     })
1625
1626     test!(fn oneshot_single_thread_peek_data() {
1627         let (tx, rx) = sync_channel::<int>(1);
1628         assert_eq!(rx.try_recv(), Empty)
1629         tx.send(10);
1630         assert_eq!(rx.try_recv(), Data(10));
1631     })
1632
1633     test!(fn oneshot_single_thread_peek_close() {
1634         let (tx, rx) = sync_channel::<int>(0);
1635         drop(tx);
1636         assert_eq!(rx.try_recv(), Disconnected);
1637         assert_eq!(rx.try_recv(), Disconnected);
1638     })
1639
1640     test!(fn oneshot_single_thread_peek_open() {
1641         let (_tx, rx) = sync_channel::<int>(0);
1642         assert_eq!(rx.try_recv(), Empty);
1643     })
1644
1645     test!(fn oneshot_multi_task_recv_then_send() {
1646         let (tx, rx) = sync_channel::<~int>(0);
1647         spawn(proc() {
1648             assert!(rx.recv() == ~10);
1649         });
1650
1651         tx.send(~10);
1652     })
1653
1654     test!(fn oneshot_multi_task_recv_then_close() {
1655         let (tx, rx) = sync_channel::<~int>(0);
1656         spawn(proc() {
1657             drop(tx);
1658         });
1659         let res = task::try(proc() {
1660             assert!(rx.recv() == ~10);
1661         });
1662         assert!(res.is_err());
1663     })
1664
1665     test!(fn oneshot_multi_thread_close_stress() {
1666         for _ in range(0, stress_factor()) {
1667             let (tx, rx) = sync_channel::<int>(0);
1668             spawn(proc() {
1669                 drop(rx);
1670             });
1671             drop(tx);
1672         }
1673     })
1674
1675     test!(fn oneshot_multi_thread_send_close_stress() {
1676         for _ in range(0, stress_factor()) {
1677             let (tx, rx) = sync_channel::<int>(0);
1678             spawn(proc() {
1679                 drop(rx);
1680             });
1681             let _ = task::try(proc() {
1682                 tx.send(1);
1683             });
1684         }
1685     })
1686
1687     test!(fn oneshot_multi_thread_recv_close_stress() {
1688         for _ in range(0, stress_factor()) {
1689             let (tx, rx) = sync_channel::<int>(0);
1690             spawn(proc() {
1691                 let res = task::try(proc() {
1692                     rx.recv();
1693                 });
1694                 assert!(res.is_err());
1695             });
1696             spawn(proc() {
1697                 spawn(proc() {
1698                     drop(tx);
1699                 });
1700             });
1701         }
1702     })
1703
1704     test!(fn oneshot_multi_thread_send_recv_stress() {
1705         for _ in range(0, stress_factor()) {
1706             let (tx, rx) = sync_channel(0);
1707             spawn(proc() {
1708                 tx.send(~10);
1709             });
1710             spawn(proc() {
1711                 assert!(rx.recv() == ~10);
1712             });
1713         }
1714     })
1715
1716     test!(fn stream_send_recv_stress() {
1717         for _ in range(0, stress_factor()) {
1718             let (tx, rx) = sync_channel(0);
1719
1720             send(tx, 0);
1721             recv(rx, 0);
1722
1723             fn send(tx: SyncSender<~int>, i: int) {
1724                 if i == 10 { return }
1725
1726                 spawn(proc() {
1727                     tx.send(~i);
1728                     send(tx, i + 1);
1729                 });
1730             }
1731
1732             fn recv(rx: Receiver<~int>, i: int) {
1733                 if i == 10 { return }
1734
1735                 spawn(proc() {
1736                     assert!(rx.recv() == ~i);
1737                     recv(rx, i + 1);
1738                 });
1739             }
1740         }
1741     })
1742
1743     test!(fn recv_a_lot() {
1744         // Regression test that we don't run out of stack in scheduler context
1745         let (tx, rx) = sync_channel(10000);
1746         for _ in range(0, 10000) { tx.send(()); }
1747         for _ in range(0, 10000) { rx.recv(); }
1748     })
1749
1750     test!(fn shared_chan_stress() {
1751         let (tx, rx) = sync_channel(0);
1752         let total = stress_factor() + 100;
1753         for _ in range(0, total) {
1754             let tx = tx.clone();
1755             spawn(proc() {
1756                 tx.send(());
1757             });
1758         }
1759
1760         for _ in range(0, total) {
1761             rx.recv();
1762         }
1763     })
1764
1765     test!(fn test_nested_recv_iter() {
1766         let (tx, rx) = sync_channel::<int>(0);
1767         let (total_tx, total_rx) = sync_channel::<int>(0);
1768
1769         spawn(proc() {
1770             let mut acc = 0;
1771             for x in rx.iter() {
1772                 acc += x;
1773             }
1774             total_tx.send(acc);
1775         });
1776
1777         tx.send(3);
1778         tx.send(1);
1779         tx.send(2);
1780         drop(tx);
1781         assert_eq!(total_rx.recv(), 6);
1782     })
1783
1784     test!(fn test_recv_iter_break() {
1785         let (tx, rx) = sync_channel::<int>(0);
1786         let (count_tx, count_rx) = sync_channel(0);
1787
1788         spawn(proc() {
1789             let mut count = 0;
1790             for x in rx.iter() {
1791                 if count >= 3 {
1792                     break;
1793                 } else {
1794                     count += x;
1795                 }
1796             }
1797             count_tx.send(count);
1798         });
1799
1800         tx.send(2);
1801         tx.send(2);
1802         tx.send(2);
1803         tx.try_send(2);
1804         drop(tx);
1805         assert_eq!(count_rx.recv(), 4);
1806     })
1807
1808     test!(fn try_recv_states() {
1809         let (tx1, rx1) = sync_channel::<int>(1);
1810         let (tx2, rx2) = sync_channel::<()>(1);
1811         let (tx3, rx3) = sync_channel::<()>(1);
1812         spawn(proc() {
1813             rx2.recv();
1814             tx1.send(1);
1815             tx3.send(());
1816             rx2.recv();
1817             drop(tx1);
1818             tx3.send(());
1819         });
1820
1821         assert_eq!(rx1.try_recv(), Empty);
1822         tx2.send(());
1823         rx3.recv();
1824         assert_eq!(rx1.try_recv(), Data(1));
1825         assert_eq!(rx1.try_recv(), Empty);
1826         tx2.send(());
1827         rx3.recv();
1828         assert_eq!(rx1.try_recv(), Disconnected);
1829     })
1830
1831     // This bug used to end up in a livelock inside of the Receiver destructor
1832     // because the internal state of the Shared packet was corrupted
1833     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1834         let (tx, rx) = sync_channel(0);
1835         let (tx2, rx2) = sync_channel(0);
1836         spawn(proc() {
1837             rx.recv(); // wait on a oneshot
1838             drop(rx);  // destroy a shared
1839             tx2.send(());
1840         });
1841         // make sure the other task has gone to sleep
1842         for _ in range(0, 5000) { task::deschedule(); }
1843
1844         // upgrade to a shared chan and send a message
1845         let t = tx.clone();
1846         drop(tx);
1847         t.send(());
1848
1849         // wait for the child task to exit before we exit
1850         rx2.recv();
1851     })
1852
1853     test!(fn try_recvs_off_the_runtime() {
1854         use std::rt::thread::Thread;
1855
1856         let (tx, rx) = sync_channel(0);
1857         let (cdone, pdone) = channel();
1858         let t = Thread::start(proc() {
1859             let mut hits = 0;
1860             while hits < 10 {
1861                 match rx.try_recv() {
1862                     Data(()) => { hits += 1; }
1863                     Empty => { Thread::yield_now(); }
1864                     Disconnected => return,
1865                 }
1866             }
1867             cdone.send(());
1868         });
1869         for _ in range(0, 10) {
1870             tx.send(());
1871         }
1872         t.join();
1873         pdone.recv();
1874     })
1875
1876     test!(fn send_opt1() {
1877         let (tx, rx) = sync_channel(0);
1878         spawn(proc() { rx.recv(); });
1879         assert_eq!(tx.send_opt(1), None);
1880     })
1881
1882     test!(fn send_opt2() {
1883         let (tx, rx) = sync_channel(0);
1884         spawn(proc() { drop(rx); });
1885         assert_eq!(tx.send_opt(1), Some(1));
1886     })
1887
1888     test!(fn send_opt3() {
1889         let (tx, rx) = sync_channel(1);
1890         assert_eq!(tx.send_opt(1), None);
1891         spawn(proc() { drop(rx); });
1892         assert_eq!(tx.send_opt(1), Some(1));
1893     })
1894
1895     test!(fn send_opt4() {
1896         let (tx, rx) = sync_channel(0);
1897         let tx2 = tx.clone();
1898         let (done, donerx) = channel();
1899         let done2 = done.clone();
1900         spawn(proc() {
1901             assert_eq!(tx.send_opt(1), Some(1));
1902             done.send(());
1903         });
1904         spawn(proc() {
1905             assert_eq!(tx2.send_opt(2), Some(2));
1906             done2.send(());
1907         });
1908         drop(rx);
1909         donerx.recv();
1910         donerx.recv();
1911     })
1912
1913     test!(fn try_send1() {
1914         let (tx, _rx) = sync_channel(0);
1915         assert_eq!(tx.try_send(1), Full(1));
1916     })
1917
1918     test!(fn try_send2() {
1919         let (tx, _rx) = sync_channel(1);
1920         assert_eq!(tx.try_send(1), Sent);
1921         assert_eq!(tx.try_send(1), Full(1));
1922     })
1923
1924     test!(fn try_send3() {
1925         let (tx, rx) = sync_channel(1);
1926         assert_eq!(tx.try_send(1), Sent);
1927         drop(rx);
1928         assert_eq!(tx.try_send(1), RecvDisconnected(1));
1929     })
1930
1931     test!(fn try_send4() {
1932         let (tx, rx) = sync_channel(0);
1933         spawn(proc() {
1934             for _ in range(0, 1000) { task::deschedule(); }
1935             assert_eq!(tx.try_send(1), Sent);
1936         });
1937         assert_eq!(rx.recv(), 1);
1938     })
1939 }