1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
//! Multi-producer, single-consumer communication primitives between threads.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //! tx.send(10i).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10i);
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //! let tx = tx.clone();
79 //! Thread::spawn(move|| {
80 //! tx.send(i).unwrap();
84 //! for _ in range(0i, 10i) {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //! // This will wait for the parent task to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
//! Reading from a channel with a timeout requires using a `Timer` together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
132 //! val = rx.recv() => println!("Received {}", val.unwrap()),
133 //! _ = timeout.recv() => {
134 //! println!("timed out, total time was more than 10 seconds");
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
154 //! let timeout = timer.oneshot(Duration::seconds(5));
157 //! val = rx.recv() => println!("Received {}", val.unwrap()),
158 //! _ = timeout.recv() => {
159 //! println!("timed out, no message received in 5 seconds");
166 // A description of how Rust's channel implementation works
168 // Channels are supposed to be the basic building block for all other
169 // concurrent primitives that are used in Rust. As a result, the channel type
170 // needs to be highly optimized, flexible, and broad enough for use everywhere.
172 // The choice of implementation of all channels is to be built on lock-free data
173 // structures. The channels themselves are then consequently also lock-free data
174 // structures. As always with lock-free code, this is a very "here be dragons"
175 // territory, especially because I'm unaware of any academic papers that have
176 // gone into great length about channels of these flavors.
178 // ## Flavors of channels
180 // From the perspective of a consumer of this library, there is only one flavor
181 // of channel. This channel can be used as a stream and cloned to allow multiple
182 // senders. Under the hood, however, there are actually three flavors of
// * Oneshots - these channels are highly optimized for the one-send use case.
186 // They contain as few atomics as possible and involve one and
187 // exactly one allocation.
188 // * Streams - these channels are optimized for the non-shared use case. They
189 // use a different concurrent queue that is more tailored for this
190 // use case. The initial allocation of this flavor of channel is not
192 // * Shared - this is the most general form of channel that this module offers,
193 // a channel with multiple senders. This type is as optimized as it
194 // can be, but the previous two types mentioned are much faster for
197 // ## Concurrent queues
199 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
200 // recv() obviously blocks. This means that under the hood there must be some
201 // shared and concurrent queue holding all of the actual data.
203 // With two flavors of channels, two flavors of queues are also used. We have
204 // chosen to use queues from a well-known author that are abbreviated as SPSC
205 // and MPSC (single producer, single consumer and multiple producer, single
206 // consumer). SPSC queues are used for streams while MPSC queues are used for
209 // ### SPSC optimizations
211 // The SPSC queue found online is essentially a linked list of nodes where one
212 // half of the nodes are the "queue of data" and the other half of nodes are a
213 // cache of unused nodes. The unused nodes are used such that an allocation is
214 // not required on every push() and a free doesn't need to happen on every
217 // As found online, however, the cache of nodes is of an infinite size. This
218 // means that if a channel at one point in its life had 50k items in the queue,
219 // then the queue will always have the capacity for 50k items. I believed that
220 // this was an unnecessary limitation of the implementation, so I have altered
221 // the queue to optionally have a bound on the cache size.
223 // By default, streams will have an unbounded SPSC queue with a small-ish cache
224 // size. The hope is that the cache is still large enough to have very fast
225 // send() operations while not too large such that millions of channels can
228 // ### MPSC optimizations
230 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
231 // a linked list under the hood to earn its unboundedness, but I have not put
232 // forth much effort into having a cache of nodes similar to the SPSC queue.
234 // For now, I believe that this is "ok" because shared channels are not the most
235 // common type, but soon we may wish to revisit this queue choice and determine
236 // another candidate for backend storage of shared channels.
238 // ## Overview of the Implementation
240 // Now that there's a little background on the concurrent queues used, it's
241 // worth going into much more detail about the channels themselves. The basic
242 // pseudocode for a send/recv are:
246 // queue.push(t) return if queue.pop()
247 // if increment() == -1 deschedule {
248 // wakeup() if decrement() > 0
249 // cancel_deschedule()
253 // As mentioned before, there are no locks in this implementation, only atomic
254 // instructions are used.
256 // ### The internal atomic counter
258 // Every channel has a shared counter with each half to keep track of the size
259 // of the queue. This counter is used to abort descheduling by the receiver and
260 // to know when to wake up on the sending side.
262 // As seen in the pseudocode, senders will increment this count and receivers
263 // will decrement the count. The theory behind this is that if a sender sees a
264 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
265 // then it doesn't need to block.
267 // The recv() method has a beginning call to pop(), and if successful, it needs
268 // to decrement the count. It is a crucial implementation detail that this
269 // decrement does *not* happen to the shared counter. If this were the case,
270 // then it would be possible for the counter to be very negative when there were
271 // no receivers waiting, in which case the senders would have to determine when
272 // it was actually appropriate to wake up a receiver.
274 // Instead, the "steal count" is kept track of separately (not atomically
275 // because it's only used by receivers), and then the decrement() call when
276 // descheduling will lump in all of the recent steals into one large decrement.
278 // The implication of this is that if a sender sees a -1 count, then there's
279 // guaranteed to be a waiter waiting!
281 // ## Native Implementation
283 // A major goal of these channels is to work seamlessly on and off the runtime.
284 // All of the previous race conditions have been worded in terms of
285 // scheduler-isms (which is obviously not available without the runtime).
287 // For now, native usage of channels (off the runtime) will fall back onto
288 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
289 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
290 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
291 // condition variable.
295 // Being able to support selection over channels has greatly influenced this
296 // design, and not only does selection need to work inside the runtime, but also
297 // outside the runtime.
299 // The implementation is fairly straightforward. The goal of select() is not to
300 // return some data, but only to return which channel can receive data without
301 // blocking. The implementation is essentially the entire blocking procedure
302 // followed by an increment as soon as its woken up. The cancellation procedure
303 // involves an increment and swapping out of to_wake to acquire ownership of the
306 // Sadly this current implementation requires multiple allocations, so I have
307 // seen the throughput of select() be much worse than it should be. I do not
308 // believe that there is anything fundamental that needs to change about these
309 // channels, however, in order to support a more efficient select().
313 // And now that you've seen all the races that I found and attempted to fix,
314 // here's the code for you to find some more!
322 use cell::UnsafeCell;
324 pub use self::select::{Select, Handle};
325 use self::select::StartResult;
326 use self::select::StartResult::*;
327 use self::blocking::SignalToken;
338 /// The receiving-half of Rust's channel type. This half can only be owned by
341 pub struct Receiver<T> {
342 inner: UnsafeCell<Flavor<T>>,
// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
// (The `T: Send` bound below enforces that the payload itself is sendable.)
unsafe impl<T:Send> Send for Receiver<T> { }
349 /// An iterator over messages on a receiver, this iterator will block
350 /// whenever `next` is called, waiting for a new message, and `None` will be
351 /// returned when the corresponding channel has hung up.
353 pub struct Iter<'a, T:'a> {
357 /// The sending-half of Rust's asynchronous channel type. This half can only be
358 /// owned by one task, but it can be cloned to send to other tasks.
360 pub struct Sender<T> {
361 inner: UnsafeCell<Flavor<T>>,
// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
// (The `T: Send` bound below enforces that the payload itself is sendable.)
unsafe impl<T:Send> Send for Sender<T> { }
368 /// The sending-half of Rust's synchronous channel type. This half can only be
369 /// owned by one task, but it can be cloned to send to other tasks.
371 pub struct SyncSender<T> {
372 inner: Arc<RacyCell<sync::Packet<T>>>,
373 // can't share in an arc
374 _marker: marker::NoSync,
/// An error returned from the `send` function on channels.
///
/// A `send` operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
#[deriving(PartialEq, Eq)]
pub struct SendError<T>(pub T);
/// An error returned from the `recv` function on a `Receiver`.
///
/// The `recv` operation can only fail if the sending half of a channel is
/// disconnected, implying that no further messages will ever be received.
#[deriving(PartialEq, Eq, Clone, Copy)]
pub struct RecvError;
394 /// This enumeration is the list of the possible reasons that try_recv could not
395 /// return data when called.
396 #[deriving(PartialEq, Clone, Copy)]
398 pub enum TryRecvError {
399 /// This channel is currently empty, but the sender(s) have not yet
400 /// disconnected, so data may yet become available.
404 /// This channel's sending half has become disconnected, and there will
405 /// never be any more data received on this channel
410 /// This enumeration is the list of the possible error outcomes for the
411 /// `SyncSender::try_send` method.
412 #[deriving(PartialEq, Clone)]
414 pub enum TrySendError<T> {
415 /// The data could not be sent on the channel because it would require that
416 /// the callee block to send the data.
418 /// If this is a buffered channel, then the buffer is full at this time. If
419 /// this is not a buffered channel, then there is no receiver available to
420 /// acquire the data.
424 /// This channel's receiving half has disconnected, so the data could not be
425 /// sent. The data is returned back to the callee in this case.
431 Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
432 Stream(Arc<RacyCell<stream::Packet<T>>>),
433 Shared(Arc<RacyCell<shared::Packet<T>>>),
434 Sync(Arc<RacyCell<sync::Packet<T>>>),
438 trait UnsafeFlavor<T> {
439 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
440 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
441 &mut *self.inner_unsafe().get()
443 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
444 &*self.inner_unsafe().get()
447 impl<T> UnsafeFlavor<T> for Sender<T> {
448 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
452 impl<T> UnsafeFlavor<T> for Receiver<T> {
453 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
458 /// Creates a new asynchronous channel, returning the sender/receiver halves.
460 /// All data sent on the sender will become available on the receiver, and no
461 /// send will block the calling task (this channel has an "infinite buffer").
466 /// use std::sync::mpsc::channel;
467 /// use std::thread::Thread;
469 /// // tx is is the sending half (tx for transmission), and rx is the receiving
470 /// // half (rx for receiving).
471 /// let (tx, rx) = channel();
473 /// // Spawn off an expensive computation
474 /// Thread::spawn(move|| {
475 /// # fn expensive_computation() {}
476 /// tx.send(expensive_computation()).unwrap();
479 /// // Do some useful work for awhile
481 /// // Let's see what that answer was
482 /// println!("{}", rx.recv().unwrap());
485 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
486 let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
487 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
490 /// Creates a new synchronous, bounded channel.
492 /// Like asynchronous channels, the `Receiver` will block until a message
493 /// becomes available. These channels differ greatly in the semantics of the
494 /// sender from asynchronous channels, however.
496 /// This channel has an internal buffer on which messages will be queued. When
497 /// the internal buffer becomes full, future sends will *block* waiting for the
498 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
499 /// becomes "rendezvous channel" where each send will not return until a recv
500 /// is paired with it.
502 /// As with asynchronous channels, all senders will panic in `send` if the
503 /// `Receiver` has been destroyed.
508 /// use std::sync::mpsc::sync_channel;
509 /// use std::thread::Thread;
511 /// let (tx, rx) = sync_channel(1);
513 /// // this returns immediately
514 /// tx.send(1i).unwrap();
516 /// Thread::spawn(move|| {
517 /// // this will block until the previous message has been received
518 /// tx.send(2i).unwrap();
521 /// assert_eq!(rx.recv().unwrap(), 1i);
522 /// assert_eq!(rx.recv().unwrap(), 2i);
525 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
526 let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
527 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
530 ////////////////////////////////////////////////////////////////////////////////
532 ////////////////////////////////////////////////////////////////////////////////
534 impl<T: Send> Sender<T> {
535 fn new(inner: Flavor<T>) -> Sender<T> {
537 inner: UnsafeCell::new(inner),
541 /// Attempts to send a value on this channel, returning it back if it could
544 /// A successful send occurs when it is determined that the other end of
545 /// the channel has not hung up already. An unsuccessful send would be one
546 /// where the corresponding receiver has already been deallocated. Note
547 /// that a return value of `Err` means that the data will never be
548 /// received, but a return value of `Ok` does *not* mean that the data
549 /// will be received. It is possible for the corresponding receiver to
550 /// hang up immediately after this function returns `Ok`.
552 /// This method will never block the current thread.
557 /// use std::sync::mpsc::channel;
559 /// let (tx, rx) = channel();
561 /// // This send is always successful
562 /// tx.send(1i).unwrap();
564 /// // This send will fail because the receiver is gone
566 /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
568 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
569 let (new_inner, ret) = match *unsafe { self.inner() } {
570 Flavor::Oneshot(ref p) => {
574 return (*p).send(t).map_err(SendError);
577 Arc::new(RacyCell::new(stream::Packet::new()));
578 let rx = Receiver::new(Flavor::Stream(a.clone()));
579 match (*p).upgrade(rx) {
580 oneshot::UpSuccess => {
581 let ret = (*a.get()).send(t);
584 oneshot::UpDisconnected => (a, Err(t)),
585 oneshot::UpWoke(token) => {
586 // This send cannot panic because the thread is
587 // asleep (we're looking at it), so the receiver
589 (*a.get()).send(t).ok().unwrap();
597 Flavor::Stream(ref p) => return unsafe {
598 (*p.get()).send(t).map_err(SendError)
600 Flavor::Shared(ref p) => return unsafe {
601 (*p.get()).send(t).map_err(SendError)
603 Flavor::Sync(..) => unreachable!(),
607 let tmp = Sender::new(Flavor::Stream(new_inner));
608 mem::swap(self.inner_mut(), tmp.inner_mut());
610 ret.map_err(SendError)
615 impl<T: Send> Clone for Sender<T> {
616 fn clone(&self) -> Sender<T> {
617 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
618 Flavor::Oneshot(ref p) => {
619 let a = Arc::new(RacyCell::new(shared::Packet::new()));
621 let guard = (*a.get()).postinit_lock();
622 let rx = Receiver::new(Flavor::Shared(a.clone()));
623 match (*p.get()).upgrade(rx) {
625 oneshot::UpDisconnected => (a, None, guard),
626 oneshot::UpWoke(task) => (a, Some(task), guard)
630 Flavor::Stream(ref p) => {
631 let a = Arc::new(RacyCell::new(shared::Packet::new()));
633 let guard = (*a.get()).postinit_lock();
634 let rx = Receiver::new(Flavor::Shared(a.clone()));
635 match (*p.get()).upgrade(rx) {
637 stream::UpDisconnected => (a, None, guard),
638 stream::UpWoke(task) => (a, Some(task), guard),
642 Flavor::Shared(ref p) => {
643 unsafe { (*p.get()).clone_chan(); }
644 return Sender::new(Flavor::Shared(p.clone()));
646 Flavor::Sync(..) => unreachable!(),
650 (*packet.get()).inherit_blocker(sleeper, guard);
652 let tmp = Sender::new(Flavor::Shared(packet.clone()));
653 mem::swap(self.inner_mut(), tmp.inner_mut());
655 Sender::new(Flavor::Shared(packet))
660 impl<T: Send> Drop for Sender<T> {
662 match *unsafe { self.inner_mut() } {
663 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
664 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
665 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
666 Flavor::Sync(..) => unreachable!(),
671 ////////////////////////////////////////////////////////////////////////////////
673 ////////////////////////////////////////////////////////////////////////////////
675 impl<T: Send> SyncSender<T> {
676 fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
677 SyncSender { inner: inner, _marker: marker::NoSync }
680 /// Sends a value on this synchronous channel.
682 /// This function will *block* until space in the internal buffer becomes
683 /// available or a receiver is available to hand off the message to.
685 /// Note that a successful send does *not* guarantee that the receiver will
686 /// ever see the data if there is a buffer on this channel. Items may be
687 /// enqueued in the internal buffer for the receiver to receive at a later
688 /// time. If the buffer size is 0, however, it can be guaranteed that the
689 /// receiver has indeed received the data if this function returns success.
691 /// This function will never panic, but it may return `Err` if the
692 /// `Receiver` has disconnected and is no longer able to receive
695 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
696 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
699 /// Attempts to send a value on this channel without blocking.
701 /// This method differs from `send` by returning immediately if the
702 /// channel's buffer is full or no receiver is waiting to acquire some
703 /// data. Compared with `send`, this function has two failure cases
704 /// instead of one (one for disconnection, one for a full buffer).
706 /// See `SyncSender::send` for notes about guarantees of whether the
707 /// receiver has received the data or not if this function is successful.
709 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
710 unsafe { (*self.inner.get()).try_send(t) }
715 impl<T: Send> Clone for SyncSender<T> {
716 fn clone(&self) -> SyncSender<T> {
717 unsafe { (*self.inner.get()).clone_chan(); }
718 return SyncSender::new(self.inner.clone());
723 impl<T: Send> Drop for SyncSender<T> {
725 unsafe { (*self.inner.get()).drop_chan(); }
729 ////////////////////////////////////////////////////////////////////////////////
731 ////////////////////////////////////////////////////////////////////////////////
733 impl<T: Send> Receiver<T> {
734 fn new(inner: Flavor<T>) -> Receiver<T> {
735 Receiver { inner: UnsafeCell::new(inner) }
738 /// Attempts to return a pending value on this receiver without blocking
740 /// This method will never block the caller in order to wait for data to
741 /// become available. Instead, this will always return immediately with a
742 /// possible option of pending data on the channel.
744 /// This is useful for a flavor of "optimistic check" before deciding to
745 /// block on a receiver.
747 pub fn try_recv(&self) -> Result<T, TryRecvError> {
749 let new_port = match *unsafe { self.inner() } {
750 Flavor::Oneshot(ref p) => {
751 match unsafe { (*p.get()).try_recv() } {
752 Ok(t) => return Ok(t),
753 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
754 Err(oneshot::Disconnected) => {
755 return Err(TryRecvError::Disconnected)
757 Err(oneshot::Upgraded(rx)) => rx,
760 Flavor::Stream(ref p) => {
761 match unsafe { (*p.get()).try_recv() } {
762 Ok(t) => return Ok(t),
763 Err(stream::Empty) => return Err(TryRecvError::Empty),
764 Err(stream::Disconnected) => {
765 return Err(TryRecvError::Disconnected)
767 Err(stream::Upgraded(rx)) => rx,
770 Flavor::Shared(ref p) => {
771 match unsafe { (*p.get()).try_recv() } {
772 Ok(t) => return Ok(t),
773 Err(shared::Empty) => return Err(TryRecvError::Empty),
774 Err(shared::Disconnected) => {
775 return Err(TryRecvError::Disconnected)
779 Flavor::Sync(ref p) => {
780 match unsafe { (*p.get()).try_recv() } {
781 Ok(t) => return Ok(t),
782 Err(sync::Empty) => return Err(TryRecvError::Empty),
783 Err(sync::Disconnected) => {
784 return Err(TryRecvError::Disconnected)
790 mem::swap(self.inner_mut(),
791 new_port.inner_mut());
796 /// Attempt to wait for a value on this receiver, returning an error if the
797 /// corresponding channel has hung up.
799 /// This function will always block the current thread if there is no data
800 /// available and it's possible for more data to be sent. Once a message is
801 /// sent to the corresponding `Sender`, then this receiver will wake up and
802 /// return that message.
804 /// If the corresponding `Sender` has disconnected, or it disconnects while
805 /// this call is blocking, this call will wake up and return `Err` to
806 /// indicate that no more messages can ever be received on this channel.
808 pub fn recv(&self) -> Result<T, RecvError> {
810 let new_port = match *unsafe { self.inner() } {
811 Flavor::Oneshot(ref p) => {
812 match unsafe { (*p.get()).recv() } {
813 Ok(t) => return Ok(t),
814 Err(oneshot::Empty) => return unreachable!(),
815 Err(oneshot::Disconnected) => return Err(RecvError),
816 Err(oneshot::Upgraded(rx)) => rx,
819 Flavor::Stream(ref p) => {
820 match unsafe { (*p.get()).recv() } {
821 Ok(t) => return Ok(t),
822 Err(stream::Empty) => return unreachable!(),
823 Err(stream::Disconnected) => return Err(RecvError),
824 Err(stream::Upgraded(rx)) => rx,
827 Flavor::Shared(ref p) => {
828 match unsafe { (*p.get()).recv() } {
829 Ok(t) => return Ok(t),
830 Err(shared::Empty) => return unreachable!(),
831 Err(shared::Disconnected) => return Err(RecvError),
834 Flavor::Sync(ref p) => return unsafe {
835 (*p.get()).recv().map_err(|()| RecvError)
839 mem::swap(self.inner_mut(), new_port.inner_mut());
844 /// Returns an iterator that will block waiting for messages, but never
845 /// `panic!`. It will return `None` when the channel has hung up.
847 pub fn iter(&self) -> Iter<T> {
852 impl<T: Send> select::Packet for Receiver<T> {
853 fn can_recv(&self) -> bool {
855 let new_port = match *unsafe { self.inner() } {
856 Flavor::Oneshot(ref p) => {
857 match unsafe { (*p.get()).can_recv() } {
858 Ok(ret) => return ret,
859 Err(upgrade) => upgrade,
862 Flavor::Stream(ref p) => {
863 match unsafe { (*p.get()).can_recv() } {
864 Ok(ret) => return ret,
865 Err(upgrade) => upgrade,
868 Flavor::Shared(ref p) => {
869 return unsafe { (*p.get()).can_recv() };
871 Flavor::Sync(ref p) => {
872 return unsafe { (*p.get()).can_recv() };
876 mem::swap(self.inner_mut(),
877 new_port.inner_mut());
882 fn start_selection(&self, mut token: SignalToken) -> StartResult {
884 let (t, new_port) = match *unsafe { self.inner() } {
885 Flavor::Oneshot(ref p) => {
886 match unsafe { (*p.get()).start_selection(token) } {
887 oneshot::SelSuccess => return Installed,
888 oneshot::SelCanceled => return Abort,
889 oneshot::SelUpgraded(t, rx) => (t, rx),
892 Flavor::Stream(ref p) => {
893 match unsafe { (*p.get()).start_selection(token) } {
894 stream::SelSuccess => return Installed,
895 stream::SelCanceled => return Abort,
896 stream::SelUpgraded(t, rx) => (t, rx),
899 Flavor::Shared(ref p) => {
900 return unsafe { (*p.get()).start_selection(token) };
902 Flavor::Sync(ref p) => {
903 return unsafe { (*p.get()).start_selection(token) };
908 mem::swap(self.inner_mut(), new_port.inner_mut());
913 fn abort_selection(&self) -> bool {
914 let mut was_upgrade = false;
916 let result = match *unsafe { self.inner() } {
917 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
918 Flavor::Stream(ref p) => unsafe {
919 (*p.get()).abort_selection(was_upgrade)
921 Flavor::Shared(ref p) => return unsafe {
922 (*p.get()).abort_selection(was_upgrade)
924 Flavor::Sync(ref p) => return unsafe {
925 (*p.get()).abort_selection()
928 let new_port = match result { Ok(b) => return b, Err(p) => p };
931 mem::swap(self.inner_mut(),
932 new_port.inner_mut());
939 impl<'a, T: Send> Iterator for Iter<'a, T> {
942 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
946 impl<T: Send> Drop for Receiver<T> {
948 match *unsafe { self.inner_mut() } {
949 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
950 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
951 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
952 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
/// A version of `UnsafeCell` intended for use in concurrent data
/// structures (for example, you might put it in an `Arc`).
///
/// NOTE(review): unlike `UnsafeCell` itself, this wrapper is declared `Sync`
/// further down, so all aliasing discipline is the responsibility of its
/// users.
struct RacyCell<T>(pub UnsafeCell<T>);
961 impl<T> RacyCell<T> {
963 fn new(value: T) -> RacyCell<T> {
964 RacyCell(UnsafeCell { value: value })
967 unsafe fn get(&self) -> *mut T {
// SAFETY: sending a RacyCell across threads is fine as long as the payload
// itself is Send (enforced by the bound).
unsafe impl<T:Send> Send for RacyCell<T> { }

// NOTE(review): Sync is asserted for *any* T with no bound — every user must
// externally synchronize all concurrent access.
unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
977 impl<T> fmt::Show for SendError<T> {
978 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
979 "sending on a closed channel".fmt(f)
983 impl<T> fmt::Show for TrySendError<T> {
984 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
986 TrySendError::Full(..) => {
987 "sending on a full channel".fmt(f)
989 TrySendError::Disconnected(..) => {
990 "sending on a closed channel".fmt(f)
996 impl fmt::Show for RecvError {
997 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
998 "receiving on a closed channel".fmt(f)
1002 impl fmt::Show for TryRecvError {
1003 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1005 TryRecvError::Empty => {
1006 "receiving on an empty channel".fmt(f)
1008 TryRecvError::Disconnected => {
1009 "receiving on a closed channel".fmt(f)
1023 pub fn stress_factor() -> uint {
1024 match os::getenv("RUST_TEST_STRESS") {
1025 Some(val) => val.parse().unwrap(),
1032 let (tx, rx) = channel::<int>();
1033 tx.send(1).unwrap();
1034 assert_eq!(rx.recv().unwrap(), 1);
1039 let (tx, _rx) = channel();
1040 tx.send(box 1i).unwrap();
1044 fn drop_full_shared() {
1045 let (tx, _rx) = channel();
1048 tx.send(box 1i).unwrap();
1053 let (tx, rx) = channel::<int>();
1054 tx.send(1).unwrap();
1055 assert_eq!(rx.recv().unwrap(), 1);
1056 let tx = tx.clone();
1057 tx.send(1).unwrap();
1058 assert_eq!(rx.recv().unwrap(), 1);
#[test]
fn smoke_threads() {
    let (tx, rx) = channel::<int>();
    let _t = Thread::spawn(move|| {
        tx.send(1).unwrap();
    });
    assert_eq!(rx.recv().unwrap(), 1);
}
1071 fn smoke_port_gone() {
1072 let (tx, rx) = channel::<int>();
1074 assert!(tx.send(1).is_err());
1078 fn smoke_shared_port_gone() {
1079 let (tx, rx) = channel::<int>();
1081 assert!(tx.send(1).is_err())
1085 fn smoke_shared_port_gone2() {
1086 let (tx, rx) = channel::<int>();
1088 let tx2 = tx.clone();
1090 assert!(tx2.send(1).is_err());
1094 fn port_gone_concurrent() {
1095 let (tx, rx) = channel::<int>();
1096 let _t = Thread::spawn(move|| {
1099 while tx.send(1).is_ok() {}
1103 fn port_gone_concurrent_shared() {
1104 let (tx, rx) = channel::<int>();
1105 let tx2 = tx.clone();
1106 let _t = Thread::spawn(move|| {
1109 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1113 fn smoke_chan_gone() {
1114 let (tx, rx) = channel::<int>();
1116 assert!(rx.recv().is_err());
1120 fn smoke_chan_gone_shared() {
1121 let (tx, rx) = channel::<()>();
1122 let tx2 = tx.clone();
1125 assert!(rx.recv().is_err());
1129 fn chan_gone_concurrent() {
1130 let (tx, rx) = channel::<int>();
1131 let _t = Thread::spawn(move|| {
1132 tx.send(1).unwrap();
1133 tx.send(1).unwrap();
1135 while rx.recv().is_ok() {}
1140 let (tx, rx) = channel::<int>();
1141 let t = Thread::spawn(move|| {
1142 for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1144 for _ in range(0u, 10000) {
1145 assert_eq!(rx.recv().unwrap(), 1);
1147 t.join().ok().unwrap();
1151 fn stress_shared() {
1152 static AMT: uint = 10000;
1153 static NTHREADS: uint = 8;
1154 let (tx, rx) = channel::<int>();
1156 let t = Thread::spawn(move|| {
1157 for _ in range(0, AMT * NTHREADS) {
1158 assert_eq!(rx.recv().unwrap(), 1);
1160 match rx.try_recv() {
1166 for _ in range(0, NTHREADS) {
1167 let tx = tx.clone();
1168 Thread::spawn(move|| {
1169 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1173 t.join().ok().unwrap();
1177 fn send_from_outside_runtime() {
1178 let (tx1, rx1) = channel::<()>();
1179 let (tx2, rx2) = channel::<int>();
1180 let t1 = Thread::spawn(move|| {
1181 tx1.send(()).unwrap();
1182 for _ in range(0i, 40) {
1183 assert_eq!(rx2.recv().unwrap(), 1);
1186 rx1.recv().unwrap();
1187 let t2 = Thread::spawn(move|| {
1188 for _ in range(0i, 40) {
1189 tx2.send(1).unwrap();
1192 t1.join().ok().unwrap();
1193 t2.join().ok().unwrap();
1197 fn recv_from_outside_runtime() {
1198 let (tx, rx) = channel::<int>();
1199 let t = Thread::spawn(move|| {
1200 for _ in range(0i, 40) {
1201 assert_eq!(rx.recv().unwrap(), 1);
1204 for _ in range(0u, 40) {
1205 tx.send(1).unwrap();
1207 t.join().ok().unwrap();
1212 let (tx1, rx1) = channel::<int>();
1213 let (tx2, rx2) = channel::<int>();
1214 let t1 = Thread::spawn(move|| {
1215 assert_eq!(rx1.recv().unwrap(), 1);
1216 tx2.send(2).unwrap();
1218 let t2 = Thread::spawn(move|| {
1219 tx1.send(1).unwrap();
1220 assert_eq!(rx2.recv().unwrap(), 2);
1222 t1.join().ok().unwrap();
1223 t2.join().ok().unwrap();
1227 fn oneshot_single_thread_close_port_first() {
1228 // Simple test of closing without sending
1229 let (_tx, rx) = channel::<int>();
1234 fn oneshot_single_thread_close_chan_first() {
1235 // Simple test of closing without sending
1236 let (tx, _rx) = channel::<int>();
1241 fn oneshot_single_thread_send_port_close() {
1242 // Testing that the sender cleans up the payload if receiver is closed
1243 let (tx, rx) = channel::<Box<int>>();
1245 assert!(tx.send(box 0).is_err());
1249 fn oneshot_single_thread_recv_chan_close() {
1250 // Receiving on a closed chan will panic
1251 let res = Thread::spawn(move|| {
1252 let (tx, rx) = channel::<int>();
1257 assert!(res.is_err());
1261 fn oneshot_single_thread_send_then_recv() {
1262 let (tx, rx) = channel::<Box<int>>();
1263 tx.send(box 10).unwrap();
1264 assert!(rx.recv().unwrap() == box 10);
1268 fn oneshot_single_thread_try_send_open() {
1269 let (tx, rx) = channel::<int>();
1270 assert!(tx.send(10).is_ok());
1271 assert!(rx.recv().unwrap() == 10);
1275 fn oneshot_single_thread_try_send_closed() {
1276 let (tx, rx) = channel::<int>();
1278 assert!(tx.send(10).is_err());
1282 fn oneshot_single_thread_try_recv_open() {
1283 let (tx, rx) = channel::<int>();
1284 tx.send(10).unwrap();
1285 assert!(rx.recv() == Ok(10));
1289 fn oneshot_single_thread_try_recv_closed() {
1290 let (tx, rx) = channel::<int>();
1292 assert!(rx.recv().is_err());
1296 fn oneshot_single_thread_peek_data() {
1297 let (tx, rx) = channel::<int>();
1298 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1299 tx.send(10).unwrap();
1300 assert_eq!(rx.try_recv(), Ok(10));
1304 fn oneshot_single_thread_peek_close() {
1305 let (tx, rx) = channel::<int>();
1307 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1308 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1312 fn oneshot_single_thread_peek_open() {
1313 let (_tx, rx) = channel::<int>();
1314 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1318 fn oneshot_multi_task_recv_then_send() {
1319 let (tx, rx) = channel::<Box<int>>();
1320 let _t = Thread::spawn(move|| {
1321 assert!(rx.recv().unwrap() == box 10);
1324 tx.send(box 10).unwrap();
1328 fn oneshot_multi_task_recv_then_close() {
1329 let (tx, rx) = channel::<Box<int>>();
1330 let _t = Thread::spawn(move|| {
1333 let res = Thread::spawn(move|| {
1334 assert!(rx.recv().unwrap() == box 10);
1336 assert!(res.is_err());
1340 fn oneshot_multi_thread_close_stress() {
1341 for _ in range(0, stress_factor()) {
1342 let (tx, rx) = channel::<int>();
1343 let _t = Thread::spawn(move|| {
1351 fn oneshot_multi_thread_send_close_stress() {
1352 for _ in range(0, stress_factor()) {
1353 let (tx, rx) = channel::<int>();
1354 let _t = Thread::spawn(move|| {
1357 let _ = Thread::spawn(move|| {
1358 tx.send(1).unwrap();
1364 fn oneshot_multi_thread_recv_close_stress() {
1365 for _ in range(0, stress_factor()) {
1366 let (tx, rx) = channel::<int>();
1367 Thread::spawn(move|| {
1368 let res = Thread::spawn(move|| {
1371 assert!(res.is_err());
1373 let _t = Thread::spawn(move|| {
1374 Thread::spawn(move|| {
1382 fn oneshot_multi_thread_send_recv_stress() {
1383 for _ in range(0, stress_factor()) {
1384 let (tx, rx) = channel();
1385 let _t = Thread::spawn(move|| {
1386 tx.send(box 10i).unwrap();
1388 assert!(rx.recv().unwrap() == box 10i);
1393 fn stream_send_recv_stress() {
1394 for _ in range(0, stress_factor()) {
1395 let (tx, rx) = channel();
1400 fn send(tx: Sender<Box<int>>, i: int) {
1401 if i == 10 { return }
1403 Thread::spawn(move|| {
1404 tx.send(box i).unwrap();
1409 fn recv(rx: Receiver<Box<int>>, i: int) {
1410 if i == 10 { return }
1412 Thread::spawn(move|| {
1413 assert!(rx.recv().unwrap() == box i);
1422 // Regression test that we don't run out of stack in scheduler context
1423 let (tx, rx) = channel();
1424 for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1425 for _ in range(0i, 10000) { rx.recv().unwrap(); }
1429 fn shared_chan_stress() {
1430 let (tx, rx) = channel();
1431 let total = stress_factor() + 100;
1432 for _ in range(0, total) {
1433 let tx = tx.clone();
1434 Thread::spawn(move|| {
1435 tx.send(()).unwrap();
1439 for _ in range(0, total) {
1445 fn test_nested_recv_iter() {
1446 let (tx, rx) = channel::<int>();
1447 let (total_tx, total_rx) = channel::<int>();
1449 let _t = Thread::spawn(move|| {
1451 for x in rx.iter() {
1454 total_tx.send(acc).unwrap();
1457 tx.send(3).unwrap();
1458 tx.send(1).unwrap();
1459 tx.send(2).unwrap();
1461 assert_eq!(total_rx.recv().unwrap(), 6);
1465 fn test_recv_iter_break() {
1466 let (tx, rx) = channel::<int>();
1467 let (count_tx, count_rx) = channel();
1469 let _t = Thread::spawn(move|| {
1471 for x in rx.iter() {
1478 count_tx.send(count).unwrap();
1481 tx.send(2).unwrap();
1482 tx.send(2).unwrap();
1483 tx.send(2).unwrap();
1486 assert_eq!(count_rx.recv().unwrap(), 4);
1490 fn try_recv_states() {
1491 let (tx1, rx1) = channel::<int>();
1492 let (tx2, rx2) = channel::<()>();
1493 let (tx3, rx3) = channel::<()>();
1494 let _t = Thread::spawn(move|| {
1495 rx2.recv().unwrap();
1496 tx1.send(1).unwrap();
1497 tx3.send(()).unwrap();
1498 rx2.recv().unwrap();
1500 tx3.send(()).unwrap();
1503 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1504 tx2.send(()).unwrap();
1505 rx3.recv().unwrap();
1506 assert_eq!(rx1.try_recv(), Ok(1));
1507 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1508 tx2.send(()).unwrap();
1509 rx3.recv().unwrap();
1510 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1513 // This bug used to end up in a livelock inside of the Receiver destructor
1514 // because the internal state of the Shared packet was corrupted
1516 fn destroy_upgraded_shared_port_when_sender_still_active() {
1517 let (tx, rx) = channel();
1518 let (tx2, rx2) = channel();
1519 let _t = Thread::spawn(move|| {
1520 rx.recv().unwrap(); // wait on a oneshot
1521 drop(rx); // destroy a shared
1522 tx2.send(()).unwrap();
1524 // make sure the other task has gone to sleep
1525 for _ in range(0u, 5000) { Thread::yield_now(); }
1527 // upgrade to a shared chan and send a message
1530 t.send(()).unwrap();
1532 // wait for the child task to exit before we exit
1533 rx2.recv().unwrap();
1545 pub fn stress_factor() -> uint {
1546 match os::getenv("RUST_TEST_STRESS") {
1547 Some(val) => val.parse().unwrap(),
1554 let (tx, rx) = sync_channel::<int>(1);
1555 tx.send(1).unwrap();
1556 assert_eq!(rx.recv().unwrap(), 1);
1561 let (tx, _rx) = sync_channel(1);
1562 tx.send(box 1i).unwrap();
1567 let (tx, rx) = sync_channel::<int>(1);
1568 tx.send(1).unwrap();
1569 assert_eq!(rx.recv().unwrap(), 1);
1570 let tx = tx.clone();
1571 tx.send(1).unwrap();
1572 assert_eq!(rx.recv().unwrap(), 1);
1576 fn smoke_threads() {
1577 let (tx, rx) = sync_channel::<int>(0);
1578 let _t = Thread::spawn(move|| {
1579 tx.send(1).unwrap();
1581 assert_eq!(rx.recv().unwrap(), 1);
1585 fn smoke_port_gone() {
1586 let (tx, rx) = sync_channel::<int>(0);
1588 assert!(tx.send(1).is_err());
1592 fn smoke_shared_port_gone2() {
1593 let (tx, rx) = sync_channel::<int>(0);
1595 let tx2 = tx.clone();
1597 assert!(tx2.send(1).is_err());
1601 fn port_gone_concurrent() {
1602 let (tx, rx) = sync_channel::<int>(0);
1603 let _t = Thread::spawn(move|| {
1606 while tx.send(1).is_ok() {}
1610 fn port_gone_concurrent_shared() {
1611 let (tx, rx) = sync_channel::<int>(0);
1612 let tx2 = tx.clone();
1613 let _t = Thread::spawn(move|| {
1616 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1620 fn smoke_chan_gone() {
1621 let (tx, rx) = sync_channel::<int>(0);
1623 assert!(rx.recv().is_err());
1627 fn smoke_chan_gone_shared() {
1628 let (tx, rx) = sync_channel::<()>(0);
1629 let tx2 = tx.clone();
1632 assert!(rx.recv().is_err());
1636 fn chan_gone_concurrent() {
1637 let (tx, rx) = sync_channel::<int>(0);
1638 Thread::spawn(move|| {
1639 tx.send(1).unwrap();
1640 tx.send(1).unwrap();
1642 while rx.recv().is_ok() {}
1647 let (tx, rx) = sync_channel::<int>(0);
1648 Thread::spawn(move|| {
1649 for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1651 for _ in range(0u, 10000) {
1652 assert_eq!(rx.recv().unwrap(), 1);
1657 fn stress_shared() {
1658 static AMT: uint = 1000;
1659 static NTHREADS: uint = 8;
1660 let (tx, rx) = sync_channel::<int>(0);
1661 let (dtx, drx) = sync_channel::<()>(0);
1663 Thread::spawn(move|| {
1664 for _ in range(0, AMT * NTHREADS) {
1665 assert_eq!(rx.recv().unwrap(), 1);
1667 match rx.try_recv() {
1671 dtx.send(()).unwrap();
1674 for _ in range(0, NTHREADS) {
1675 let tx = tx.clone();
1676 Thread::spawn(move|| {
1677 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1681 drx.recv().unwrap();
1685 fn oneshot_single_thread_close_port_first() {
1686 // Simple test of closing without sending
1687 let (_tx, rx) = sync_channel::<int>(0);
1692 fn oneshot_single_thread_close_chan_first() {
1693 // Simple test of closing without sending
1694 let (tx, _rx) = sync_channel::<int>(0);
1699 fn oneshot_single_thread_send_port_close() {
1700 // Testing that the sender cleans up the payload if receiver is closed
1701 let (tx, rx) = sync_channel::<Box<int>>(0);
1703 assert!(tx.send(box 0).is_err());
1707 fn oneshot_single_thread_recv_chan_close() {
1708 // Receiving on a closed chan will panic
1709 let res = Thread::spawn(move|| {
1710 let (tx, rx) = sync_channel::<int>(0);
1715 assert!(res.is_err());
1719 fn oneshot_single_thread_send_then_recv() {
1720 let (tx, rx) = sync_channel::<Box<int>>(1);
1721 tx.send(box 10).unwrap();
1722 assert!(rx.recv().unwrap() == box 10);
1726 fn oneshot_single_thread_try_send_open() {
1727 let (tx, rx) = sync_channel::<int>(1);
1728 assert_eq!(tx.try_send(10), Ok(()));
1729 assert!(rx.recv().unwrap() == 10);
1733 fn oneshot_single_thread_try_send_closed() {
1734 let (tx, rx) = sync_channel::<int>(0);
1736 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1740 fn oneshot_single_thread_try_send_closed2() {
1741 let (tx, _rx) = sync_channel::<int>(0);
1742 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1746 fn oneshot_single_thread_try_recv_open() {
1747 let (tx, rx) = sync_channel::<int>(1);
1748 tx.send(10).unwrap();
1749 assert!(rx.recv() == Ok(10));
1753 fn oneshot_single_thread_try_recv_closed() {
1754 let (tx, rx) = sync_channel::<int>(0);
1756 assert!(rx.recv().is_err());
1760 fn oneshot_single_thread_peek_data() {
1761 let (tx, rx) = sync_channel::<int>(1);
1762 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1763 tx.send(10).unwrap();
1764 assert_eq!(rx.try_recv(), Ok(10));
1768 fn oneshot_single_thread_peek_close() {
1769 let (tx, rx) = sync_channel::<int>(0);
1771 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1772 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1776 fn oneshot_single_thread_peek_open() {
1777 let (_tx, rx) = sync_channel::<int>(0);
1778 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1782 fn oneshot_multi_task_recv_then_send() {
1783 let (tx, rx) = sync_channel::<Box<int>>(0);
1784 let _t = Thread::spawn(move|| {
1785 assert!(rx.recv().unwrap() == box 10);
1788 tx.send(box 10).unwrap();
1792 fn oneshot_multi_task_recv_then_close() {
1793 let (tx, rx) = sync_channel::<Box<int>>(0);
1794 let _t = Thread::spawn(move|| {
1797 let res = Thread::spawn(move|| {
1798 assert!(rx.recv().unwrap() == box 10);
1800 assert!(res.is_err());
1804 fn oneshot_multi_thread_close_stress() {
1805 for _ in range(0, stress_factor()) {
1806 let (tx, rx) = sync_channel::<int>(0);
1807 let _t = Thread::spawn(move|| {
1815 fn oneshot_multi_thread_send_close_stress() {
1816 for _ in range(0, stress_factor()) {
1817 let (tx, rx) = sync_channel::<int>(0);
1818 let _t = Thread::spawn(move|| {
1821 let _ = Thread::spawn(move || {
1822 tx.send(1).unwrap();
1828 fn oneshot_multi_thread_recv_close_stress() {
1829 for _ in range(0, stress_factor()) {
1830 let (tx, rx) = sync_channel::<int>(0);
1831 let _t = Thread::spawn(move|| {
1832 let res = Thread::spawn(move|| {
1835 assert!(res.is_err());
1837 let _t = Thread::spawn(move|| {
1838 Thread::spawn(move|| {
1846 fn oneshot_multi_thread_send_recv_stress() {
1847 for _ in range(0, stress_factor()) {
1848 let (tx, rx) = sync_channel::<Box<int>>(0);
1849 let _t = Thread::spawn(move|| {
1850 tx.send(box 10i).unwrap();
1852 assert!(rx.recv().unwrap() == box 10i);
1857 fn stream_send_recv_stress() {
1858 for _ in range(0, stress_factor()) {
1859 let (tx, rx) = sync_channel::<Box<int>>(0);
1864 fn send(tx: SyncSender<Box<int>>, i: int) {
1865 if i == 10 { return }
1867 Thread::spawn(move|| {
1868 tx.send(box i).unwrap();
1873 fn recv(rx: Receiver<Box<int>>, i: int) {
1874 if i == 10 { return }
1876 Thread::spawn(move|| {
1877 assert!(rx.recv().unwrap() == box i);
1886 // Regression test that we don't run out of stack in scheduler context
1887 let (tx, rx) = sync_channel(10000);
1888 for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1889 for _ in range(0u, 10000) { rx.recv().unwrap(); }
1893 fn shared_chan_stress() {
1894 let (tx, rx) = sync_channel(0);
1895 let total = stress_factor() + 100;
1896 for _ in range(0, total) {
1897 let tx = tx.clone();
1898 Thread::spawn(move|| {
1899 tx.send(()).unwrap();
1903 for _ in range(0, total) {
1909 fn test_nested_recv_iter() {
1910 let (tx, rx) = sync_channel::<int>(0);
1911 let (total_tx, total_rx) = sync_channel::<int>(0);
1913 let _t = Thread::spawn(move|| {
1915 for x in rx.iter() {
1918 total_tx.send(acc).unwrap();
1921 tx.send(3).unwrap();
1922 tx.send(1).unwrap();
1923 tx.send(2).unwrap();
1925 assert_eq!(total_rx.recv().unwrap(), 6);
1929 fn test_recv_iter_break() {
1930 let (tx, rx) = sync_channel::<int>(0);
1931 let (count_tx, count_rx) = sync_channel(0);
1933 let _t = Thread::spawn(move|| {
1935 for x in rx.iter() {
1942 count_tx.send(count).unwrap();
1945 tx.send(2).unwrap();
1946 tx.send(2).unwrap();
1947 tx.send(2).unwrap();
1948 let _ = tx.try_send(2);
1950 assert_eq!(count_rx.recv().unwrap(), 4);
1954 fn try_recv_states() {
1955 let (tx1, rx1) = sync_channel::<int>(1);
1956 let (tx2, rx2) = sync_channel::<()>(1);
1957 let (tx3, rx3) = sync_channel::<()>(1);
1958 let _t = Thread::spawn(move|| {
1959 rx2.recv().unwrap();
1960 tx1.send(1).unwrap();
1961 tx3.send(()).unwrap();
1962 rx2.recv().unwrap();
1964 tx3.send(()).unwrap();
1967 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1968 tx2.send(()).unwrap();
1969 rx3.recv().unwrap();
1970 assert_eq!(rx1.try_recv(), Ok(1));
1971 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1972 tx2.send(()).unwrap();
1973 rx3.recv().unwrap();
1974 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1977 // This bug used to end up in a livelock inside of the Receiver destructor
1978 // because the internal state of the Shared packet was corrupted
1980 fn destroy_upgraded_shared_port_when_sender_still_active() {
1981 let (tx, rx) = sync_channel::<()>(0);
1982 let (tx2, rx2) = sync_channel::<()>(0);
1983 let _t = Thread::spawn(move|| {
1984 rx.recv().unwrap(); // wait on a oneshot
1985 drop(rx); // destroy a shared
1986 tx2.send(()).unwrap();
1988 // make sure the other task has gone to sleep
1989 for _ in range(0u, 5000) { Thread::yield_now(); }
1991 // upgrade to a shared chan and send a message
1994 t.send(()).unwrap();
1996 // wait for the child task to exit before we exit
1997 rx2.recv().unwrap();
2002 let (tx, rx) = sync_channel::<int>(0);
2003 let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2004 assert_eq!(tx.send(1), Ok(()));
2009 let (tx, rx) = sync_channel::<int>(0);
2010 let _t = Thread::spawn(move|| { drop(rx); });
2011 assert!(tx.send(1).is_err());
2016 let (tx, rx) = sync_channel::<int>(1);
2017 assert_eq!(tx.send(1), Ok(()));
2018 let _t =Thread::spawn(move|| { drop(rx); });
2019 assert!(tx.send(1).is_err());
2024 let (tx, rx) = sync_channel::<int>(0);
2025 let tx2 = tx.clone();
2026 let (done, donerx) = channel();
2027 let done2 = done.clone();
2028 let _t = Thread::spawn(move|| {
2029 assert!(tx.send(1).is_err());
2030 done.send(()).unwrap();
2032 let _t = Thread::spawn(move|| {
2033 assert!(tx2.send(2).is_err());
2034 done2.send(()).unwrap();
2037 donerx.recv().unwrap();
2038 donerx.recv().unwrap();
2043 let (tx, _rx) = sync_channel::<int>(0);
2044 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2049 let (tx, _rx) = sync_channel::<int>(1);
2050 assert_eq!(tx.try_send(1), Ok(()));
2051 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2056 let (tx, rx) = sync_channel::<int>(1);
2057 assert_eq!(tx.try_send(1), Ok(()));
2059 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2065 let (tx1, rx1) = sync_channel::<()>(3);
2066 let (tx2, rx2) = sync_channel::<()>(3);
2068 let _t = Thread::spawn(move|| {
2069 rx1.recv().unwrap();
2070 tx2.try_send(()).unwrap();
2073 tx1.try_send(()).unwrap();
2074 rx2.recv().unwrap();
2077 for _ in range(0u, 100) {