]> git.lizzy.rs Git - rust.git/blob - src/libstd/comm/mod.rs
267140a0089bdfbbea30a81ecf2d30f605eb9d6f
[rust.git] / src / libstd / comm / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined as two types:
20 //!
21 //! * `Sender`
22 //! * `Receiver`
23 //!
24 //! A `Sender` is used to send data to a `Receiver`. A `Sender` is clone-able
25 //! such that many tasks can send simultaneously to one receiver. These
26 //! channels are *task blocking*, not *thread blocking*. This means that if one
27 //! task is blocked on a channel, other tasks can continue to make progress.
28 //!
29 //! Rust channels can be used as if they have an infinite internal buffer. What
30 //! this means is that the `send` operation will never block. `Receiver`s, on
31 //! the other hand, will block the task if there is no data to be received.
32 //!
33 //! ## Failure Propagation
34 //!
35 //! In addition to being a core primitive for communicating in rust, channels
36 //! are the points at which failure is propagated among tasks.  Whenever the one
37 //! half of channel is closed, the other half will have its next operation
38 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
39 //! that are linked to one another via channels.
40 //!
41 //! There are methods on both of `Sender` and `Receiver` to perform their
42 //! respective operations without failing, however.
43 //!
44 //! ## Outside the Runtime
45 //!
46 //! All channels and ports work seamlessly inside and outside of the rust
47 //! runtime. This means that code may use channels to communicate information
48 //! inside and outside of the runtime. For example, if rust were embedded as an
49 //! FFI module in another application, the rust runtime would probably be
50 //! running in its own external thread pool. Channels created can communicate
51 //! from the native application threads to the rust threads through the use of
52 //! native mutexes and condition variables.
53 //!
54 //! What this means is that if a native thread is using a channel, execution
55 //! will be blocked accordingly by blocking the OS thread.
56 //!
57 //! # Example
58 //!
59 //! ```rust,should_fail
60 //! // Create a simple streaming channel
61 //! let (tx, rx) = channel();
62 //! spawn(proc() {
63 //!     tx.send(10);
64 //! });
65 //! assert_eq!(rx.recv(), 10);
66 //!
67 //! // Create a shared channel which can be sent along from many tasks
68 //! let (tx, rx) = channel();
69 //! for i in range(0, 10) {
70 //!     let tx = tx.clone();
71 //!     spawn(proc() {
72 //!         tx.send(i);
73 //!     })
74 //! }
75 //!
76 //! for _ in range(0, 10) {
77 //!     let j = rx.recv();
78 //!     assert!(0 <= j && j < 10);
79 //! }
80 //!
81 //! // The call to recv() will fail!() because the channel has already hung
82 //! // up (or been deallocated)
83 //! let (tx, rx) = channel::<int>();
84 //! drop(tx);
85 //! rx.recv();
86 //! ```
87
88 // A description of how Rust's channel implementation works
89 //
90 // Channels are supposed to be the basic building block for all other
91 // concurrent primitives that are used in Rust. As a result, the channel type
92 // needs to be highly optimized, flexible, and broad enough for use everywhere.
93 //
94 // The choice of implementation of all channels is to be built on lock-free data
95 // structures. The channels themselves are then consequently also lock-free data
96 // structures. As always with lock-free code, this is a very "here be dragons"
97 // territory, especially because I'm unaware of any academic papers which have
98 // gone into great length about channels of these flavors.
99 //
100 // ## Flavors of channels
101 //
102 // From the perspective of a consumer of this library, there is only one flavor
103 // of channel. This channel can be used as a stream and cloned to allow multiple
104 // senders. Under the hood, however, there are actually three flavors of
105 // channels in play.
106 //
107 // * Oneshots - these channels are highly optimized for the one-send use case.
108 //              They contain as few atomics as possible and involve one and
109 //              exactly one allocation.
110 // * Streams - these channels are optimized for the non-shared use case. They
111 //             use a different concurrent queue which is more tailored for this
112 //             use case. The initial allocation of this flavor of channel is not
113 //             optimized.
114 // * Shared - this is the most general form of channel that this module offers,
115 //            a channel with multiple senders. This type is as optimized as it
116 //            can be, but the previous two types mentioned are much faster for
117 //            their use-cases.
118 //
119 // ## Concurrent queues
120 //
121 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
122 // recv() obviously blocks. This means that under the hood there must be some
123 // shared and concurrent queue holding all of the actual data.
124 //
125 // With two flavors of channels, two flavors of queues are also used. We have
126 // chosen to use queues from a well-known author which are abbreviated as SPSC
127 // and MPSC (single producer, single consumer and multiple producer, single
128 // consumer). SPSC queues are used for streams while MPSC queues are used for
129 // shared channels.
130 //
131 // ### SPSC optimizations
132 //
133 // The SPSC queue found online is essentially a linked list of nodes where one
134 // half of the nodes are the "queue of data" and the other half of nodes are a
135 // cache of unused nodes. The unused nodes are used such that an allocation is
136 // not required on every push() and a free doesn't need to happen on every
137 // pop().
138 //
139 // As found online, however, the cache of nodes is of an infinite size. This
140 // means that if a channel at one point in its life had 50k items in the queue,
141 // then the queue will always have the capacity for 50k items. I believed that
142 // this was an unnecessary limitation of the implementation, so I have altered
143 // the queue to optionally have a bound on the cache size.
144 //
145 // By default, streams will have an unbounded SPSC queue with a small-ish cache
146 // size. The hope is that the cache is still large enough to have very fast
147 // send() operations while not too large such that millions of channels can
148 // coexist at once.
149 //
150 // ### MPSC optimizations
151 //
152 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
153 // a linked list under the hood to earn its unboundedness, but I have not put
154 // forth much effort into having a cache of nodes similar to the SPSC queue.
155 //
156 // For now, I believe that this is "ok" because shared channels are not the most
157 // common type, but soon we may wish to revisit this queue choice and determine
158 // another candidate for backend storage of shared channels.
159 //
160 // ## Overview of the Implementation
161 //
162 // Now that there's a little background on the concurrent queues used, it's
163 // worth going into much more detail about the channels themselves. The basic
164 // pseudocode for a send/recv are:
165 //
166 //
167 //      send(t)                             recv()
168 //        queue.push(t)                       return if queue.pop()
169 //        if increment() == -1                deschedule {
170 //          wakeup()                            if decrement() > 0
171 //                                                cancel_deschedule()
172 //                                            }
173 //                                            queue.pop()
174 //
175 // As mentioned before, there are no locks in this implementation, only atomic
176 // instructions are used.
177 //
178 // ### The internal atomic counter
179 //
180 // Every channel has a shared counter with each half to keep track of the size
181 // of the queue. This counter is used to abort descheduling by the receiver and
182 // to know when to wake up on the sending side.
183 //
184 // As seen in the pseudocode, senders will increment this count and receivers
185 // will decrement the count. The theory behind this is that if a sender sees a
186 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
187 // then it doesn't need to block.
188 //
189 // The recv() method has a beginning call to pop(), and if successful, it needs
190 // to decrement the count. It is a crucial implementation detail that this
191 // decrement does *not* happen to the shared counter. If this were the case,
192 // then it would be possible for the counter to be very negative when there were
193 // no receivers waiting, in which case the senders would have to determine when
194 // it was actually appropriate to wake up a receiver.
195 //
196 // Instead, the "steal count" is kept track of separately (not atomically
197 // because it's only used by receivers), and then the decrement() call when
198 // descheduling will lump in all of the recent steals into one large decrement.
199 //
200 // The implication of this is that if a sender sees a -1 count, then there's
201 // guaranteed to be a waiter waiting!
202 //
203 // ## Native Implementation
204 //
205 // A major goal of these channels is to work seamlessly on and off the runtime.
206 // All of the previous race conditions have been worded in terms of
207 // scheduler-isms (which is obviously not available without the runtime).
208 //
209 // For now, native usage of channels (off the runtime) will fall back onto
210 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
211 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
212 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
213 // condition variable.
214 //
215 // ## Select
216 //
217 // Being able to support selection over channels has greatly influenced this
218 // design, and not only does selection need to work inside the runtime, but also
219 // outside the runtime.
220 //
221 // The implementation is fairly straightforward. The goal of select() is not to
222 // return some data, but only to return which channel can receive data without
223 // blocking. The implementation is essentially the entire blocking procedure
224 // followed by an increment as soon as its woken up. The cancellation procedure
225 // involves an increment and swapping out of to_wake to acquire ownership of the
226 // task to unblock.
227 //
228 // Sadly this current implementation requires multiple allocations, so I have
229 // seen the throughput of select() be much worse than it should be. I do not
230 // believe that there is anything fundamental which needs to change about these
231 // channels, however, in order to support a more efficient select().
232 //
233 // # Conclusion
234 //
235 // And now that you've seen all the races that I found and attempted to fix,
236 // here's the code for you to find some more!
237
238 use cast;
239 use cell::Cell;
240 use clone::Clone;
241 use iter::Iterator;
242 use kinds::Send;
243 use kinds::marker;
244 use mem;
245 use ops::Drop;
246 use option::{Some, None, Option};
247 use result::{Ok, Err, Result};
248 use rt::local::Local;
249 use rt::task::{Task, BlockedTask};
250 use sync::arc::UnsafeArc;
251
252 pub use comm::select::{Select, Handle};
253
254 macro_rules! test (
255     { fn $name:ident() $b:block $($a:attr)*} => (
256         mod $name {
257             #[allow(unused_imports)];
258
259             use native;
260             use comm::*;
261             use prelude::*;
262             use super::*;
263             use super::super::*;
264             use task;
265
266             fn f() $b
267
268             $($a)* #[test] fn uv() { f() }
269             $($a)* #[test] fn native() {
270                 use native;
271                 let (tx, rx) = channel();
272                 native::task::spawn(proc() { tx.send(f()) });
273                 rx.recv();
274             }
275         }
276     )
277 )
278
279 mod select;
280 mod oneshot;
281 mod stream;
282 mod shared;
283
284 // Use a power of 2 to allow LLVM to optimize to something that's not a
285 // division, this is hit pretty regularly.
286 static RESCHED_FREQ: int = 256;
287
288 /// The receiving-half of Rust's channel type. This half can only be owned by
289 /// one task
290 pub struct Receiver<T> {
291     priv inner: Flavor<T>,
292     priv receives: Cell<uint>,
293     // can't share in an arc
294     priv marker: marker::NoShare,
295 }
296
297 /// An iterator over messages on a receiver, this iterator will block
298 /// whenever `next` is called, waiting for a new message, and `None` will be
299 /// returned when the corresponding channel has hung up.
300 pub struct Messages<'a, T> {
301     priv rx: &'a Receiver<T>
302 }
303
304 /// The sending-half of Rust's channel type. This half can only be owned by one
305 /// task
306 pub struct Sender<T> {
307     priv inner: Flavor<T>,
308     priv sends: Cell<uint>,
309     // can't share in an arc
310     priv marker: marker::NoShare,
311 }
312
313 /// This enumeration is the list of the possible reasons that try_recv could not
314 /// return data when called.
315 #[deriving(Eq, Clone, Show)]
316 pub enum TryRecvResult<T> {
317     /// This channel is currently empty, but the sender(s) have not yet
318     /// disconnected, so data may yet become available.
319     Empty,
320     /// This channel's sending half has become disconnected, and there will
321     /// never be any more data received on this channel
322     Disconnected,
323     /// The channel had some data and we successfully popped it
324     Data(T),
325 }
326
327 enum Flavor<T> {
328     Oneshot(UnsafeArc<oneshot::Packet<T>>),
329     Stream(UnsafeArc<stream::Packet<T>>),
330     Shared(UnsafeArc<shared::Packet<T>>),
331 }
332
333 /// Creates a new channel, returning the sender/receiver halves. All data sent
334 /// on the sender will become available on the receiver. See the documentation
335 /// of `Receiver` and `Sender` to see what's possible with them.
336 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
337     let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
338     (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
339 }
340
341 impl<T: Send> Sender<T> {
342     fn my_new(inner: Flavor<T>) -> Sender<T> {
343         Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
344     }
345
346     /// Sends a value along this channel to be received by the corresponding
347     /// receiver.
348     ///
349     /// Rust channels are infinitely buffered so this method will never block.
350     ///
351     /// # Failure
352     ///
353     /// This function will fail if the other end of the channel has hung up.
354     /// This means that if the corresponding receiver has fallen out of scope,
355     /// this function will trigger a fail message saying that a message is
356     /// being sent on a closed channel.
357     ///
358     /// Note that if this function does *not* fail, it does not mean that the
359     /// data will be successfully received. All sends are placed into a queue,
360     /// so it is possible for a send to succeed (the other end is alive), but
361     /// then the other end could immediately disconnect.
362     ///
363     /// The purpose of this functionality is to propagate failure among tasks.
364     /// If failure is not desired, then consider using the `try_send` method
365     pub fn send(&self, t: T) {
366         if !self.try_send(t) {
367             fail!("sending on a closed channel");
368         }
369     }
370
371     /// Attempts to send a value on this channel, returning whether it was
372     /// successfully sent.
373     ///
374     /// A successful send occurs when it is determined that the other end of
375     /// the channel has not hung up already. An unsuccessful send would be one
376     /// where the corresponding receiver has already been deallocated. Note
377     /// that a return value of `false` means that the data will never be
378     /// received, but a return value of `true` does *not* mean that the data
379     /// will be received.  It is possible for the corresponding receiver to
380     /// hang up immediately after this function returns `true`.
381     ///
382     /// Like `send`, this method will never block. If the failure of send cannot
383     /// be tolerated, then this method should be used instead.
384     pub fn try_send(&self, t: T) -> bool {
385         // In order to prevent starvation of other tasks in situations where
386         // a task sends repeatedly without ever receiving, we occassionally
387         // yield instead of doing a send immediately.
388         //
389         // Don't unconditionally attempt to yield because the TLS overhead can
390         // be a bit much, and also use `try_take` instead of `take` because
391         // there's no reason that this send shouldn't be usable off the
392         // runtime.
393         let cnt = self.sends.get() + 1;
394         self.sends.set(cnt);
395         if cnt % (RESCHED_FREQ as uint) == 0 {
396             let task: Option<~Task> = Local::try_take();
397             task.map(|t| t.maybe_yield());
398         }
399
400         let (new_inner, ret) = match self.inner {
401             Oneshot(ref p) => {
402                 let p = p.get();
403                 unsafe {
404                     if !(*p).sent() {
405                         return (*p).send(t);
406                     } else {
407                         let (a, b) = UnsafeArc::new2(stream::Packet::new());
408                         match (*p).upgrade(Receiver::my_new(Stream(b))) {
409                             oneshot::UpSuccess => {
410                                 (*a.get()).send(t);
411                                 (a, true)
412                             }
413                             oneshot::UpDisconnected => (a, false),
414                             oneshot::UpWoke(task) => {
415                                 (*a.get()).send(t);
416                                 task.wake().map(|t| t.reawaken());
417                                 (a, true)
418                             }
419                         }
420                     }
421                 }
422             }
423             Stream(ref p) => return unsafe { (*p.get()).send(t) },
424             Shared(ref p) => return unsafe { (*p.get()).send(t) },
425         };
426
427         unsafe {
428             let mut tmp = Sender::my_new(Stream(new_inner));
429             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
430         }
431         return ret;
432     }
433 }
434
435 impl<T: Send> Clone for Sender<T> {
436     fn clone(&self) -> Sender<T> {
437         let (packet, sleeper) = match self.inner {
438             Oneshot(ref p) => {
439                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
440                 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
441                     oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
442                     oneshot::UpWoke(task) => (b, Some(task))
443                 }
444             }
445             Stream(ref p) => {
446                 let (a, b) = UnsafeArc::new2(shared::Packet::new());
447                 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
448                     stream::UpSuccess | stream::UpDisconnected => (b, None),
449                     stream::UpWoke(task) => (b, Some(task)),
450                 }
451             }
452             Shared(ref p) => {
453                 unsafe { (*p.get()).clone_chan(); }
454                 return Sender::my_new(Shared(p.clone()));
455             }
456         };
457
458         unsafe {
459             (*packet.get()).inherit_blocker(sleeper);
460
461             let mut tmp = Sender::my_new(Shared(packet.clone()));
462             mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
463         }
464         Sender::my_new(Shared(packet))
465     }
466 }
467
468 #[unsafe_destructor]
469 impl<T: Send> Drop for Sender<T> {
470     fn drop(&mut self) {
471         match self.inner {
472             Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
473             Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
474             Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
475         }
476     }
477 }
478
479 impl<T: Send> Receiver<T> {
480     fn my_new(inner: Flavor<T>) -> Receiver<T> {
481         Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
482     }
483
484     /// Blocks waiting for a value on this receiver
485     ///
486     /// This function will block if necessary to wait for a corresponding send
487     /// on the channel from its paired `Sender` structure. This receiver will
488     /// be woken up when data is ready, and the data will be returned.
489     ///
490     /// # Failure
491     ///
492     /// Similar to channels, this method will trigger a task failure if the
493     /// other end of the channel has hung up (been deallocated). The purpose of
494     /// this is to propagate failure among tasks.
495     ///
496     /// If failure is not desired, then there are two options:
497     ///
498     /// * If blocking is still desired, the `recv_opt` method will return `None`
499     ///   when the other end hangs up
500     ///
501     /// * If blocking is not desired, then the `try_recv` method will attempt to
502     ///   peek at a value on this receiver.
503     pub fn recv(&self) -> T {
504         match self.recv_opt() {
505             Some(t) => t,
506             None => fail!("receiving on a closed channel"),
507         }
508     }
509
510     /// Attempts to return a pending value on this receiver without blocking
511     ///
512     /// This method will never block the caller in order to wait for data to
513     /// become available. Instead, this will always return immediately with a
514     /// possible option of pending data on the channel.
515     ///
516     /// This is useful for a flavor of "optimistic check" before deciding to
517     /// block on a receiver.
518     ///
519     /// This function cannot fail.
520     pub fn try_recv(&self) -> TryRecvResult<T> {
521         // If a thread is spinning in try_recv, we should take the opportunity
522         // to reschedule things occasionally. See notes above in scheduling on
523         // sends for why this doesn't always hit TLS, and also for why this uses
524         // `try_take` instead of `take`.
525         let cnt = self.receives.get() + 1;
526         self.receives.set(cnt);
527         if cnt % (RESCHED_FREQ as uint) == 0 {
528             let task: Option<~Task> = Local::try_take();
529             task.map(|t| t.maybe_yield());
530         }
531
532         loop {
533             let mut new_port = match self.inner {
534                 Oneshot(ref p) => {
535                     match unsafe { (*p.get()).try_recv() } {
536                         Ok(t) => return Data(t),
537                         Err(oneshot::Empty) => return Empty,
538                         Err(oneshot::Disconnected) => return Disconnected,
539                         Err(oneshot::Upgraded(rx)) => rx,
540                     }
541                 }
542                 Stream(ref p) => {
543                     match unsafe { (*p.get()).try_recv() } {
544                         Ok(t) => return Data(t),
545                         Err(stream::Empty) => return Empty,
546                         Err(stream::Disconnected) => return Disconnected,
547                         Err(stream::Upgraded(rx)) => rx,
548                     }
549                 }
550                 Shared(ref p) => {
551                     match unsafe { (*p.get()).try_recv() } {
552                         Ok(t) => return Data(t),
553                         Err(shared::Empty) => return Empty,
554                         Err(shared::Disconnected) => return Disconnected,
555                     }
556                 }
557             };
558             unsafe {
559                 mem::swap(&mut cast::transmute_mut(self).inner,
560                           &mut new_port.inner);
561             }
562         }
563     }
564
565     /// Attempt to wait for a value on this receiver, but does not fail if the
566     /// corresponding channel has hung up.
567     ///
568     /// This implementation of iterators for ports will always block if there is
569     /// not data available on the receiver, but it will not fail in the case
570     /// that the channel has been deallocated.
571     ///
572     /// In other words, this function has the same semantics as the `recv`
573     /// method except for the failure aspect.
574     ///
575     /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
576     /// the value found on the receiver is returned.
577     pub fn recv_opt(&self) -> Option<T> {
578         loop {
579             let mut new_port = match self.inner {
580                 Oneshot(ref p) => {
581                     match unsafe { (*p.get()).recv() } {
582                         Ok(t) => return Some(t),
583                         Err(oneshot::Empty) => return unreachable!(),
584                         Err(oneshot::Disconnected) => return None,
585                         Err(oneshot::Upgraded(rx)) => rx,
586                     }
587                 }
588                 Stream(ref p) => {
589                     match unsafe { (*p.get()).recv() } {
590                         Ok(t) => return Some(t),
591                         Err(stream::Empty) => return unreachable!(),
592                         Err(stream::Disconnected) => return None,
593                         Err(stream::Upgraded(rx)) => rx,
594                     }
595                 }
596                 Shared(ref p) => {
597                     match unsafe { (*p.get()).recv() } {
598                         Ok(t) => return Some(t),
599                         Err(shared::Empty) => return unreachable!(),
600                         Err(shared::Disconnected) => return None,
601                     }
602                 }
603             };
604             unsafe {
605                 mem::swap(&mut cast::transmute_mut(self).inner,
606                           &mut new_port.inner);
607             }
608         }
609     }
610
611     /// Returns an iterator which will block waiting for messages, but never
612     /// `fail!`. It will return `None` when the channel has hung up.
613     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
614         Messages { rx: self }
615     }
616 }
617
618 impl<T: Send> select::Packet for Receiver<T> {
619     fn can_recv(&self) -> bool {
620         loop {
621             let mut new_port = match self.inner {
622                 Oneshot(ref p) => {
623                     match unsafe { (*p.get()).can_recv() } {
624                         Ok(ret) => return ret,
625                         Err(upgrade) => upgrade,
626                     }
627                 }
628                 Stream(ref p) => {
629                     match unsafe { (*p.get()).can_recv() } {
630                         Ok(ret) => return ret,
631                         Err(upgrade) => upgrade,
632                     }
633                 }
634                 Shared(ref p) => {
635                     return unsafe { (*p.get()).can_recv() };
636                 }
637             };
638             unsafe {
639                 mem::swap(&mut cast::transmute_mut(self).inner,
640                           &mut new_port.inner);
641             }
642         }
643     }
644
645     fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
646         loop {
647             let (t, mut new_port) = match self.inner {
648                 Oneshot(ref p) => {
649                     match unsafe { (*p.get()).start_selection(task) } {
650                         oneshot::SelSuccess => return Ok(()),
651                         oneshot::SelCanceled(task) => return Err(task),
652                         oneshot::SelUpgraded(t, rx) => (t, rx),
653                     }
654                 }
655                 Stream(ref p) => {
656                     match unsafe { (*p.get()).start_selection(task) } {
657                         stream::SelSuccess => return Ok(()),
658                         stream::SelCanceled(task) => return Err(task),
659                         stream::SelUpgraded(t, rx) => (t, rx),
660                     }
661                 }
662                 Shared(ref p) => {
663                     return unsafe { (*p.get()).start_selection(task) };
664                 }
665             };
666             task = t;
667             unsafe {
668                 mem::swap(&mut cast::transmute_mut(self).inner,
669                           &mut new_port.inner);
670             }
671         }
672     }
673
674     fn abort_selection(&self) -> bool {
675         let mut was_upgrade = false;
676         loop {
677             let result = match self.inner {
678                 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
679                 Stream(ref p) => unsafe {
680                     (*p.get()).abort_selection(was_upgrade)
681                 },
682                 Shared(ref p) => return unsafe {
683                     (*p.get()).abort_selection(was_upgrade)
684                 },
685             };
686             let mut new_port = match result { Ok(b) => return b, Err(p) => p };
687             was_upgrade = true;
688             unsafe {
689                 mem::swap(&mut cast::transmute_mut(self).inner,
690                           &mut new_port.inner);
691             }
692         }
693     }
694 }
695
696 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
697     fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
698 }
699
700 #[unsafe_destructor]
701 impl<T: Send> Drop for Receiver<T> {
702     fn drop(&mut self) {
703         match self.inner {
704             Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
705             Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
706             Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
707         }
708     }
709 }
710
711 #[cfg(test)]
712 mod test {
713     use prelude::*;
714
715     use native;
716     use os;
717     use super::*;
718
719     pub fn stress_factor() -> uint {
720         match os::getenv("RUST_TEST_STRESS") {
721             Some(val) => from_str::<uint>(val).unwrap(),
722             None => 1,
723         }
724     }
725
726     test!(fn smoke() {
727         let (tx, rx) = channel();
728         tx.send(1);
729         assert_eq!(rx.recv(), 1);
730     })
731
732     test!(fn drop_full() {
733         let (tx, _rx) = channel();
734         tx.send(~1);
735     })
736
737     test!(fn drop_full_shared() {
738         let (tx, _rx) = channel();
739         drop(tx.clone());
740         drop(tx.clone());
741         tx.send(~1);
742     })
743
744     test!(fn smoke_shared() {
745         let (tx, rx) = channel();
746         tx.send(1);
747         assert_eq!(rx.recv(), 1);
748         let tx = tx.clone();
749         tx.send(1);
750         assert_eq!(rx.recv(), 1);
751     })
752
753     test!(fn smoke_threads() {
754         let (tx, rx) = channel();
755         spawn(proc() {
756             tx.send(1);
757         });
758         assert_eq!(rx.recv(), 1);
759     })
760
761     test!(fn smoke_port_gone() {
762         let (tx, rx) = channel();
763         drop(rx);
764         tx.send(1);
765     } #[should_fail])
766
767     test!(fn smoke_shared_port_gone() {
768         let (tx, rx) = channel();
769         drop(rx);
770         tx.send(1);
771     } #[should_fail])
772
773     test!(fn smoke_shared_port_gone2() {
774         let (tx, rx) = channel();
775         drop(rx);
776         let tx2 = tx.clone();
777         drop(tx);
778         tx2.send(1);
779     } #[should_fail])
780
781     test!(fn port_gone_concurrent() {
782         let (tx, rx) = channel();
783         spawn(proc() {
784             rx.recv();
785         });
786         loop { tx.send(1) }
787     } #[should_fail])
788
789     test!(fn port_gone_concurrent_shared() {
790         let (tx, rx) = channel();
791         let tx2 = tx.clone();
792         spawn(proc() {
793             rx.recv();
794         });
795         loop {
796             tx.send(1);
797             tx2.send(1);
798         }
799     } #[should_fail])
800
801     test!(fn smoke_chan_gone() {
802         let (tx, rx) = channel::<int>();
803         drop(tx);
804         rx.recv();
805     } #[should_fail])
806
807     test!(fn smoke_chan_gone_shared() {
808         let (tx, rx) = channel::<()>();
809         let tx2 = tx.clone();
810         drop(tx);
811         drop(tx2);
812         rx.recv();
813     } #[should_fail])
814
815     test!(fn chan_gone_concurrent() {
816         let (tx, rx) = channel();
817         spawn(proc() {
818             tx.send(1);
819             tx.send(1);
820         });
821         loop { rx.recv(); }
822     } #[should_fail])
823
824     test!(fn stress() {
825         let (tx, rx) = channel();
826         spawn(proc() {
827             for _ in range(0, 10000) { tx.send(1); }
828         });
829         for _ in range(0, 10000) {
830             assert_eq!(rx.recv(), 1);
831         }
832     })
833
834     test!(fn stress_shared() {
835         static AMT: uint = 10000;
836         static NTHREADS: uint = 8;
837         let (tx, rx) = channel::<int>();
838         let (dtx, drx) = channel::<()>();
839
840         spawn(proc() {
841             for _ in range(0, AMT * NTHREADS) {
842                 assert_eq!(rx.recv(), 1);
843             }
844             match rx.try_recv() {
845                 Data(..) => fail!(),
846                 _ => {}
847             }
848             dtx.send(());
849         });
850
851         for _ in range(0, NTHREADS) {
852             let tx = tx.clone();
853             spawn(proc() {
854                 for _ in range(0, AMT) { tx.send(1); }
855             });
856         }
857         drop(tx);
858         drx.recv();
859     })
860
861     #[test]
862     fn send_from_outside_runtime() {
863         let (tx1, rx1) = channel::<()>();
864         let (tx2, rx2) = channel::<int>();
865         let (tx3, rx3) = channel::<()>();
866         let tx4 = tx3.clone();
867         spawn(proc() {
868             tx1.send(());
869             for _ in range(0, 40) {
870                 assert_eq!(rx2.recv(), 1);
871             }
872             tx3.send(());
873         });
874         rx1.recv();
875         native::task::spawn(proc() {
876             for _ in range(0, 40) {
877                 tx2.send(1);
878             }
879             tx4.send(());
880         });
881         rx3.recv();
882         rx3.recv();
883     }
884
885     #[test]
886     fn recv_from_outside_runtime() {
887         let (tx, rx) = channel::<int>();
888         let (dtx, drx) = channel();
889         native::task::spawn(proc() {
890             for _ in range(0, 40) {
891                 assert_eq!(rx.recv(), 1);
892             }
893             dtx.send(());
894         });
895         for _ in range(0, 40) {
896             tx.send(1);
897         }
898         drx.recv();
899     }
900
901     #[test]
902     fn no_runtime() {
903         let (tx1, rx1) = channel::<int>();
904         let (tx2, rx2) = channel::<int>();
905         let (tx3, rx3) = channel::<()>();
906         let tx4 = tx3.clone();
907         native::task::spawn(proc() {
908             assert_eq!(rx1.recv(), 1);
909             tx2.send(2);
910             tx4.send(());
911         });
912         native::task::spawn(proc() {
913             tx1.send(1);
914             assert_eq!(rx2.recv(), 2);
915             tx3.send(());
916         });
917         rx3.recv();
918         rx3.recv();
919     }
920
921     test!(fn oneshot_single_thread_close_port_first() {
922         // Simple test of closing without sending
923         let (_tx, rx) = channel::<int>();
924         drop(rx);
925     })
926
927     test!(fn oneshot_single_thread_close_chan_first() {
928         // Simple test of closing without sending
929         let (tx, _rx) = channel::<int>();
930         drop(tx);
931     })
932
933     test!(fn oneshot_single_thread_send_port_close() {
934         // Testing that the sender cleans up the payload if receiver is closed
935         let (tx, rx) = channel::<~int>();
936         drop(rx);
937         tx.send(~0);
938     } #[should_fail])
939
940     test!(fn oneshot_single_thread_recv_chan_close() {
941         // Receiving on a closed chan will fail
942         let res = task::try(proc() {
943             let (tx, rx) = channel::<int>();
944             drop(tx);
945             rx.recv();
946         });
947         // What is our res?
948         assert!(res.is_err());
949     })
950
951     test!(fn oneshot_single_thread_send_then_recv() {
952         let (tx, rx) = channel::<~int>();
953         tx.send(~10);
954         assert!(rx.recv() == ~10);
955     })
956
957     test!(fn oneshot_single_thread_try_send_open() {
958         let (tx, rx) = channel::<int>();
959         assert!(tx.try_send(10));
960         assert!(rx.recv() == 10);
961     })
962
963     test!(fn oneshot_single_thread_try_send_closed() {
964         let (tx, rx) = channel::<int>();
965         drop(rx);
966         assert!(!tx.try_send(10));
967     })
968
969     test!(fn oneshot_single_thread_try_recv_open() {
970         let (tx, rx) = channel::<int>();
971         tx.send(10);
972         assert!(rx.recv_opt() == Some(10));
973     })
974
975     test!(fn oneshot_single_thread_try_recv_closed() {
976         let (tx, rx) = channel::<int>();
977         drop(tx);
978         assert!(rx.recv_opt() == None);
979     })
980
981     test!(fn oneshot_single_thread_peek_data() {
982         let (tx, rx) = channel::<int>();
983         assert_eq!(rx.try_recv(), Empty)
984         tx.send(10);
985         assert_eq!(rx.try_recv(), Data(10));
986     })
987
988     test!(fn oneshot_single_thread_peek_close() {
989         let (tx, rx) = channel::<int>();
990         drop(tx);
991         assert_eq!(rx.try_recv(), Disconnected);
992         assert_eq!(rx.try_recv(), Disconnected);
993     })
994
995     test!(fn oneshot_single_thread_peek_open() {
996         let (_tx, rx) = channel::<int>();
997         assert_eq!(rx.try_recv(), Empty);
998     })
999
1000     test!(fn oneshot_multi_task_recv_then_send() {
1001         let (tx, rx) = channel::<~int>();
1002         spawn(proc() {
1003             assert!(rx.recv() == ~10);
1004         });
1005
1006         tx.send(~10);
1007     })
1008
1009     test!(fn oneshot_multi_task_recv_then_close() {
1010         let (tx, rx) = channel::<~int>();
1011         spawn(proc() {
1012             drop(tx);
1013         });
1014         let res = task::try(proc() {
1015             assert!(rx.recv() == ~10);
1016         });
1017         assert!(res.is_err());
1018     })
1019
1020     test!(fn oneshot_multi_thread_close_stress() {
1021         for _ in range(0, stress_factor()) {
1022             let (tx, rx) = channel::<int>();
1023             spawn(proc() {
1024                 drop(rx);
1025             });
1026             drop(tx);
1027         }
1028     })
1029
1030     test!(fn oneshot_multi_thread_send_close_stress() {
1031         for _ in range(0, stress_factor()) {
1032             let (tx, rx) = channel::<int>();
1033             spawn(proc() {
1034                 drop(rx);
1035             });
1036             let _ = task::try(proc() {
1037                 tx.send(1);
1038             });
1039         }
1040     })
1041
1042     test!(fn oneshot_multi_thread_recv_close_stress() {
1043         for _ in range(0, stress_factor()) {
1044             let (tx, rx) = channel::<int>();
1045             spawn(proc() {
1046                 let res = task::try(proc() {
1047                     rx.recv();
1048                 });
1049                 assert!(res.is_err());
1050             });
1051             spawn(proc() {
1052                 spawn(proc() {
1053                     drop(tx);
1054                 });
1055             });
1056         }
1057     })
1058
1059     test!(fn oneshot_multi_thread_send_recv_stress() {
1060         for _ in range(0, stress_factor()) {
1061             let (tx, rx) = channel();
1062             spawn(proc() {
1063                 tx.send(~10);
1064             });
1065             spawn(proc() {
1066                 assert!(rx.recv() == ~10);
1067             });
1068         }
1069     })
1070
1071     test!(fn stream_send_recv_stress() {
1072         for _ in range(0, stress_factor()) {
1073             let (tx, rx) = channel();
1074
1075             send(tx, 0);
1076             recv(rx, 0);
1077
1078             fn send(tx: Sender<~int>, i: int) {
1079                 if i == 10 { return }
1080
1081                 spawn(proc() {
1082                     tx.send(~i);
1083                     send(tx, i + 1);
1084                 });
1085             }
1086
1087             fn recv(rx: Receiver<~int>, i: int) {
1088                 if i == 10 { return }
1089
1090                 spawn(proc() {
1091                     assert!(rx.recv() == ~i);
1092                     recv(rx, i + 1);
1093                 });
1094             }
1095         }
1096     })
1097
1098     test!(fn recv_a_lot() {
1099         // Regression test that we don't run out of stack in scheduler context
1100         let (tx, rx) = channel();
1101         for _ in range(0, 10000) { tx.send(()); }
1102         for _ in range(0, 10000) { rx.recv(); }
1103     })
1104
1105     test!(fn shared_chan_stress() {
1106         let (tx, rx) = channel();
1107         let total = stress_factor() + 100;
1108         for _ in range(0, total) {
1109             let tx = tx.clone();
1110             spawn(proc() {
1111                 tx.send(());
1112             });
1113         }
1114
1115         for _ in range(0, total) {
1116             rx.recv();
1117         }
1118     })
1119
1120     test!(fn test_nested_recv_iter() {
1121         let (tx, rx) = channel::<int>();
1122         let (total_tx, total_rx) = channel::<int>();
1123
1124         spawn(proc() {
1125             let mut acc = 0;
1126             for x in rx.iter() {
1127                 acc += x;
1128             }
1129             total_tx.send(acc);
1130         });
1131
1132         tx.send(3);
1133         tx.send(1);
1134         tx.send(2);
1135         drop(tx);
1136         assert_eq!(total_rx.recv(), 6);
1137     })
1138
1139     test!(fn test_recv_iter_break() {
1140         let (tx, rx) = channel::<int>();
1141         let (count_tx, count_rx) = channel();
1142
1143         spawn(proc() {
1144             let mut count = 0;
1145             for x in rx.iter() {
1146                 if count >= 3 {
1147                     break;
1148                 } else {
1149                     count += x;
1150                 }
1151             }
1152             count_tx.send(count);
1153         });
1154
1155         tx.send(2);
1156         tx.send(2);
1157         tx.send(2);
1158         tx.try_send(2);
1159         drop(tx);
1160         assert_eq!(count_rx.recv(), 4);
1161     })
1162
1163     test!(fn try_recv_states() {
1164         let (tx1, rx1) = channel::<int>();
1165         let (tx2, rx2) = channel::<()>();
1166         let (tx3, rx3) = channel::<()>();
1167         spawn(proc() {
1168             rx2.recv();
1169             tx1.send(1);
1170             tx3.send(());
1171             rx2.recv();
1172             drop(tx1);
1173             tx3.send(());
1174         });
1175
1176         assert_eq!(rx1.try_recv(), Empty);
1177         tx2.send(());
1178         rx3.recv();
1179         assert_eq!(rx1.try_recv(), Data(1));
1180         assert_eq!(rx1.try_recv(), Empty);
1181         tx2.send(());
1182         rx3.recv();
1183         assert_eq!(rx1.try_recv(), Disconnected);
1184     })
1185
1186     // This bug used to end up in a livelock inside of the Receiver destructor
1187     // because the internal state of the Shared packet was corrupted
1188     test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1189         let (tx, rx) = channel();
1190         let (tx2, rx2) = channel();
1191         spawn(proc() {
1192             rx.recv(); // wait on a oneshot
1193             drop(rx);  // destroy a shared
1194             tx2.send(());
1195         });
1196         // make sure the other task has gone to sleep
1197         for _ in range(0, 5000) { task::deschedule(); }
1198
1199         // upgrade to a shared chan and send a message
1200         let t = tx.clone();
1201         drop(tx);
1202         t.send(());
1203
1204         // wait for the child task to exit before we exit
1205         rx2.recv();
1206     })
1207
1208     test!(fn sends_off_the_runtime() {
1209         use rt::thread::Thread;
1210
1211         let (tx, rx) = channel();
1212         let t = Thread::start(proc() {
1213             for _ in range(0, 1000) {
1214                 tx.send(());
1215             }
1216         });
1217         for _ in range(0, 1000) {
1218             rx.recv();
1219         }
1220         t.join();
1221     })
1222
1223     test!(fn try_recvs_off_the_runtime() {
1224         use rt::thread::Thread;
1225
1226         let (tx, rx) = channel();
1227         let (cdone, pdone) = channel();
1228         let t = Thread::start(proc() {
1229             let mut hits = 0;
1230             while hits < 10 {
1231                 match rx.try_recv() {
1232                     Data(()) => { hits += 1; }
1233                     Empty => { Thread::yield_now(); }
1234                     Disconnected => return,
1235                 }
1236             }
1237             cdone.send(());
1238         });
1239         for _ in range(0, 10) {
1240             tx.send(());
1241         }
1242         t.join();
1243         pdone.recv();
1244     })
1245 }