// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Communication primitives for concurrent tasks
//!
//! Rust makes it very difficult to share data among tasks, both to prevent race
//! conditions and to improve parallelism, but there is often a need for
//! communication between concurrent tasks. The primitives defined in this
//! module are the building blocks for synchronization in Rust.
//!
//! This module provides message-based communication over channels, concretely
//! defined as two types: `Sender` and `Receiver`.
//!
//! A `Sender` is used to send data to a `Receiver`. A `Sender` is clone-able
//! such that many tasks can send simultaneously to one receiver. These
//! channels are *task blocking*, not *thread blocking*. This means that if one
//! task is blocked on a channel, other tasks can continue to make progress.
//!
//! Rust channels can be used as if they have an infinite internal buffer. What
//! this means is that the `send` operation will never block. `Receiver`s, on
//! the other hand, will block the task if there is no data to be received.
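//!
//! As a rough illustration of these semantics, here is a minimal sketch
//! written against `std::sync::mpsc` from present-day Rust. It assumes that
//! module behaves like the `channel`, `Sender`, and `Receiver` described here;
//! OS threads stand in for tasks, and `sum_from_workers` is a hypothetical
//! helper that is not part of this module.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawns `n` workers that each send one value, then sums what arrives.
fn sum_from_workers(n: u32) -> u32 {
    let (tx, rx) = channel();
    let mut handles = Vec::new();
    for i in 0..n {
        // Each worker gets its own clone of the sending half.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            // Sending on an unbounded channel does not block.
            tx.send(i).unwrap();
        }));
    }
    // Drop the original sender so the receiver observes a hang-up once
    // every clone has gone out of scope.
    drop(tx);
    // Iterating blocks until a message arrives and stops on hang-up.
    let total: u32 = rx.iter().sum();
    for handle in handles {
        handle.join().unwrap();
    }
    total
}
```

//! Calling `sum_from_workers(10)` adds up the values 0 through 9, one sent by
//! each of the ten workers.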
//!
//! ## Failure Propagation
//!
//! In addition to being a core primitive for communicating in Rust, channels
//! are the points at which failure is propagated among tasks. Whenever one
//! half of a channel is closed, the other half will have its next operation
//! `fail!`. The purpose of this is to allow propagation of failure among tasks
//! that are linked to one another via channels.
//!
//! There are methods on both `Sender` and `Receiver` to perform their
//! respective operations without failing, however.
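//!
//! The non-failing style can be sketched with present-day `std::sync::mpsc`,
//! where `send` and `recv` return a `Result` instead of failing the task. This
//! is assumed to correspond only roughly to the non-failing methods of this
//! module, and the two function names below are hypothetical.

```rust
use std::sync::mpsc::channel;

/// Returns true if sending reports an error once the receiver is gone.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = channel::<i32>();
    drop(rx);
    // The value is handed back inside the error rather than being lost.
    tx.send(1).is_err()
}

/// Returns true if receiving reports an error once every sender is gone.
fn recv_after_sender_dropped() -> bool {
    let (tx, rx) = channel::<i32>();
    drop(tx);
    // With no senders left and an empty queue, `recv` returns immediately.
    rx.recv().is_err()
}
```

//! In both cases the closed half is detected on the next operation, which is
//! the same point at which the failing variants would `fail!`.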
//!
//! ## Outside the Runtime
//!
//! All channels and ports work seamlessly inside and outside of the Rust
//! runtime. This means that code may use channels to communicate information
//! inside and outside of the runtime. For example, if Rust were embedded as an
//! FFI module in another application, the Rust runtime would probably be
//! running in its own external thread pool. Channels created can communicate
//! from the native application threads to the Rust threads through the use of
//! native mutexes and condition variables.
//!
//! What this means is that if a native thread is using a channel, execution
//! will be blocked accordingly by blocking the OS thread.
59 //! ```rust,should_fail
60 //! // Create a simple streaming channel
61 //! let (tx, rx) = channel();
65 //! assert_eq!(rx.recv(), 10);
67 //! // Create a shared channel which can be sent along from many tasks
68 //! let (tx, rx) = channel();
69 //! for i in range(0, 10) {
70 //! let tx = tx.clone();
76 //! for _ in range(0, 10) {
77 //! let j = rx.recv();
78 //! assert!(0 <= j && j < 10);
81 //! // The call to recv() will fail!() because the channel has already hung
82 //! // up (or been deallocated)
83 //! let (tx, rx) = channel::<int>();
// A description of how Rust's channel implementation works
//
// Channels are supposed to be the basic building block for all other
// concurrent primitives that are used in Rust. As a result, the channel type
// needs to be highly optimized, flexible, and broad enough for use everywhere.
//
// The choice of implementation of all channels is to be built on lock-free data
// structures. The channels themselves are then consequently also lock-free data
// structures. As always with lock-free code, this is very much "here be dragons"
// territory, especially because I'm unaware of any academic papers which have
// gone into great length about channels of these flavors.
100 // ## Flavors of channels
102 // From the perspective of a consumer of this library, there is only one flavor
103 // of channel. This channel can be used as a stream and cloned to allow multiple
104 // senders. Under the hood, however, there are actually three flavors of
107 // * Oneshots - these channels are highly optimized for the one-send use case.
108 // They contain as few atomics as possible and involve one and
109 // exactly one allocation.
110 // * Streams - these channels are optimized for the non-shared use case. They
111 // use a different concurrent queue which is more tailored for this
112 // use case. The initial allocation of this flavor of channel is not
114 // * Shared - this is the most general form of channel that this module offers,
115 // a channel with multiple senders. This type is as optimized as it
116 // can be, but the previous two types mentioned are much faster for
//
// ## Concurrent queues
//
// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
// recv() obviously blocks. This means that under the hood there must be some
// shared and concurrent queue holding all of the actual data.
//
// With two queue-backed flavors of channels (streams and shared channels), two
// flavors of queues are also used. We have chosen to use queues from a
// well-known author which are abbreviated as SPSC and MPSC (single producer,
// single consumer and multiple producer, single consumer). SPSC queues are used
// for streams while MPSC queues are used for shared channels.
//
// ### SPSC optimizations
//
// The SPSC queue found online is essentially a linked list of nodes where one
// half of the nodes are the "queue of data" and the other half of nodes are a
// cache of unused nodes. The unused nodes are used such that an allocation is
// not required on every push() and a free doesn't need to happen on every
// pop().
//
// As found online, however, the cache of nodes is unbounded in size. This
// means that if a channel at one point in its life had 50k items in the queue,
// then the queue will always have the capacity for 50k items. I believed that
// this was an unnecessary limitation of the implementation, so I have altered
// the queue to optionally have a bound on the cache size.
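//
// The effect of bounding the node cache can be sketched as follows. This is a
// single-threaded illustration in present-day Rust, not the lock-free queue
// used by this module; `CachedQueue`, `Node`, and their fields are
// hypothetical names chosen for the example.

```rust
use std::collections::VecDeque;

/// A heap-allocated node that can be recycled instead of freed.
struct Node<T> {
    value: Option<T>,
}

/// A queue that keeps at most `cache_bound` spare nodes for reuse.
struct CachedQueue<T> {
    live: VecDeque<Box<Node<T>>>,
    cache: Vec<Box<Node<T>>>,
    cache_bound: usize,
    /// Number of fresh node allocations performed so far.
    allocations: usize,
}

impl<T> CachedQueue<T> {
    fn new(cache_bound: usize) -> Self {
        CachedQueue {
            live: VecDeque::new(),
            cache: Vec::new(),
            cache_bound,
            allocations: 0,
        }
    }

    /// Reuses a cached node when one is available, allocating otherwise.
    fn push(&mut self, t: T) {
        let mut node = match self.cache.pop() {
            Some(node) => node,
            None => {
                self.allocations += 1;
                Box::new(Node { value: None })
            }
        };
        node.value = Some(t);
        self.live.push_back(node);
    }

    /// Returns the oldest value; the emptied node goes back to the cache
    /// unless the cache is already at its bound, in which case it is freed.
    fn pop(&mut self) -> Option<T> {
        let mut node = self.live.pop_front()?;
        let value = node.value.take();
        if self.cache.len() < self.cache_bound {
            self.cache.push(node);
        }
        value
    }
}
```

// With a bound of 2, a burst of five pushes allocates five nodes, but only two
// of them are retained after the queue drains, so a large burst does not pin
// its peak memory for the rest of the channel's life.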
145 // By default, streams will have an unbounded SPSC queue with a small-ish cache
146 // size. The hope is that the cache is still large enough to have very fast
147 // send() operations while not too large such that millions of channels can
//
// ### MPSC optimizations
//
// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
// a linked list under the hood to earn its unboundedness, but I have not put
// forth much effort into having a cache of nodes similar to the SPSC queue.
//
// For now, I believe that this is "ok" because shared channels are not the most
// common type, but soon we may wish to revisit this queue choice and determine
// another candidate for backend storage of shared channels.
//
// ## Overview of the Implementation
//
// Now that there's a little background on the concurrent queues used, it's
// worth going into much more detail about the channels themselves. The basic
// pseudocode for a send/recv is:
//
//      send(t)                             recv()
//        queue.push(t)                       return if queue.pop()
//        if increment() == -1                deschedule {
//          wakeup()                            if decrement() > 0
//                                                cancel_deschedule()
//                                            }
//
// As mentioned before, there are no locks in this implementation; only atomic
// instructions are used.
//
// ### The internal atomic counter
//
// Every channel has a shared counter with each half to keep track of the size
// of the queue. This counter is used to abort descheduling by the receiver and
// to know when to wake up on the sending side.
//
// As seen in the pseudocode, senders will increment this count and receivers
// will decrement the count. The theory behind this is that if a sender sees a
// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
// then it doesn't need to block.
//
// The recv() method has a beginning call to pop(), and if successful, it needs
// to decrement the count. It is a crucial implementation detail that this
// decrement does *not* happen to the shared counter. If this were the case,
// then it would be possible for the counter to be very negative when there were
// no receivers waiting, in which case the senders would have to determine when
// it was actually appropriate to wake up a receiver.
//
// Instead, the "steal count" is kept track of separately (not atomically
// because it's only used by receivers), and then the decrement() call when
// descheduling will lump in all of the recent steals into one large decrement.
//
// The implication of this is that if a sender sees a -1 count, then there's
// guaranteed to be a waiter waiting!
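//
// The counter and steal bookkeeping can be sketched as below. This is a
// simplified, single-threaded model in present-day Rust that only shows the
// arithmetic: it leaves out the actual blocking, the wakeup handle, and the
// extra bookkeeping the real code performs when a receiver decides not to
// block. `Chan` and its members are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicIsize, Ordering};

struct Chan {
    queue: VecDeque<i32>,
    /// Shared counter: incremented by senders, decremented by the receiver.
    cnt: AtomicIsize,
    /// Receiver-only tally of pops not yet reflected in `cnt`.
    steals: isize,
}

impl Chan {
    fn new() -> Self {
        Chan { queue: VecDeque::new(), cnt: AtomicIsize::new(0), steals: 0 }
    }

    /// Pushes a value; returns true when a blocked receiver must be woken,
    /// which is exactly when the previous count was -1.
    fn send(&mut self, t: i32) -> bool {
        self.queue.push_back(t);
        self.cnt.fetch_add(1, Ordering::SeqCst) == -1
    }

    /// Fast path of recv(): pop without touching the shared counter and
    /// remember the pop as a steal.
    fn try_pop(&mut self) -> Option<i32> {
        let t = self.queue.pop_front()?;
        self.steals += 1;
        Some(t)
    }

    /// Slow path of recv(): fold every recent steal into a single decrement.
    /// Returns true when nothing is outstanding and the receiver must block.
    fn decrement(&mut self) -> bool {
        let steals = std::mem::replace(&mut self.steals, 0);
        let prev = self.cnt.fetch_sub(1 + steals, Ordering::SeqCst);
        prev - steals <= 0
    }
}
```

// After two sends and two fast-path pops, one call to decrement() moves the
// count from 2 straight to -1, so the next sender sees -1 and knows a receiver
// is waiting.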
//
// ## Native Implementation
//
// A major goal of these channels is to work seamlessly on and off the runtime.
// All of the previous race conditions have been worded in terms of
// scheduler-isms (which are obviously not available without the runtime).
//
// For now, native usage of channels (off the runtime) will fall back onto
// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
// is still entirely lock-free; the "deschedule" blocks above are surrounded by
// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
// condition variable.
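//
// A minimal sketch of such a mutex/cond var fallback, written in present-day
// Rust with `std::sync::{Mutex, Condvar}`. It is an assumption about the
// general shape of the technique rather than the code used by this module;
// `Signal` and `wait_for_other_thread` are hypothetical names.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// A wakeup flag guarded by a mutex and a condition variable.
struct Signal {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl Signal {
    fn new() -> Self {
        Signal { woken: Mutex::new(false), cond: Condvar::new() }
    }

    /// "deschedule": block the OS thread until `wakeup` has been called.
    fn wait(&self) {
        let mut woken = self.woken.lock().unwrap();
        // Loop to tolerate spurious wakeups from the condition variable.
        while !*woken {
            woken = self.cond.wait(woken).unwrap();
        }
        // Reset the flag so the signal can be reused.
        *woken = false;
    }

    /// "wakeup": grab the mutex, set the flag, and signal the waiter.
    fn wakeup(&self) {
        *self.woken.lock().unwrap() = true;
        self.cond.notify_one();
    }
}

/// Blocks the calling thread until a second thread signals it.
fn wait_for_other_thread() -> bool {
    let signal = Arc::new(Signal::new());
    let remote = Arc::clone(&signal);
    let waker = thread::spawn(move || remote.wakeup());
    signal.wait();
    waker.join().is_ok()
}
```

// Because the flag is checked under the mutex, the waiter returns correctly
// whether the wakeup arrives before or after it starts waiting.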
217 // Being able to support selection over channels has greatly influenced this
218 // design, and not only does selection need to work inside the runtime, but also
219 // outside the runtime.
221 // The implementation is fairly straightforward. The goal of select() is not to
222 // return some data, but only to return which channel can receive data without
223 // blocking. The implementation is essentially the entire blocking procedure
// followed by an increment as soon as it's woken up. The cancellation procedure
225 // involves an increment and swapping out of to_wake to acquire ownership of the
228 // Sadly this current implementation requires multiple allocations, so I have
229 // seen the throughput of select() be much worse than it should be. I do not
230 // believe that there is anything fundamental which needs to change about these
231 // channels, however, in order to support a more efficient select().
235 // And now that you've seen all the races that I found and attempted to fix,
236 // here's the code for you to find some more!
246 use option::{Some, None, Option};
247 use result::{Ok, Err, Result};
248 use rt::local::Local;
249 use rt::task::{Task, BlockedTask};
250 use sync::arc::UnsafeArc;
252 pub use comm::select::{Select, Handle};
255 { fn $name:ident() $b:block $($a:attr)*} => (
257 #[allow(unused_imports)];
268 $($a)* #[test] fn uv() { f() }
269 $($a)* #[test] fn native() {
271 let (tx, rx) = channel();
272 native::task::spawn(proc() { tx.send(f()) });
285 // Use a power of 2 to allow LLVM to optimize to something that's not a
286 // division, this is hit pretty regularly.
287 static RESCHED_FREQ: int = 256;
289 /// The receiving-half of Rust's channel type. This half can only be owned by
291 pub struct Receiver<T> {
292 priv inner: Flavor<T>,
293 priv receives: Cell<uint>,
294 // can't share in an arc
295 priv marker: marker::NoShare,
/// An iterator over messages on a receiver. This iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
301 pub struct Messages<'a, T> {
302 priv rx: &'a Receiver<T>
305 /// The sending-half of Rust's asynchronous channel type. This half can only be
306 /// owned by one task, but it can be cloned to send to other tasks.
307 pub struct Sender<T> {
308 priv inner: Flavor<T>,
309 priv sends: Cell<uint>,
310 // can't share in an arc
311 priv marker: marker::NoShare,
314 /// The sending-half of Rust's synchronous channel type. This half can only be
315 /// owned by one task, but it can be cloned to send to other tasks.
316 pub struct SyncSender<T> {
317 priv inner: UnsafeArc<sync::Packet<T>>,
318 // can't share in an arc
319 priv marker: marker::NoShare,
322 /// This enumeration is the list of the possible reasons that try_recv could not
323 /// return data when called.
324 #[deriving(Eq, Clone, Show)]
325 pub enum TryRecvResult<T> {
326 /// This channel is currently empty, but the sender(s) have not yet
327 /// disconnected, so data may yet become available.
329 /// This channel's sending half has become disconnected, and there will
330 /// never be any more data received on this channel
332 /// The channel had some data and we successfully popped it
336 /// This enumeration is the list of the possible outcomes for the
337 /// `SyncSender::try_send` method.
338 #[deriving(Eq, Clone, Show)]
339 pub enum TrySendResult<T> {
    /// The data was successfully sent along the channel. This either means that
    /// it was buffered in the channel, or handed off to a receiver. In either
    /// case, the caller no longer has ownership of the data.
    /// The data could not be sent on the channel because it would require that
    /// the caller block to send the data.
347 /// If this is a buffered channel, then the buffer is full at this time. If
348 /// this is not a buffered channel, then there is no receiver available to
349 /// acquire the data.
    /// This channel's receiving half has disconnected, so the data could not be
    /// sent. The data is returned back to the caller in this case.
357 Oneshot(UnsafeArc<oneshot::Packet<T>>),
358 Stream(UnsafeArc<stream::Packet<T>>),
359 Shared(UnsafeArc<shared::Packet<T>>),
360 Sync(UnsafeArc<sync::Packet<T>>),
363 /// Creates a new channel, returning the sender/receiver halves. All data sent
364 /// on the sender will become available on the receiver. See the documentation
365 /// of `Receiver` and `Sender` to see what's possible with them.
366 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
367 let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
368 (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
371 /// Creates a new synchronous, bounded channel.
373 /// Like asynchronous channels, the `Receiver` will block until a message
374 /// becomes available. These channels differ greatly in the semantics of the
375 /// sender from asynchronous channels, however.
/// This channel has an internal buffer on which messages will be queued. When
/// the internal buffer becomes full, future sends will *block* waiting for the
/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
/// becomes a "rendezvous channel" where each send will not return until a recv
/// is paired with it.
383 /// As with asynchronous channels, all senders will fail in `send` if the
384 /// `Receiver` has been destroyed.
389 /// let (tx, rx) = sync_channel(1);
391 /// // this returns immediately
395 /// // this will block until the previous message has been received
399 /// assert_eq!(rx.recv(), 1);
400 /// assert_eq!(rx.recv(), 2);
402 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
403 let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
404 (SyncSender::new(a), Receiver::my_new(Sync(b)))
407 ////////////////////////////////////////////////////////////////////////////////
409 ////////////////////////////////////////////////////////////////////////////////
411 impl<T: Send> Sender<T> {
412 fn my_new(inner: Flavor<T>) -> Sender<T> {
413 Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
416 /// Sends a value along this channel to be received by the corresponding
419 /// Rust channels are infinitely buffered so this method will never block.
423 /// This function will fail if the other end of the channel has hung up.
424 /// This means that if the corresponding receiver has fallen out of scope,
425 /// this function will trigger a fail message saying that a message is
426 /// being sent on a closed channel.
428 /// Note that if this function does *not* fail, it does not mean that the
429 /// data will be successfully received. All sends are placed into a queue,
430 /// so it is possible for a send to succeed (the other end is alive), but
431 /// then the other end could immediately disconnect.
433 /// The purpose of this functionality is to propagate failure among tasks.
434 /// If failure is not desired, then consider using the `try_send` method
435 pub fn send(&self, t: T) {
436 if !self.try_send(t) {
437 fail!("sending on a closed channel");
441 /// Attempts to send a value on this channel, returning whether it was
442 /// successfully sent.
444 /// A successful send occurs when it is determined that the other end of
445 /// the channel has not hung up already. An unsuccessful send would be one
446 /// where the corresponding receiver has already been deallocated. Note
447 /// that a return value of `false` means that the data will never be
448 /// received, but a return value of `true` does *not* mean that the data
449 /// will be received. It is possible for the corresponding receiver to
450 /// hang up immediately after this function returns `true`.
452 /// Like `send`, this method will never block. If the failure of send cannot
453 /// be tolerated, then this method should be used instead.
454 pub fn try_send(&self, t: T) -> bool {
        // In order to prevent starvation of other tasks in situations where
        // a task sends repeatedly without ever receiving, we occasionally
        // yield instead of doing a send immediately.
459 // Don't unconditionally attempt to yield because the TLS overhead can
460 // be a bit much, and also use `try_take` instead of `take` because
461 // there's no reason that this send shouldn't be usable off the
463 let cnt = self.sends.get() + 1;
465 if cnt % (RESCHED_FREQ as uint) == 0 {
466 let task: Option<~Task> = Local::try_take();
467 task.map(|t| t.maybe_yield());
470 let (new_inner, ret) = match self.inner {
477 let (a, b) = UnsafeArc::new2(stream::Packet::new());
478 match (*p).upgrade(Receiver::my_new(Stream(b))) {
479 oneshot::UpSuccess => {
483 oneshot::UpDisconnected => (a, false),
484 oneshot::UpWoke(task) => {
486 task.wake().map(|t| t.reawaken());
493 Stream(ref p) => return unsafe { (*p.get()).send(t) },
494 Shared(ref p) => return unsafe { (*p.get()).send(t) },
495 Sync(..) => unreachable!(),
499 let mut tmp = Sender::my_new(Stream(new_inner));
500 mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
506 impl<T: Send> Clone for Sender<T> {
507 fn clone(&self) -> Sender<T> {
508 let (packet, sleeper) = match self.inner {
510 let (a, b) = UnsafeArc::new2(shared::Packet::new());
511 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
512 oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
513 oneshot::UpWoke(task) => (b, Some(task))
517 let (a, b) = UnsafeArc::new2(shared::Packet::new());
518 match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
519 stream::UpSuccess | stream::UpDisconnected => (b, None),
520 stream::UpWoke(task) => (b, Some(task)),
524 unsafe { (*p.get()).clone_chan(); }
525 return Sender::my_new(Shared(p.clone()));
527 Sync(..) => unreachable!(),
531 (*packet.get()).inherit_blocker(sleeper);
533 let mut tmp = Sender::my_new(Shared(packet.clone()));
534 mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
536 Sender::my_new(Shared(packet))
541 impl<T: Send> Drop for Sender<T> {
544 Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
545 Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
546 Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
547 Sync(..) => unreachable!(),
552 ////////////////////////////////////////////////////////////////////////////////
554 ////////////////////////////////////////////////////////////////////////////////
556 impl<T: Send> SyncSender<T> {
557 fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
558 SyncSender { inner: inner, marker: marker::NoShare }
561 /// Sends a value on this synchronous channel.
563 /// This function will *block* until space in the internal buffer becomes
564 /// available or a receiver is available to hand off the message to.
566 /// Note that a successful send does *not* guarantee that the receiver will
567 /// ever see the data if there is a buffer on this channel. Messages may be
568 /// enqueued in the internal buffer for the receiver to receive at a later
569 /// time. If the buffer size is 0, however, it can be guaranteed that the
570 /// receiver has indeed received the data if this function returns success.
574 /// Similarly to `Sender::send`, this function will fail if the
575 /// corresponding `Receiver` for this channel has disconnected. This
576 /// behavior is used to propagate failure among tasks.
578 /// If failure is not desired, you can achieve the same semantics with the
579 /// `SyncSender::send_opt` method which will not fail if the receiver
581 pub fn send(&self, t: T) {
582 if self.send_opt(t).is_some() {
583 fail!("sending on a closed channel");
587 /// Send a value on a channel, returning it back if the receiver
    /// This method will *block* to send the value `t` on the channel, but if
    /// the value could not be sent due to the receiver disconnecting, the value
    /// is returned back to the caller. This function is similar to `try_send`,
    /// except that it will block if the channel is currently full.
597 /// This function cannot fail.
598 pub fn send_opt(&self, t: T) -> Option<T> {
599 match unsafe { (*self.inner.get()).send(t) } {
605 /// Attempts to send a value on this channel without blocking.
607 /// This method semantically differs from `Sender::try_send` because it can
608 /// fail if the receiver has not disconnected yet. If the buffer on this
609 /// channel is full, this function will immediately return the data back to
612 /// See `SyncSender::send` for notes about guarantees of whether the
613 /// receiver has received the data or not if this function is successful.
617 /// This function cannot fail
618 pub fn try_send(&self, t: T) -> TrySendResult<T> {
619 unsafe { (*self.inner.get()).try_send(t) }
623 impl<T: Send> Clone for SyncSender<T> {
624 fn clone(&self) -> SyncSender<T> {
625 unsafe { (*self.inner.get()).clone_chan(); }
626 return SyncSender::new(self.inner.clone());
631 impl<T: Send> Drop for SyncSender<T> {
633 unsafe { (*self.inner.get()).drop_chan(); }
637 ////////////////////////////////////////////////////////////////////////////////
639 ////////////////////////////////////////////////////////////////////////////////
641 impl<T: Send> Receiver<T> {
642 fn my_new(inner: Flavor<T>) -> Receiver<T> {
643 Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
646 /// Blocks waiting for a value on this receiver
648 /// This function will block if necessary to wait for a corresponding send
649 /// on the channel from its paired `Sender` structure. This receiver will
650 /// be woken up when data is ready, and the data will be returned.
654 /// Similar to channels, this method will trigger a task failure if the
655 /// other end of the channel has hung up (been deallocated). The purpose of
656 /// this is to propagate failure among tasks.
658 /// If failure is not desired, then there are two options:
660 /// * If blocking is still desired, the `recv_opt` method will return `None`
661 /// when the other end hangs up
663 /// * If blocking is not desired, then the `try_recv` method will attempt to
664 /// peek at a value on this receiver.
665 pub fn recv(&self) -> T {
666 match self.recv_opt() {
668 None => fail!("receiving on a closed channel"),
672 /// Attempts to return a pending value on this receiver without blocking
674 /// This method will never block the caller in order to wait for data to
675 /// become available. Instead, this will always return immediately with a
676 /// possible option of pending data on the channel.
678 /// This is useful for a flavor of "optimistic check" before deciding to
679 /// block on a receiver.
681 /// This function cannot fail.
682 pub fn try_recv(&self) -> TryRecvResult<T> {
683 // If a thread is spinning in try_recv, we should take the opportunity
684 // to reschedule things occasionally. See notes above in scheduling on
685 // sends for why this doesn't always hit TLS, and also for why this uses
686 // `try_take` instead of `take`.
687 let cnt = self.receives.get() + 1;
688 self.receives.set(cnt);
689 if cnt % (RESCHED_FREQ as uint) == 0 {
690 let task: Option<~Task> = Local::try_take();
691 task.map(|t| t.maybe_yield());
695 let mut new_port = match self.inner {
697 match unsafe { (*p.get()).try_recv() } {
698 Ok(t) => return Data(t),
699 Err(oneshot::Empty) => return Empty,
700 Err(oneshot::Disconnected) => return Disconnected,
701 Err(oneshot::Upgraded(rx)) => rx,
705 match unsafe { (*p.get()).try_recv() } {
706 Ok(t) => return Data(t),
707 Err(stream::Empty) => return Empty,
708 Err(stream::Disconnected) => return Disconnected,
709 Err(stream::Upgraded(rx)) => rx,
713 match unsafe { (*p.get()).try_recv() } {
714 Ok(t) => return Data(t),
715 Err(shared::Empty) => return Empty,
716 Err(shared::Disconnected) => return Disconnected,
720 match unsafe { (*p.get()).try_recv() } {
721 Ok(t) => return Data(t),
722 Err(sync::Empty) => return Empty,
723 Err(sync::Disconnected) => return Disconnected,
728 mem::swap(&mut cast::transmute_mut(self).inner,
729 &mut new_port.inner);
    /// Attempts to wait for a value on this receiver, but does not fail if the
    /// corresponding channel has hung up.
    ///
    /// This method will always block if there is no data available on the
    /// receiver, but it will not fail in the case that the channel has been
    /// deallocated.
    ///
741 /// In other words, this function has the same semantics as the `recv`
742 /// method except for the failure aspect.
744 /// If the channel has hung up, then `None` is returned. Otherwise `Some` of
745 /// the value found on the receiver is returned.
746 pub fn recv_opt(&self) -> Option<T> {
748 let mut new_port = match self.inner {
750 match unsafe { (*p.get()).recv() } {
751 Ok(t) => return Some(t),
752 Err(oneshot::Empty) => return unreachable!(),
753 Err(oneshot::Disconnected) => return None,
754 Err(oneshot::Upgraded(rx)) => rx,
758 match unsafe { (*p.get()).recv() } {
759 Ok(t) => return Some(t),
760 Err(stream::Empty) => return unreachable!(),
761 Err(stream::Disconnected) => return None,
762 Err(stream::Upgraded(rx)) => rx,
766 match unsafe { (*p.get()).recv() } {
767 Ok(t) => return Some(t),
768 Err(shared::Empty) => return unreachable!(),
769 Err(shared::Disconnected) => return None,
772 Sync(ref p) => return unsafe { (*p.get()).recv() }
775 mem::swap(&mut cast::transmute_mut(self).inner,
776 &mut new_port.inner);
781 /// Returns an iterator which will block waiting for messages, but never
782 /// `fail!`. It will return `None` when the channel has hung up.
783 pub fn iter<'a>(&'a self) -> Messages<'a, T> {
784 Messages { rx: self }
788 impl<T: Send> select::Packet for Receiver<T> {
789 fn can_recv(&self) -> bool {
791 let mut new_port = match self.inner {
793 match unsafe { (*p.get()).can_recv() } {
794 Ok(ret) => return ret,
795 Err(upgrade) => upgrade,
799 match unsafe { (*p.get()).can_recv() } {
800 Ok(ret) => return ret,
801 Err(upgrade) => upgrade,
805 return unsafe { (*p.get()).can_recv() };
808 return unsafe { (*p.get()).can_recv() };
812 mem::swap(&mut cast::transmute_mut(self).inner,
813 &mut new_port.inner);
818 fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
820 let (t, mut new_port) = match self.inner {
822 match unsafe { (*p.get()).start_selection(task) } {
823 oneshot::SelSuccess => return Ok(()),
824 oneshot::SelCanceled(task) => return Err(task),
825 oneshot::SelUpgraded(t, rx) => (t, rx),
829 match unsafe { (*p.get()).start_selection(task) } {
830 stream::SelSuccess => return Ok(()),
831 stream::SelCanceled(task) => return Err(task),
832 stream::SelUpgraded(t, rx) => (t, rx),
836 return unsafe { (*p.get()).start_selection(task) };
839 return unsafe { (*p.get()).start_selection(task) };
844 mem::swap(&mut cast::transmute_mut(self).inner,
845 &mut new_port.inner);
850 fn abort_selection(&self) -> bool {
851 let mut was_upgrade = false;
853 let result = match self.inner {
854 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
855 Stream(ref p) => unsafe {
856 (*p.get()).abort_selection(was_upgrade)
858 Shared(ref p) => return unsafe {
859 (*p.get()).abort_selection(was_upgrade)
861 Sync(ref p) => return unsafe {
862 (*p.get()).abort_selection()
865 let mut new_port = match result { Ok(b) => return b, Err(p) => p };
868 mem::swap(&mut cast::transmute_mut(self).inner,
869 &mut new_port.inner);
875 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
876 fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
880 impl<T: Send> Drop for Receiver<T> {
883 Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
884 Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
885 Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
886 Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
899 pub fn stress_factor() -> uint {
900 match os::getenv("RUST_TEST_STRESS") {
901 Some(val) => from_str::<uint>(val).unwrap(),
907 let (tx, rx) = channel();
909 assert_eq!(rx.recv(), 1);
912 test!(fn drop_full() {
913 let (tx, _rx) = channel();
917 test!(fn drop_full_shared() {
918 let (tx, _rx) = channel();
924 test!(fn smoke_shared() {
925 let (tx, rx) = channel();
927 assert_eq!(rx.recv(), 1);
930 assert_eq!(rx.recv(), 1);
933 test!(fn smoke_threads() {
934 let (tx, rx) = channel();
938 assert_eq!(rx.recv(), 1);
941 test!(fn smoke_port_gone() {
942 let (tx, rx) = channel();
947 test!(fn smoke_shared_port_gone() {
948 let (tx, rx) = channel();
953 test!(fn smoke_shared_port_gone2() {
954 let (tx, rx) = channel();
956 let tx2 = tx.clone();
961 test!(fn port_gone_concurrent() {
962 let (tx, rx) = channel();
969 test!(fn port_gone_concurrent_shared() {
970 let (tx, rx) = channel();
971 let tx2 = tx.clone();
981 test!(fn smoke_chan_gone() {
982 let (tx, rx) = channel::<int>();
987 test!(fn smoke_chan_gone_shared() {
988 let (tx, rx) = channel::<()>();
989 let tx2 = tx.clone();
995 test!(fn chan_gone_concurrent() {
996 let (tx, rx) = channel();
1005 let (tx, rx) = channel();
1007 for _ in range(0, 10000) { tx.send(1); }
1009 for _ in range(0, 10000) {
1010 assert_eq!(rx.recv(), 1);
1014 test!(fn stress_shared() {
1015 static AMT: uint = 10000;
1016 static NTHREADS: uint = 8;
1017 let (tx, rx) = channel::<int>();
1018 let (dtx, drx) = channel::<()>();
1021 for _ in range(0, AMT * NTHREADS) {
1022 assert_eq!(rx.recv(), 1);
1024 match rx.try_recv() {
1025 Data(..) => fail!(),
1031 for _ in range(0, NTHREADS) {
1032 let tx = tx.clone();
1034 for _ in range(0, AMT) { tx.send(1); }
1042 fn send_from_outside_runtime() {
1043 let (tx1, rx1) = channel::<()>();
1044 let (tx2, rx2) = channel::<int>();
1045 let (tx3, rx3) = channel::<()>();
1046 let tx4 = tx3.clone();
1049 for _ in range(0, 40) {
1050 assert_eq!(rx2.recv(), 1);
1055 native::task::spawn(proc() {
1056 for _ in range(0, 40) {
1066 fn recv_from_outside_runtime() {
1067 let (tx, rx) = channel::<int>();
1068 let (dtx, drx) = channel();
1069 native::task::spawn(proc() {
1070 for _ in range(0, 40) {
1071 assert_eq!(rx.recv(), 1);
1075 for _ in range(0, 40) {
1083 let (tx1, rx1) = channel::<int>();
1084 let (tx2, rx2) = channel::<int>();
1085 let (tx3, rx3) = channel::<()>();
1086 let tx4 = tx3.clone();
1087 native::task::spawn(proc() {
1088 assert_eq!(rx1.recv(), 1);
1092 native::task::spawn(proc() {
1094 assert_eq!(rx2.recv(), 2);
1101 test!(fn oneshot_single_thread_close_port_first() {
1102 // Simple test of closing without sending
1103 let (_tx, rx) = channel::<int>();
1107 test!(fn oneshot_single_thread_close_chan_first() {
1108 // Simple test of closing without sending
1109 let (tx, _rx) = channel::<int>();
1113 test!(fn oneshot_single_thread_send_port_close() {
1114 // Test that the sender cleans up the payload if the receiver is closed
1115 let (tx, rx) = channel::<~int>();
1120 test!(fn oneshot_single_thread_recv_chan_close() {
1121 // Receiving on a closed chan will fail
1122 let res = task::try(proc() {
1123 let (tx, rx) = channel::<int>();
1128 assert!(res.is_err());
1131 test!(fn oneshot_single_thread_send_then_recv() {
1132 let (tx, rx) = channel::<~int>();
1134 assert!(rx.recv() == ~10);
1137 test!(fn oneshot_single_thread_try_send_open() {
1138 let (tx, rx) = channel::<int>();
1139 assert!(tx.try_send(10));
1140 assert!(rx.recv() == 10);
1143 test!(fn oneshot_single_thread_try_send_closed() {
1144 let (tx, rx) = channel::<int>();
1146 assert!(!tx.try_send(10));
1149 test!(fn oneshot_single_thread_try_recv_open() {
1150 let (tx, rx) = channel::<int>();
1152 assert!(rx.recv_opt() == Some(10));
1155 test!(fn oneshot_single_thread_try_recv_closed() {
1156 let (tx, rx) = channel::<int>();
1158 assert!(rx.recv_opt() == None);
1161 test!(fn oneshot_single_thread_peek_data() {
1162 let (tx, rx) = channel::<int>();
1163 assert_eq!(rx.try_recv(), Empty);
1165 assert_eq!(rx.try_recv(), Data(10));
1168 test!(fn oneshot_single_thread_peek_close() {
1169 let (tx, rx) = channel::<int>();
1171 assert_eq!(rx.try_recv(), Disconnected);
1172 assert_eq!(rx.try_recv(), Disconnected);
1175 test!(fn oneshot_single_thread_peek_open() {
1176 let (_tx, rx) = channel::<int>();
1177 assert_eq!(rx.try_recv(), Empty);
1180 test!(fn oneshot_multi_task_recv_then_send() {
1181 let (tx, rx) = channel::<~int>();
1183 assert!(rx.recv() == ~10);
1189 test!(fn oneshot_multi_task_recv_then_close() {
1190 let (tx, rx) = channel::<~int>();
1194 let res = task::try(proc() {
1195 assert!(rx.recv() == ~10);
1197 assert!(res.is_err());
1200 test!(fn oneshot_multi_thread_close_stress() {
1201 for _ in range(0, stress_factor()) {
1202 let (tx, rx) = channel::<int>();
1210 test!(fn oneshot_multi_thread_send_close_stress() {
1211 for _ in range(0, stress_factor()) {
1212 let (tx, rx) = channel::<int>();
1216 let _ = task::try(proc() {
1222 test!(fn oneshot_multi_thread_recv_close_stress() {
1223 for _ in range(0, stress_factor()) {
1224 let (tx, rx) = channel::<int>();
1226 let res = task::try(proc() {
1229 assert!(res.is_err());
1239 test!(fn oneshot_multi_thread_send_recv_stress() {
1240 for _ in range(0, stress_factor()) {
1241 let (tx, rx) = channel();
1246 assert!(rx.recv() == ~10);
1251 test!(fn stream_send_recv_stress() {
1252 for _ in range(0, stress_factor()) {
1253 let (tx, rx) = channel();
1258 fn send(tx: Sender<~int>, i: int) {
1259 if i == 10 { return }
1267 fn recv(rx: Receiver<~int>, i: int) {
1268 if i == 10 { return }
1271 assert!(rx.recv() == ~i);
1278 test!(fn recv_a_lot() {
1279 // Regression test that we don't run out of stack in scheduler context
1280 let (tx, rx) = channel();
1281 for _ in range(0, 10000) { tx.send(()); }
1282 for _ in range(0, 10000) { rx.recv(); }
1285 test!(fn shared_chan_stress() {
1286 let (tx, rx) = channel();
1287 let total = stress_factor() + 100;
1288 for _ in range(0, total) {
1289 let tx = tx.clone();
1295 for _ in range(0, total) {
1300 test!(fn test_nested_recv_iter() {
1301 let (tx, rx) = channel::<int>();
1302 let (total_tx, total_rx) = channel::<int>();
1306 for x in rx.iter() {
1316 assert_eq!(total_rx.recv(), 6);
1319 test!(fn test_recv_iter_break() {
1320 let (tx, rx) = channel::<int>();
1321 let (count_tx, count_rx) = channel();
1325 for x in rx.iter() {
1332 count_tx.send(count);
1340 assert_eq!(count_rx.recv(), 4);
1343 test!(fn try_recv_states() {
1344 let (tx1, rx1) = channel::<int>();
1345 let (tx2, rx2) = channel::<()>();
1346 let (tx3, rx3) = channel::<()>();
1356 assert_eq!(rx1.try_recv(), Empty);
1359 assert_eq!(rx1.try_recv(), Data(1));
1360 assert_eq!(rx1.try_recv(), Empty);
1363 assert_eq!(rx1.try_recv(), Disconnected);
1366 // This bug used to end up in a livelock inside of the Receiver destructor
1367 // because the internal state of the Shared packet was corrupted
1368 test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1369 let (tx, rx) = channel();
1370 let (tx2, rx2) = channel();
1372 rx.recv(); // wait on a oneshot channel
1373 drop(rx); // destroy the receiver of a shared channel
1376 // make sure the other task has gone to sleep
1377 for _ in range(0, 5000) { task::deschedule(); }
1379 // upgrade to a shared chan and send a message
1384 // wait for the child task to exit before we exit
1388 test!(fn sends_off_the_runtime() {
1389 use rt::thread::Thread;
1391 let (tx, rx) = channel();
1392 let t = Thread::start(proc() {
1393 for _ in range(0, 1000) {
1397 for _ in range(0, 1000) {
1403 test!(fn try_recvs_off_the_runtime() {
1404 use rt::thread::Thread;
1406 let (tx, rx) = channel();
1407 let (cdone, pdone) = channel();
1408 let t = Thread::start(proc() {
1411 match rx.try_recv() {
1412 Data(()) => { hits += 1; }
1413 Empty => { Thread::yield_now(); }
1414 Disconnected => return,
1419 for _ in range(0, 10) {
1432 pub fn stress_factor() -> uint {
1433 match os::getenv("RUST_TEST_STRESS") {
1434 Some(val) => from_str::<uint>(val).unwrap(),
1440 let (tx, rx) = sync_channel(1);
1442 assert_eq!(rx.recv(), 1);
1445 test!(fn drop_full() {
1446 let (tx, _rx) = sync_channel(1);
1450 test!(fn smoke_shared() {
1451 let (tx, rx) = sync_channel(1);
1453 assert_eq!(rx.recv(), 1);
1454 let tx = tx.clone();
1456 assert_eq!(rx.recv(), 1);
1459 test!(fn smoke_threads() {
1460 let (tx, rx) = sync_channel(0);
1464 assert_eq!(rx.recv(), 1);
1467 test!(fn smoke_port_gone() {
1468 let (tx, rx) = sync_channel(0);
1473 test!(fn smoke_shared_port_gone2() {
1474 let (tx, rx) = sync_channel(0);
1476 let tx2 = tx.clone();
1481 test!(fn port_gone_concurrent() {
1482 let (tx, rx) = sync_channel(0);
1489 test!(fn port_gone_concurrent_shared() {
1490 let (tx, rx) = sync_channel(0);
1491 let tx2 = tx.clone();
1501 test!(fn smoke_chan_gone() {
1502 let (tx, rx) = sync_channel::<int>(0);
1507 test!(fn smoke_chan_gone_shared() {
1508 let (tx, rx) = sync_channel::<()>(0);
1509 let tx2 = tx.clone();
1515 test!(fn chan_gone_concurrent() {
1516 let (tx, rx) = sync_channel(0);
1525 let (tx, rx) = sync_channel(0);
1527 for _ in range(0, 10000) { tx.send(1); }
1529 for _ in range(0, 10000) {
1530 assert_eq!(rx.recv(), 1);
1534 test!(fn stress_shared() {
1535 static AMT: uint = 1000;
1536 static NTHREADS: uint = 8;
1537 let (tx, rx) = sync_channel::<int>(0);
1538 let (dtx, drx) = sync_channel::<()>(0);
1541 for _ in range(0, AMT * NTHREADS) {
1542 assert_eq!(rx.recv(), 1);
1544 match rx.try_recv() {
1545 Data(..) => fail!(),
1551 for _ in range(0, NTHREADS) {
1552 let tx = tx.clone();
1554 for _ in range(0, AMT) { tx.send(1); }
1561 test!(fn oneshot_single_thread_close_port_first() {
1562 // Simple test of closing without sending
1563 let (_tx, rx) = sync_channel::<int>(0);
1567 test!(fn oneshot_single_thread_close_chan_first() {
1568 // Simple test of closing without sending
1569 let (tx, _rx) = sync_channel::<int>(0);
1573 test!(fn oneshot_single_thread_send_port_close() {
1574 // Test that the sender cleans up the payload if the receiver is closed
1575 let (tx, rx) = sync_channel::<~int>(0);
1580 test!(fn oneshot_single_thread_recv_chan_close() {
1581 // Receiving on a closed chan will fail
1582 let res = task::try(proc() {
1583 let (tx, rx) = sync_channel::<int>(0);
1588 assert!(res.is_err());
1591 test!(fn oneshot_single_thread_send_then_recv() {
1592 let (tx, rx) = sync_channel::<~int>(1);
1594 assert!(rx.recv() == ~10);
1597 test!(fn oneshot_single_thread_try_send_open() {
1598 let (tx, rx) = sync_channel::<int>(1);
1599 assert_eq!(tx.try_send(10), Sent);
1600 assert!(rx.recv() == 10);
1603 test!(fn oneshot_single_thread_try_send_closed() {
1604 let (tx, rx) = sync_channel::<int>(0);
1606 assert_eq!(tx.try_send(10), RecvDisconnected(10));
1609 test!(fn oneshot_single_thread_try_send_closed2() {
1610 let (tx, _rx) = sync_channel::<int>(0);
1611 assert_eq!(tx.try_send(10), Full(10));
1614 test!(fn oneshot_single_thread_try_recv_open() {
1615 let (tx, rx) = sync_channel::<int>(1);
1617 assert!(rx.recv_opt() == Some(10));
1620 test!(fn oneshot_single_thread_try_recv_closed() {
1621 let (tx, rx) = sync_channel::<int>(0);
1623 assert!(rx.recv_opt() == None);
1626 test!(fn oneshot_single_thread_peek_data() {
1627 let (tx, rx) = sync_channel::<int>(1);
1628 assert_eq!(rx.try_recv(), Empty);
1630 assert_eq!(rx.try_recv(), Data(10));
1633 test!(fn oneshot_single_thread_peek_close() {
1634 let (tx, rx) = sync_channel::<int>(0);
1636 assert_eq!(rx.try_recv(), Disconnected);
1637 assert_eq!(rx.try_recv(), Disconnected);
1640 test!(fn oneshot_single_thread_peek_open() {
1641 let (_tx, rx) = sync_channel::<int>(0);
1642 assert_eq!(rx.try_recv(), Empty);
1645 test!(fn oneshot_multi_task_recv_then_send() {
1646 let (tx, rx) = sync_channel::<~int>(0);
1648 assert!(rx.recv() == ~10);
1654 test!(fn oneshot_multi_task_recv_then_close() {
1655 let (tx, rx) = sync_channel::<~int>(0);
1659 let res = task::try(proc() {
1660 assert!(rx.recv() == ~10);
1662 assert!(res.is_err());
1665 test!(fn oneshot_multi_thread_close_stress() {
1666 for _ in range(0, stress_factor()) {
1667 let (tx, rx) = sync_channel::<int>(0);
1675 test!(fn oneshot_multi_thread_send_close_stress() {
1676 for _ in range(0, stress_factor()) {
1677 let (tx, rx) = sync_channel::<int>(0);
1681 let _ = task::try(proc() {
1687 test!(fn oneshot_multi_thread_recv_close_stress() {
1688 for _ in range(0, stress_factor()) {
1689 let (tx, rx) = sync_channel::<int>(0);
1691 let res = task::try(proc() {
1694 assert!(res.is_err());
1704 test!(fn oneshot_multi_thread_send_recv_stress() {
1705 for _ in range(0, stress_factor()) {
1706 let (tx, rx) = sync_channel(0);
1711 assert!(rx.recv() == ~10);
1716 test!(fn stream_send_recv_stress() {
1717 for _ in range(0, stress_factor()) {
1718 let (tx, rx) = sync_channel(0);
1723 fn send(tx: SyncSender<~int>, i: int) {
1724 if i == 10 { return }
1732 fn recv(rx: Receiver<~int>, i: int) {
1733 if i == 10 { return }
1736 assert!(rx.recv() == ~i);
1743 test!(fn recv_a_lot() {
1744 // Regression test that we don't run out of stack in scheduler context
1745 let (tx, rx) = sync_channel(10000);
1746 for _ in range(0, 10000) { tx.send(()); }
1747 for _ in range(0, 10000) { rx.recv(); }
1750 test!(fn shared_chan_stress() {
1751 let (tx, rx) = sync_channel(0);
1752 let total = stress_factor() + 100;
1753 for _ in range(0, total) {
1754 let tx = tx.clone();
1760 for _ in range(0, total) {
1765 test!(fn test_nested_recv_iter() {
1766 let (tx, rx) = sync_channel::<int>(0);
1767 let (total_tx, total_rx) = sync_channel::<int>(0);
1771 for x in rx.iter() {
1781 assert_eq!(total_rx.recv(), 6);
1784 test!(fn test_recv_iter_break() {
1785 let (tx, rx) = sync_channel::<int>(0);
1786 let (count_tx, count_rx) = sync_channel(0);
1790 for x in rx.iter() {
1797 count_tx.send(count);
1805 assert_eq!(count_rx.recv(), 4);
1808 test!(fn try_recv_states() {
1809 let (tx1, rx1) = sync_channel::<int>(1);
1810 let (tx2, rx2) = sync_channel::<()>(1);
1811 let (tx3, rx3) = sync_channel::<()>(1);
1821 assert_eq!(rx1.try_recv(), Empty);
1824 assert_eq!(rx1.try_recv(), Data(1));
1825 assert_eq!(rx1.try_recv(), Empty);
1828 assert_eq!(rx1.try_recv(), Disconnected);
1831 // This bug used to end up in a livelock inside of the Receiver destructor
1832 // because the internal state of the Shared packet was corrupted
1833 test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1834 let (tx, rx) = sync_channel(0);
1835 let (tx2, rx2) = sync_channel(0);
1837 rx.recv(); // wait on a oneshot
1838 drop(rx); // destroy a shared
1841 // make sure the other task has gone to sleep
1842 for _ in range(0, 5000) { task::deschedule(); }
1844 // upgrade to a shared chan and send a message
1849 // wait for the child task to exit before we exit
1853 test!(fn try_recvs_off_the_runtime() {
1854 use std::rt::thread::Thread;
1856 let (tx, rx) = sync_channel(0);
1857 let (cdone, pdone) = channel();
1858 let t = Thread::start(proc() {
1861 match rx.try_recv() {
1862 Data(()) => { hits += 1; }
1863 Empty => { Thread::yield_now(); }
1864 Disconnected => return,
1869 for _ in range(0, 10) {
1876 test!(fn send_opt1() {
1877 let (tx, rx) = sync_channel(0);
1878 spawn(proc() { rx.recv(); });
1879 assert_eq!(tx.send_opt(1), None);
1882 test!(fn send_opt2() {
1883 let (tx, rx) = sync_channel(0);
1884 spawn(proc() { drop(rx); });
1885 assert_eq!(tx.send_opt(1), Some(1));
1888 test!(fn send_opt3() {
1889 let (tx, rx) = sync_channel(1);
1890 assert_eq!(tx.send_opt(1), None);
1891 spawn(proc() { drop(rx); });
1892 assert_eq!(tx.send_opt(1), Some(1));
1895 test!(fn send_opt4() {
1896 let (tx, rx) = sync_channel(0);
1897 let tx2 = tx.clone();
1898 let (done, donerx) = channel();
1899 let done2 = done.clone();
1901 assert_eq!(tx.send_opt(1), Some(1));
1905 assert_eq!(tx2.send_opt(2), Some(2));
1913 test!(fn try_send1() {
1914 let (tx, _rx) = sync_channel(0);
1915 assert_eq!(tx.try_send(1), Full(1));
1918 test!(fn try_send2() {
1919 let (tx, _rx) = sync_channel(1);
1920 assert_eq!(tx.try_send(1), Sent);
1921 assert_eq!(tx.try_send(1), Full(1));
1924 test!(fn try_send3() {
1925 let (tx, rx) = sync_channel(1);
1926 assert_eq!(tx.try_send(1), Sent);
1928 assert_eq!(tx.try_send(1), RecvDisconnected(1));
1931 test!(fn try_send4() {
1932 let (tx, rx) = sync_channel(0);
1934 for _ in range(0, 1000) { task::deschedule(); }
1935 assert_eq!(tx.try_send(1), Sent);
1937 assert_eq!(rx.recv(), 1);