1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer communication primitives between threads
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
56 //! use std::thread::Thread;
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! Thread::spawn(move|| {
62 //! tx.send(10i).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10i);
70 //! use std::thread::Thread;
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in range(0i, 10i) {
78 //! let tx = tx.clone();
79 //! Thread::spawn(move|| {
80 //! tx.send(i).unwrap();
84 //! for _ in range(0i, 10i) {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
105 //! use std::thread::Thread;
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! Thread::spawn(move|| {
110 //! // This will wait for the parent task to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
116 //! Reading from a channel with a timeout requires using a Timer together
117 //! with the channel. You can use the select! macro to select either and
118 //! handle the timeout case. This first example will break out of the loop
119 //! after 10 seconds no matter what:
122 //! use std::sync::mpsc::channel;
123 //! use std::io::timer::Timer;
124 //! use std::time::Duration;
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
132 //! val = rx.recv() => println!("Received {}", val.unwrap()),
133 //! _ = timeout.recv() => {
134 //! println!("timed out, total time was more than 10 seconds");
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
146 //! use std::sync::mpsc::channel;
147 //! use std::io::timer::Timer;
148 //! use std::time::Duration;
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
154 //! let timeout = timer.oneshot(Duration::seconds(5));
157 //! val = rx.recv() => println!("Received {}", val.unwrap()),
158 //! _ = timeout.recv() => {
159 //! println!("timed out, no message received in 5 seconds");
168 // A description of how Rust's channel implementation works
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
174 // The choice of implementation of all channels is to be built on lock-free data
175 // structures. The channels themselves are then consequently also lock-free data
176 // structures. As always with lock-free code, this is a very "here be dragons"
177 // territory, especially because I'm unaware of any academic papers that have
178 // gone into great length about channels of these flavors.
180 // ## Flavors of channels
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
187 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
188 // They contain as few atomics as possible and involve one and
189 // exactly one allocation.
190 // * Streams - these channels are optimized for the non-shared use case. They
191 // use a different concurrent queue that is more tailored for this
192 // use case. The initial allocation of this flavor of channel is not
194 // * Shared - this is the most general form of channel that this module offers,
195 // a channel with multiple senders. This type is as optimized as it
196 // can be, but the previous two types mentioned are much faster for
199 // ## Concurrent queues
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
205 // With two flavors of channels, two flavors of queues are also used. We have
206 // chosen to use queues from a well-known author that are abbreviated as SPSC
207 // and MPSC (single producer, single consumer and multiple producer, single
208 // consumer). SPSC queues are used for streams while MPSC queues are used for
211 // ### SPSC optimizations
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
219 // As found online, however, the cache of nodes is of an infinite size. This
220 // means that if a channel at one point in its life had 50k items in the queue,
221 // then the queue will always have the capacity for 50k items. I believed that
222 // this was an unnecessary limitation of the implementation, so I have altered
223 // the queue to optionally have a bound on the cache size.
225 // By default, streams will have an unbounded SPSC queue with a small-ish cache
226 // size. The hope is that the cache is still large enough to have very fast
227 // send() operations while not too large such that millions of channels can
230 // ### MPSC optimizations
232 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
233 // a linked list under the hood to earn its unboundedness, but I have not put
234 // forth much effort into having a cache of nodes similar to the SPSC queue.
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
240 // ## Overview of the Implementation
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
248 // queue.push(t) return if queue.pop()
249 // if increment() == -1 deschedule {
250 // wakeup() if decrement() > 0
251 // cancel_deschedule()
255 // As mentioned before, there are no locks in this implementation, only atomic
256 // instructions are used.
258 // ### The internal atomic counter
260 // Every channel has a shared counter with each half to keep track of the size
261 // of the queue. This counter is used to abort descheduling by the receiver and
262 // to know when to wake up on the sending side.
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
276 // Instead, the "steal count" is kept track of separately (not atomically
277 // because it's only used by receivers), and then the decrement() call when
278 // descheduling will lump in all of the recent steals into one large decrement.
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
283 // ## Native Implementation
285 // A major goal of these channels is to work seamlessly on and off the runtime.
286 // All of the previous race conditions have been worded in terms of
287 // scheduler-isms (which is obviously not available without the runtime).
289 // For now, native usage of channels (off the runtime) will fall back onto
290 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
291 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
292 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
293 // condition variable.
297 // Being able to support selection over channels has greatly influenced this
298 // design, and not only does selection need to work inside the runtime, but also
299 // outside the runtime.
301 // The implementation is fairly straightforward. The goal of select() is not to
302 // return some data, but only to return which channel can receive data without
303 // blocking. The implementation is essentially the entire blocking procedure
304 // followed by an increment as soon as it's woken up. The cancellation procedure
305 // involves an increment and swapping out of to_wake to acquire ownership of the
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
324 use cell::UnsafeCell;
326 pub use self::select::{Select, Handle};
327 use self::select::StartResult;
328 use self::select::StartResult::*;
329 use self::blocking::SignalToken;
340 /// The receiving-half of Rust's channel type. This half can only be owned by
343 pub struct Receiver<T> {
// The concrete channel flavor (oneshot/stream/shared/sync). Held in an
// UnsafeCell because receive operations may upgrade the flavor in place
// through `&self` (see the mem::swap calls in try_recv/recv below).
344 inner: UnsafeCell<Flavor<T>>,
347 // The receiver port can be sent from place to place, so long as it
348 // is not used to receive non-sendable things.
// SAFETY: sound only for `T: Send`; relies on the packet implementations
// (not visible in this listing) never sharing state unsafely — TODO confirm.
349 unsafe impl<T:Send> Send for Receiver<T> { }
351 /// An iterator over messages on a receiver, this iterator will block
352 /// whenever `next` is called, waiting for a new message, and `None` will be
353 /// returned when the corresponding channel has hung up.
355 pub struct Iter<'a, T:'a> {
// NOTE(review): the field list is elided from this listing; `next()` further
// below calls `self.rx.recv()`, so the struct presumably borrows the
// Receiver it iterates over — confirm against the full source.
359 /// The sending-half of Rust's asynchronous channel type. This half can only be
360 /// owned by one task, but it can be cloned to send to other tasks.
362 pub struct Sender<T> {
// Same interior-mutability trick as Receiver: send()/clone() may upgrade the
// flavor (Oneshot -> Stream -> Shared) in place through `&self`.
363 inner: UnsafeCell<Flavor<T>>,
366 // The send port can be sent from place to place, so long as it
367 // is not used to send non-sendable things.
// SAFETY: sound only for `T: Send`; same reasoning as the Receiver impl.
368 unsafe impl<T:Send> Send for Sender<T> { }
370 /// The sending-half of Rust's synchronous channel type. This half can only be
371 /// owned by one task, but it can be cloned to send to other tasks.
// Stage0 (pre-snapshot) variant: opts out of `Sync` via an explicit NoSync
// marker field instead of the negative impl used in the non-stage0 version.
373 #[cfg(stage0)] // NOTE remove impl after next snapshot
374 pub struct SyncSender<T> {
375 inner: Arc<RacyCell<sync::Packet<T>>>,
376 // can't share in an arc
377 _marker: marker::NoSync,
380 /// The sending-half of Rust's synchronous channel type. This half can only be
381 /// owned by one task, but it can be cloned to send to other tasks.
383 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
384 pub struct SyncSender<T> {
385 inner: Arc<RacyCell<sync::Packet<T>>>,
388 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
// A SyncSender may be moved between tasks but must not be *shared* between
// them, hence the explicit negative Sync impl.
389 impl<T> !marker::Sync for SyncSender<T> {}
391 /// An error returned from the `send` function on channels.
393 /// A `send` operation can only fail if the receiving end of a channel is
394 /// disconnected, implying that the data could never be received. The error
395 /// contains the data being sent as a payload so it can be recovered.
396 #[derive(PartialEq, Eq)]
// Newtype carrying the unsent value back to the caller (see `.0` usage in the
// `send` doc example).
398 pub struct SendError<T>(pub T);
400 /// An error returned from the `recv` function on a `Receiver`.
402 /// The `recv` operation can only fail if the sending half of a channel is
403 /// disconnected, implying that no further messages will ever be received.
// Unit struct: disconnection is the only failure mode, so no payload is
// carried.
404 #[derive(PartialEq, Eq, Clone, Copy)]
406 pub struct RecvError;
408 /// This enumeration is the list of the possible reasons that try_recv could not
409 /// return data when called.
410 #[derive(PartialEq, Clone, Copy)]
412 pub enum TryRecvError {
413 /// This channel is currently empty, but the sender(s) have not yet
414 /// disconnected, so data may yet become available.
// NOTE(review): the variant identifiers are elided from this listing; usage
// below (`TryRecvError::Empty` / `TryRecvError::Disconnected`) confirms the
// two variant names.
418 /// This channel's sending half has become disconnected, and there will
419 /// never be any more data received on this channel
424 /// This enumeration is the list of the possible error outcomes for the
425 /// `SyncSender::try_send` method.
426 #[derive(PartialEq, Clone)]
428 pub enum TrySendError<T> {
429 /// The data could not be sent on the channel because it would require that
430 /// the callee block to send the data.
432 /// If this is a buffered channel, then the buffer is full at this time. If
433 /// this is not a buffered channel, then there is no receiver available to
434 /// acquire the data.
// NOTE(review): the variant identifiers are elided from this listing; the
// Show impl below (`TrySendError::Full(..)` / `::Disconnected(..)`) confirms
// both variants carry the unsent value.
438 /// This channel's receiving half has disconnected, so the data could not be
439 /// sent. The data is returned back to the callee in this case.
// NOTE(review): the `enum Flavor<T>` header is elided from this listing;
// these are its four variants, one per internal channel implementation
// (oneshot / stream / shared / sync, as described in the design notes above).
445 Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
446 Stream(Arc<RacyCell<stream::Packet<T>>>),
447 Shared(Arc<RacyCell<shared::Packet<T>>>),
448 Sync(Arc<RacyCell<sync::Packet<T>>>),
// Shared plumbing for Sender/Receiver: unchecked access to the inner Flavor.
452 trait UnsafeFlavor<T> {
453 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
// Unsafe: derives a `&mut` from `&self`; callers must guarantee exclusive
// access (the single-consumer / single-logical-sender invariants).
454 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
455 &mut *self.inner_unsafe().get()
457 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
458 &*self.inner_unsafe().get()
461 impl<T> UnsafeFlavor<T> for Sender<T> {
462 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
// NOTE(review): the body (presumably `&self.inner`) is elided from this
// listing — confirm against the full source.
466 impl<T> UnsafeFlavor<T> for Receiver<T> {
467 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
// NOTE(review): the body (presumably `&self.inner`) is elided from this
// listing — confirm against the full source.
472 /// Creates a new asynchronous channel, returning the sender/receiver halves.
474 /// All data sent on the sender will become available on the receiver, and no
475 /// send will block the calling task (this channel has an "infinite buffer").
480 /// use std::sync::mpsc::channel;
481 /// use std::thread::Thread;
483 /// // tx is the sending half (tx for transmission), and rx is the receiving
484 /// // half (rx for receiving).
485 /// let (tx, rx) = channel();
487 /// // Spawn off an expensive computation
488 /// Thread::spawn(move|| {
489 /// # fn expensive_computation() {}
490 /// tx.send(expensive_computation()).unwrap();
493 /// // Do some useful work for awhile
495 /// // Let's see what that answer was
496 /// println!("{:?}", rx.recv().unwrap());
// Channels start in the cheapest flavor (oneshot); send()/clone() upgrade the
// flavor lazily when a second message or sender appears.
499 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
500 let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
501 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
504 /// Creates a new synchronous, bounded channel.
506 /// Like asynchronous channels, the `Receiver` will block until a message
507 /// becomes available. These channels differ greatly in the semantics of the
508 /// sender from asynchronous channels, however.
510 /// This channel has an internal buffer on which messages will be queued. When
511 /// the internal buffer becomes full, future sends will *block* waiting for the
512 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
513 /// becomes a "rendezvous" channel where each send will not return until a recv
514 /// is paired with it.
516 /// As with asynchronous channels, all senders will panic in `send` if the
517 /// `Receiver` has been destroyed.
522 /// use std::sync::mpsc::sync_channel;
523 /// use std::thread::Thread;
525 /// let (tx, rx) = sync_channel(1);
527 /// // this returns immediately
528 /// tx.send(1i).unwrap();
530 /// Thread::spawn(move|| {
531 /// // this will block until the previous message has been received
532 /// tx.send(2i).unwrap();
535 /// assert_eq!(rx.recv().unwrap(), 1i);
536 /// assert_eq!(rx.recv().unwrap(), 2i);
// Sync channels never change flavor, so the SyncSender holds the packet
// directly rather than going through a Flavor.
539 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
540 let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
541 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
544 ////////////////////////////////////////////////////////////////////////////////
546 ////////////////////////////////////////////////////////////////////////////////
// NOTE(review): several lines inside this impl are elided in this listing
// (embedded numbering jumps, e.g. 585->589, 604->612); comments below only
// describe what the visible code shows.
548 impl<T: Send> Sender<T> {
// Internal constructor; the flavor is chosen by channel().
549 fn new(inner: Flavor<T>) -> Sender<T> {
551 inner: UnsafeCell::new(inner),
555 /// Attempts to send a value on this channel, returning it back if it could
558 /// A successful send occurs when it is determined that the other end of
559 /// the channel has not hung up already. An unsuccessful send would be one
560 /// where the corresponding receiver has already been deallocated. Note
561 /// that a return value of `Err` means that the data will never be
562 /// received, but a return value of `Ok` does *not* mean that the data
563 /// will be received. It is possible for the corresponding receiver to
564 /// hang up immediately after this function returns `Ok`.
566 /// This method will never block the current thread.
571 /// use std::sync::mpsc::channel;
573 /// let (tx, rx) = channel();
575 /// // This send is always successful
576 /// tx.send(1i).unwrap();
578 /// // This send will fail because the receiver is gone
580 /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
583 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
// A oneshot that already sent once must be upgraded to a stream; the
// upgraded packet replaces self.inner via the mem::swap at the bottom.
584 let (new_inner, ret) = match *unsafe { self.inner() } {
585 Flavor::Oneshot(ref p) => {
589 return (*p).send(t).map_err(SendError);
592 Arc::new(RacyCell::new(stream::Packet::new()));
593 let rx = Receiver::new(Flavor::Stream(a.clone()));
594 match (*p).upgrade(rx) {
595 oneshot::UpSuccess => {
596 let ret = (*a.get()).send(t);
599 oneshot::UpDisconnected => (a, Err(t)),
600 oneshot::UpWoke(token) => {
601 // This send cannot panic because the thread is
602 // asleep (we're looking at it), so the receiver
604 (*a.get()).send(t).ok().unwrap();
612 Flavor::Stream(ref p) => return unsafe {
613 (*p.get()).send(t).map_err(SendError)
615 Flavor::Shared(ref p) => return unsafe {
616 (*p.get()).send(t).map_err(SendError)
// The Sync flavor is owned exclusively by SyncSender; a plain Sender can
// never hold it.
618 Flavor::Sync(..) => unreachable!(),
622 let tmp = Sender::new(Flavor::Stream(new_inner));
623 mem::swap(self.inner_mut(), tmp.inner_mut());
625 ret.map_err(SendError)
// Cloning a Sender forces an upgrade to the Shared flavor (the only flavor
// supporting multiple senders). NOTE(review): some lines of this impl are
// elided in this listing (embedded numbering jumps).
630 impl<T: Send> Clone for Sender<T> {
631 fn clone(&self) -> Sender<T> {
632 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
633 Flavor::Oneshot(ref p) => {
634 let a = Arc::new(RacyCell::new(shared::Packet::new()));
636 let guard = (*a.get()).postinit_lock();
637 let rx = Receiver::new(Flavor::Shared(a.clone()));
638 match (*p.get()).upgrade(rx) {
640 oneshot::UpDisconnected => (a, None, guard),
641 oneshot::UpWoke(task) => (a, Some(task), guard)
645 Flavor::Stream(ref p) => {
646 let a = Arc::new(RacyCell::new(shared::Packet::new()));
648 let guard = (*a.get()).postinit_lock();
649 let rx = Receiver::new(Flavor::Shared(a.clone()));
650 match (*p.get()).upgrade(rx) {
652 stream::UpDisconnected => (a, None, guard),
653 stream::UpWoke(task) => (a, Some(task), guard),
// Already shared: just bump the sender count and reuse the packet.
657 Flavor::Shared(ref p) => {
658 unsafe { (*p.get()).clone_chan(); }
659 return Sender::new(Flavor::Shared(p.clone()));
661 Flavor::Sync(..) => unreachable!(),
// A receiver that was blocked on the old packet is carried over to the new
// shared packet here.
665 (*packet.get()).inherit_blocker(sleeper, guard);
667 let tmp = Sender::new(Flavor::Shared(packet.clone()));
668 mem::swap(self.inner_mut(), tmp.inner_mut());
670 Sender::new(Flavor::Shared(packet))
// Notifies the packet that one sender is gone; the last drop_chan is what
// lets a blocked receiver observe disconnection. NOTE(review): the
// `fn drop(&mut self)` line is elided from this listing.
676 impl<T: Send> Drop for Sender<T> {
678 match *unsafe { self.inner_mut() } {
679 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
680 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
681 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
682 Flavor::Sync(..) => unreachable!(),
687 ////////////////////////////////////////////////////////////////////////////////
689 ////////////////////////////////////////////////////////////////////////////////
691 impl<T: Send> SyncSender<T> {
// Two constructors, one per snapshot stage; both wrap the same sync packet.
692 #[cfg(stage0)] // NOTE remove impl after next snapshot
693 fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
694 SyncSender { inner: inner, _marker: marker::NoSync }
697 #[cfg(not(stage0))] // NOTE remove cfg after next snapshot
698 fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
699 SyncSender { inner: inner }
702 /// Sends a value on this synchronous channel.
704 /// This function will *block* until space in the internal buffer becomes
705 /// available or a receiver is available to hand off the message to.
707 /// Note that a successful send does *not* guarantee that the receiver will
708 /// ever see the data if there is a buffer on this channel. Items may be
709 /// enqueued in the internal buffer for the receiver to receive at a later
710 /// time. If the buffer size is 0, however, it can be guaranteed that the
711 /// receiver has indeed received the data if this function returns success.
713 /// This function will never panic, but it may return `Err` if the
714 /// `Receiver` has disconnected and is no longer able to receive
717 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
718 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
721 /// Attempts to send a value on this channel without blocking.
723 /// This method differs from `send` by returning immediately if the
724 /// channel's buffer is full or no receiver is waiting to acquire some
725 /// data. Compared with `send`, this function has two failure cases
726 /// instead of one (one for disconnection, one for a full buffer).
728 /// See `SyncSender::send` for notes about guarantees of whether the
729 /// receiver has received the data or not if this function is successful.
731 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
732 unsafe { (*self.inner.get()).try_send(t) }
// Sync channels are multi-sender from the start, so cloning only bumps the
// packet's sender count — no flavor upgrade is needed.
737 impl<T: Send> Clone for SyncSender<T> {
738 fn clone(&self) -> SyncSender<T> {
739 unsafe { (*self.inner.get()).clone_chan(); }
740 return SyncSender::new(self.inner.clone());
// Decrement the sender count; lets a blocked receiver see disconnection.
// NOTE(review): the `fn drop(&mut self)` line is elided from this listing.
746 impl<T: Send> Drop for SyncSender<T> {
748 unsafe { (*self.inner.get()).drop_chan(); }
752 ////////////////////////////////////////////////////////////////////////////////
754 ////////////////////////////////////////////////////////////////////////////////
// NOTE(review): closing braces and some lines inside this impl are elided in
// this listing (embedded numbering jumps); comments only describe what the
// visible code shows.
756 impl<T: Send> Receiver<T> {
757 fn new(inner: Flavor<T>) -> Receiver<T> {
758 Receiver { inner: UnsafeCell::new(inner) }
761 /// Attempts to return a pending value on this receiver without blocking
763 /// This method will never block the caller in order to wait for data to
764 /// become available. Instead, this will always return immediately with a
765 /// possible option of pending data on the channel.
767 /// This is useful for a flavor of "optimistic check" before deciding to
768 /// block on a receiver.
770 pub fn try_recv(&self) -> Result<T, TryRecvError> {
// Oneshot/Stream may report an upgrade; the new port replaces self.inner via
// the mem::swap at the bottom and the operation is retried.
772 let new_port = match *unsafe { self.inner() } {
773 Flavor::Oneshot(ref p) => {
774 match unsafe { (*p.get()).try_recv() } {
775 Ok(t) => return Ok(t),
776 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
777 Err(oneshot::Disconnected) => {
778 return Err(TryRecvError::Disconnected)
780 Err(oneshot::Upgraded(rx)) => rx,
783 Flavor::Stream(ref p) => {
784 match unsafe { (*p.get()).try_recv() } {
785 Ok(t) => return Ok(t),
786 Err(stream::Empty) => return Err(TryRecvError::Empty),
787 Err(stream::Disconnected) => {
788 return Err(TryRecvError::Disconnected)
790 Err(stream::Upgraded(rx)) => rx,
// Shared and Sync are terminal flavors: no upgrade errors are possible.
793 Flavor::Shared(ref p) => {
794 match unsafe { (*p.get()).try_recv() } {
795 Ok(t) => return Ok(t),
796 Err(shared::Empty) => return Err(TryRecvError::Empty),
797 Err(shared::Disconnected) => {
798 return Err(TryRecvError::Disconnected)
802 Flavor::Sync(ref p) => {
803 match unsafe { (*p.get()).try_recv() } {
804 Ok(t) => return Ok(t),
805 Err(sync::Empty) => return Err(TryRecvError::Empty),
806 Err(sync::Disconnected) => {
807 return Err(TryRecvError::Disconnected)
813 mem::swap(self.inner_mut(),
814 new_port.inner_mut());
819 /// Attempt to wait for a value on this receiver, returning an error if the
820 /// corresponding channel has hung up.
822 /// This function will always block the current thread if there is no data
823 /// available and it's possible for more data to be sent. Once a message is
824 /// sent to the corresponding `Sender`, then this receiver will wake up and
825 /// return that message.
827 /// If the corresponding `Sender` has disconnected, or it disconnects while
828 /// this call is blocking, this call will wake up and return `Err` to
829 /// indicate that no more messages can ever be received on this channel.
831 pub fn recv(&self) -> Result<T, RecvError> {
833 let new_port = match *unsafe { self.inner() } {
834 Flavor::Oneshot(ref p) => {
835 match unsafe { (*p.get()).recv() } {
836 Ok(t) => return Ok(t),
// A blocking recv() never reports Empty; that arm is dead by construction.
837 Err(oneshot::Empty) => return unreachable!(),
838 Err(oneshot::Disconnected) => return Err(RecvError),
839 Err(oneshot::Upgraded(rx)) => rx,
842 Flavor::Stream(ref p) => {
843 match unsafe { (*p.get()).recv() } {
844 Ok(t) => return Ok(t),
845 Err(stream::Empty) => return unreachable!(),
846 Err(stream::Disconnected) => return Err(RecvError),
847 Err(stream::Upgraded(rx)) => rx,
850 Flavor::Shared(ref p) => {
851 match unsafe { (*p.get()).recv() } {
852 Ok(t) => return Ok(t),
853 Err(shared::Empty) => return unreachable!(),
854 Err(shared::Disconnected) => return Err(RecvError),
857 Flavor::Sync(ref p) => return unsafe {
858 (*p.get()).recv().map_err(|()| RecvError)
862 mem::swap(self.inner_mut(), new_port.inner_mut());
867 /// Returns an iterator that will block waiting for messages, but never
868 /// `panic!`. It will return `None` when the channel has hung up.
870 pub fn iter(&self) -> Iter<T> {
// Hooks used by the select! machinery. Each method follows the same pattern
// as try_recv/recv: upgradable flavors may hand back a new port, which is
// swapped into self.inner before retrying. NOTE(review): closing braces and
// some lines are elided in this listing.
875 impl<T: Send> select::Packet for Receiver<T> {
876 fn can_recv(&self) -> bool {
878 let new_port = match *unsafe { self.inner() } {
879 Flavor::Oneshot(ref p) => {
880 match unsafe { (*p.get()).can_recv() } {
881 Ok(ret) => return ret,
882 Err(upgrade) => upgrade,
885 Flavor::Stream(ref p) => {
886 match unsafe { (*p.get()).can_recv() } {
887 Ok(ret) => return ret,
888 Err(upgrade) => upgrade,
891 Flavor::Shared(ref p) => {
892 return unsafe { (*p.get()).can_recv() };
894 Flavor::Sync(ref p) => {
895 return unsafe { (*p.get()).can_recv() };
899 mem::swap(self.inner_mut(),
900 new_port.inner_mut());
// Registers `token` to be signaled when data arrives; an upgrade returns the
// token so it can be re-registered on the new port.
905 fn start_selection(&self, mut token: SignalToken) -> StartResult {
907 let (t, new_port) = match *unsafe { self.inner() } {
908 Flavor::Oneshot(ref p) => {
909 match unsafe { (*p.get()).start_selection(token) } {
910 oneshot::SelSuccess => return Installed,
911 oneshot::SelCanceled => return Abort,
912 oneshot::SelUpgraded(t, rx) => (t, rx),
915 Flavor::Stream(ref p) => {
916 match unsafe { (*p.get()).start_selection(token) } {
917 stream::SelSuccess => return Installed,
918 stream::SelCanceled => return Abort,
919 stream::SelUpgraded(t, rx) => (t, rx),
922 Flavor::Shared(ref p) => {
923 return unsafe { (*p.get()).start_selection(token) };
925 Flavor::Sync(ref p) => {
926 return unsafe { (*p.get()).start_selection(token) };
931 mem::swap(self.inner_mut(), new_port.inner_mut());
936 fn abort_selection(&self) -> bool {
937 let mut was_upgrade = false;
939 let result = match *unsafe { self.inner() } {
940 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
941 Flavor::Stream(ref p) => unsafe {
942 (*p.get()).abort_selection(was_upgrade)
944 Flavor::Shared(ref p) => return unsafe {
945 (*p.get()).abort_selection(was_upgrade)
947 Flavor::Sync(ref p) => return unsafe {
948 (*p.get()).abort_selection()
951 let new_port = match result { Ok(b) => return b, Err(p) => p };
954 mem::swap(self.inner_mut(),
955 new_port.inner_mut());
// Blocking iterator: each next() is a recv(); disconnection maps to None.
// NOTE(review): the `type Item` line appears to be elided from this listing.
962 impl<'a, T: Send> Iterator for Iter<'a, T> {
965 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
// Notifies the packet that the (single) receiver is gone so senders start
// seeing errors. NOTE(review): the `fn drop(&mut self)` line is elided from
// this listing.
970 impl<T: Send> Drop for Receiver<T> {
972 match *unsafe { self.inner_mut() } {
973 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
974 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
975 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
976 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
981 /// A version of `UnsafeCell` intended for use in concurrent data
982 /// structures (for example, you might put it in an `Arc`).
983 struct RacyCell<T>(pub UnsafeCell<T>);
985 impl<T> RacyCell<T> {
987 fn new(value: T) -> RacyCell<T> {
988 RacyCell(UnsafeCell { value: value })
// Unsafe: hands out a raw *mut T with no synchronization of its own; callers
// must uphold the packets' internal locking/atomics discipline.
991 unsafe fn get(&self) -> *mut T {
// NOTE(review): the body of get() is elided from this listing.
997 unsafe impl<T:Send> Send for RacyCell<T> { }
// Deliberately racy claim ("Oh dear"): Sync for *any* T is only sound because
// every access goes through the packets' own synchronization — confirm in the
// flavor implementations (not visible here).
999 unsafe impl<T> Sync for RacyCell<T> { } // Oh dear
// Human-readable form; the unsent payload itself is intentionally not shown.
1001 impl<T> fmt::Show for SendError<T> {
1002 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1003 "sending on a closed channel".fmt(f)
// Distinguishes the two try_send failure modes in the message text.
// NOTE(review): the `match` header line is elided from this listing.
1007 impl<T> fmt::Show for TrySendError<T> {
1008 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010 TrySendError::Full(..) => {
1011 "sending on a full channel".fmt(f)
1013 TrySendError::Disconnected(..) => {
1014 "sending on a closed channel".fmt(f)
// Single failure mode, single fixed message.
1020 impl fmt::Show for RecvError {
1021 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1022 "receiving on a closed channel".fmt(f)
// Distinguishes empty-vs-disconnected in the message text.
// NOTE(review): the `match` header line is elided from this listing.
1026 impl fmt::Show for TryRecvError {
1027 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1029 TryRecvError::Empty => {
1030 "receiving on an empty channel".fmt(f)
1032 TryRecvError::Disconnected => {
1033 "receiving on a closed channel".fmt(f)
// Test helper: scales stress-test iteration counts via RUST_TEST_STRESS.
// Panics if the env var is set but not a valid uint.
1047 pub fn stress_factor() -> uint {
1048 match os::getenv("RUST_TEST_STRESS") {
1049 Some(val) => val.parse().unwrap(),
// NOTE(review): the None arm (the default factor) is elided from this
// listing.
1056 let (tx, rx) = channel::<int>();
1057 tx.send(1).unwrap();
1058 assert_eq!(rx.recv().unwrap(), 1);
1063 let (tx, _rx) = channel();
1064 tx.send(box 1i).unwrap();
1068 fn drop_full_shared() {
1069 let (tx, _rx) = channel();
1072 tx.send(box 1i).unwrap();
1077 let (tx, rx) = channel::<int>();
1078 tx.send(1).unwrap();
1079 assert_eq!(rx.recv().unwrap(), 1);
1080 let tx = tx.clone();
1081 tx.send(1).unwrap();
1082 assert_eq!(rx.recv().unwrap(), 1);
1086 fn smoke_threads() {
1087 let (tx, rx) = channel::<int>();
1088 let _t = Thread::spawn(move|| {
1089 tx.send(1).unwrap();
1091 assert_eq!(rx.recv().unwrap(), 1);
1095 fn smoke_port_gone() {
1096 let (tx, rx) = channel::<int>();
1098 assert!(tx.send(1).is_err());
1102 fn smoke_shared_port_gone() {
1103 let (tx, rx) = channel::<int>();
1105 assert!(tx.send(1).is_err())
1109 fn smoke_shared_port_gone2() {
1110 let (tx, rx) = channel::<int>();
1112 let tx2 = tx.clone();
1114 assert!(tx2.send(1).is_err());
1118 fn port_gone_concurrent() {
1119 let (tx, rx) = channel::<int>();
1120 let _t = Thread::spawn(move|| {
1123 while tx.send(1).is_ok() {}
1127 fn port_gone_concurrent_shared() {
1128 let (tx, rx) = channel::<int>();
1129 let tx2 = tx.clone();
1130 let _t = Thread::spawn(move|| {
1133 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1137 fn smoke_chan_gone() {
1138 let (tx, rx) = channel::<int>();
1140 assert!(rx.recv().is_err());
1144 fn smoke_chan_gone_shared() {
1145 let (tx, rx) = channel::<()>();
1146 let tx2 = tx.clone();
1149 assert!(rx.recv().is_err());
1153 fn chan_gone_concurrent() {
1154 let (tx, rx) = channel::<int>();
1155 let _t = Thread::spawn(move|| {
1156 tx.send(1).unwrap();
1157 tx.send(1).unwrap();
1159 while rx.recv().is_ok() {}
1164 let (tx, rx) = channel::<int>();
1165 let t = Thread::scoped(move|| {
1166 for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1168 for _ in range(0u, 10000) {
1169 assert_eq!(rx.recv().unwrap(), 1);
1171 t.join().ok().unwrap();
1175 fn stress_shared() {
1176 static AMT: uint = 10000;
1177 static NTHREADS: uint = 8;
1178 let (tx, rx) = channel::<int>();
1180 let t = Thread::scoped(move|| {
1181 for _ in range(0, AMT * NTHREADS) {
1182 assert_eq!(rx.recv().unwrap(), 1);
1184 match rx.try_recv() {
1190 for _ in range(0, NTHREADS) {
1191 let tx = tx.clone();
1192 Thread::spawn(move|| {
1193 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1197 t.join().ok().unwrap();
1201 fn send_from_outside_runtime() {
1202 let (tx1, rx1) = channel::<()>();
1203 let (tx2, rx2) = channel::<int>();
1204 let t1 = Thread::scoped(move|| {
1205 tx1.send(()).unwrap();
1206 for _ in range(0i, 40) {
1207 assert_eq!(rx2.recv().unwrap(), 1);
1210 rx1.recv().unwrap();
1211 let t2 = Thread::scoped(move|| {
1212 for _ in range(0i, 40) {
1213 tx2.send(1).unwrap();
1216 t1.join().ok().unwrap();
1217 t2.join().ok().unwrap();
1221 fn recv_from_outside_runtime() {
1222 let (tx, rx) = channel::<int>();
1223 let t = Thread::scoped(move|| {
1224 for _ in range(0i, 40) {
1225 assert_eq!(rx.recv().unwrap(), 1);
1228 for _ in range(0u, 40) {
1229 tx.send(1).unwrap();
1231 t.join().ok().unwrap();
1236 let (tx1, rx1) = channel::<int>();
1237 let (tx2, rx2) = channel::<int>();
1238 let t1 = Thread::scoped(move|| {
1239 assert_eq!(rx1.recv().unwrap(), 1);
1240 tx2.send(2).unwrap();
1242 let t2 = Thread::scoped(move|| {
1243 tx1.send(1).unwrap();
1244 assert_eq!(rx2.recv().unwrap(), 2);
1246 t1.join().ok().unwrap();
1247 t2.join().ok().unwrap();
1251 fn oneshot_single_thread_close_port_first() {
1252 // Simple test of closing without sending
1253 let (_tx, rx) = channel::<int>();
1258 fn oneshot_single_thread_close_chan_first() {
1259 // Simple test of closing without sending
1260 let (tx, _rx) = channel::<int>();
1265 fn oneshot_single_thread_send_port_close() {
1266 // Testing that the sender cleans up the payload if receiver is closed
1267 let (tx, rx) = channel::<Box<int>>();
1269 assert!(tx.send(box 0).is_err());
1273 fn oneshot_single_thread_recv_chan_close() {
1274 // Receiving on a closed chan will panic
1275 let res = Thread::scoped(move|| {
1276 let (tx, rx) = channel::<int>();
1281 assert!(res.is_err());
1285 fn oneshot_single_thread_send_then_recv() {
1286 let (tx, rx) = channel::<Box<int>>();
1287 tx.send(box 10).unwrap();
1288 assert!(rx.recv().unwrap() == box 10);
1292 fn oneshot_single_thread_try_send_open() {
1293 let (tx, rx) = channel::<int>();
1294 assert!(tx.send(10).is_ok());
1295 assert!(rx.recv().unwrap() == 10);
1299 fn oneshot_single_thread_try_send_closed() {
1300 let (tx, rx) = channel::<int>();
1302 assert!(tx.send(10).is_err());
1306 fn oneshot_single_thread_try_recv_open() {
1307 let (tx, rx) = channel::<int>();
1308 tx.send(10).unwrap();
1309 assert!(rx.recv() == Ok(10));
1313 fn oneshot_single_thread_try_recv_closed() {
1314 let (tx, rx) = channel::<int>();
1316 assert!(rx.recv().is_err());
1320 fn oneshot_single_thread_peek_data() {
1321 let (tx, rx) = channel::<int>();
1322 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1323 tx.send(10).unwrap();
1324 assert_eq!(rx.try_recv(), Ok(10));
1328 fn oneshot_single_thread_peek_close() {
1329 let (tx, rx) = channel::<int>();
1331 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1332 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1336 fn oneshot_single_thread_peek_open() {
1337 let (_tx, rx) = channel::<int>();
1338 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1342 fn oneshot_multi_task_recv_then_send() {
1343 let (tx, rx) = channel::<Box<int>>();
1344 let _t = Thread::spawn(move|| {
1345 assert!(rx.recv().unwrap() == box 10);
1348 tx.send(box 10).unwrap();
1352 fn oneshot_multi_task_recv_then_close() {
1353 let (tx, rx) = channel::<Box<int>>();
1354 let _t = Thread::spawn(move|| {
1357 let res = Thread::scoped(move|| {
1358 assert!(rx.recv().unwrap() == box 10);
1360 assert!(res.is_err());
1364 fn oneshot_multi_thread_close_stress() {
1365 for _ in range(0, stress_factor()) {
1366 let (tx, rx) = channel::<int>();
1367 let _t = Thread::spawn(move|| {
1375 fn oneshot_multi_thread_send_close_stress() {
1376 for _ in range(0, stress_factor()) {
1377 let (tx, rx) = channel::<int>();
1378 let _t = Thread::spawn(move|| {
1381 let _ = Thread::scoped(move|| {
1382 tx.send(1).unwrap();
1388 fn oneshot_multi_thread_recv_close_stress() {
1389 for _ in range(0, stress_factor()) {
1390 let (tx, rx) = channel::<int>();
1391 Thread::spawn(move|| {
1392 let res = Thread::scoped(move|| {
1395 assert!(res.is_err());
1397 let _t = Thread::spawn(move|| {
1398 Thread::spawn(move|| {
1406 fn oneshot_multi_thread_send_recv_stress() {
1407 for _ in range(0, stress_factor()) {
1408 let (tx, rx) = channel();
1409 let _t = Thread::spawn(move|| {
1410 tx.send(box 10i).unwrap();
1412 assert!(rx.recv().unwrap() == box 10i);
1417 fn stream_send_recv_stress() {
1418 for _ in range(0, stress_factor()) {
1419 let (tx, rx) = channel();
1424 fn send(tx: Sender<Box<int>>, i: int) {
1425 if i == 10 { return }
1427 Thread::spawn(move|| {
1428 tx.send(box i).unwrap();
1433 fn recv(rx: Receiver<Box<int>>, i: int) {
1434 if i == 10 { return }
1436 Thread::spawn(move|| {
1437 assert!(rx.recv().unwrap() == box i);
1446 // Regression test that we don't run out of stack in scheduler context
1447 let (tx, rx) = channel();
1448 for _ in range(0i, 10000) { tx.send(()).unwrap(); }
1449 for _ in range(0i, 10000) { rx.recv().unwrap(); }
1453 fn shared_chan_stress() {
1454 let (tx, rx) = channel();
1455 let total = stress_factor() + 100;
1456 for _ in range(0, total) {
1457 let tx = tx.clone();
1458 Thread::spawn(move|| {
1459 tx.send(()).unwrap();
1463 for _ in range(0, total) {
1469 fn test_nested_recv_iter() {
1470 let (tx, rx) = channel::<int>();
1471 let (total_tx, total_rx) = channel::<int>();
1473 let _t = Thread::spawn(move|| {
1475 for x in rx.iter() {
1478 total_tx.send(acc).unwrap();
1481 tx.send(3).unwrap();
1482 tx.send(1).unwrap();
1483 tx.send(2).unwrap();
1485 assert_eq!(total_rx.recv().unwrap(), 6);
1489 fn test_recv_iter_break() {
1490 let (tx, rx) = channel::<int>();
1491 let (count_tx, count_rx) = channel();
1493 let _t = Thread::spawn(move|| {
1495 for x in rx.iter() {
1502 count_tx.send(count).unwrap();
1505 tx.send(2).unwrap();
1506 tx.send(2).unwrap();
1507 tx.send(2).unwrap();
1510 assert_eq!(count_rx.recv().unwrap(), 4);
1514 fn try_recv_states() {
1515 let (tx1, rx1) = channel::<int>();
1516 let (tx2, rx2) = channel::<()>();
1517 let (tx3, rx3) = channel::<()>();
1518 let _t = Thread::spawn(move|| {
1519 rx2.recv().unwrap();
1520 tx1.send(1).unwrap();
1521 tx3.send(()).unwrap();
1522 rx2.recv().unwrap();
1524 tx3.send(()).unwrap();
1527 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1528 tx2.send(()).unwrap();
1529 rx3.recv().unwrap();
1530 assert_eq!(rx1.try_recv(), Ok(1));
1531 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1532 tx2.send(()).unwrap();
1533 rx3.recv().unwrap();
1534 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1537 // This bug used to end up in a livelock inside of the Receiver destructor
1538 // because the internal state of the Shared packet was corrupted
1540 fn destroy_upgraded_shared_port_when_sender_still_active() {
1541 let (tx, rx) = channel();
1542 let (tx2, rx2) = channel();
1543 let _t = Thread::spawn(move|| {
1544 rx.recv().unwrap(); // wait on a oneshot
1545 drop(rx); // destroy a shared
1546 tx2.send(()).unwrap();
1548 // make sure the other task has gone to sleep
1549 for _ in range(0u, 5000) { Thread::yield_now(); }
1551 // upgrade to a shared chan and send a message
1554 t.send(()).unwrap();
1556 // wait for the child task to exit before we exit
1557 rx2.recv().unwrap();
// Sync-channel test module's copy of `stress_factor`: scale factor
// read from RUST_TEST_STRESS; panics if the variable is set but not
// parseable. The `None` arm falls outside this excerpt.
1569 pub fn stress_factor() -> uint {
1570 match os::getenv("RUST_TEST_STRESS") {
1571 Some(val) => val.parse().unwrap(),
1578 let (tx, rx) = sync_channel::<int>(1);
1579 tx.send(1).unwrap();
1580 assert_eq!(rx.recv().unwrap(), 1);
1585 let (tx, _rx) = sync_channel(1);
1586 tx.send(box 1i).unwrap();
1591 let (tx, rx) = sync_channel::<int>(1);
1592 tx.send(1).unwrap();
1593 assert_eq!(rx.recv().unwrap(), 1);
1594 let tx = tx.clone();
1595 tx.send(1).unwrap();
1596 assert_eq!(rx.recv().unwrap(), 1);
1600 fn smoke_threads() {
1601 let (tx, rx) = sync_channel::<int>(0);
1602 let _t = Thread::spawn(move|| {
1603 tx.send(1).unwrap();
1605 assert_eq!(rx.recv().unwrap(), 1);
1609 fn smoke_port_gone() {
1610 let (tx, rx) = sync_channel::<int>(0);
1612 assert!(tx.send(1).is_err());
1616 fn smoke_shared_port_gone2() {
1617 let (tx, rx) = sync_channel::<int>(0);
1619 let tx2 = tx.clone();
1621 assert!(tx2.send(1).is_err());
1625 fn port_gone_concurrent() {
1626 let (tx, rx) = sync_channel::<int>(0);
1627 let _t = Thread::spawn(move|| {
1630 while tx.send(1).is_ok() {}
1634 fn port_gone_concurrent_shared() {
1635 let (tx, rx) = sync_channel::<int>(0);
1636 let tx2 = tx.clone();
1637 let _t = Thread::spawn(move|| {
1640 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1644 fn smoke_chan_gone() {
1645 let (tx, rx) = sync_channel::<int>(0);
1647 assert!(rx.recv().is_err());
1651 fn smoke_chan_gone_shared() {
1652 let (tx, rx) = sync_channel::<()>(0);
1653 let tx2 = tx.clone();
1656 assert!(rx.recv().is_err());
1660 fn chan_gone_concurrent() {
1661 let (tx, rx) = sync_channel::<int>(0);
1662 Thread::spawn(move|| {
1663 tx.send(1).unwrap();
1664 tx.send(1).unwrap();
1666 while rx.recv().is_ok() {}
1671 let (tx, rx) = sync_channel::<int>(0);
1672 Thread::spawn(move|| {
1673 for _ in range(0u, 10000) { tx.send(1).unwrap(); }
1675 for _ in range(0u, 10000) {
1676 assert_eq!(rx.recv().unwrap(), 1);
1681 fn stress_shared() {
1682 static AMT: uint = 1000;
1683 static NTHREADS: uint = 8;
1684 let (tx, rx) = sync_channel::<int>(0);
1685 let (dtx, drx) = sync_channel::<()>(0);
1687 Thread::spawn(move|| {
1688 for _ in range(0, AMT * NTHREADS) {
1689 assert_eq!(rx.recv().unwrap(), 1);
1691 match rx.try_recv() {
1695 dtx.send(()).unwrap();
1698 for _ in range(0, NTHREADS) {
1699 let tx = tx.clone();
1700 Thread::spawn(move|| {
1701 for _ in range(0, AMT) { tx.send(1).unwrap(); }
1705 drx.recv().unwrap();
1709 fn oneshot_single_thread_close_port_first() {
1710 // Simple test of closing without sending
1711 let (_tx, rx) = sync_channel::<int>(0);
1716 fn oneshot_single_thread_close_chan_first() {
1717 // Simple test of closing without sending
1718 let (tx, _rx) = sync_channel::<int>(0);
1723 fn oneshot_single_thread_send_port_close() {
1724 // Testing that the sender cleans up the payload if receiver is closed
1725 let (tx, rx) = sync_channel::<Box<int>>(0);
1727 assert!(tx.send(box 0).is_err());
1731 fn oneshot_single_thread_recv_chan_close() {
1732 // Receiving on a closed chan will panic
1733 let res = Thread::scoped(move|| {
1734 let (tx, rx) = sync_channel::<int>(0);
1739 assert!(res.is_err());
1743 fn oneshot_single_thread_send_then_recv() {
1744 let (tx, rx) = sync_channel::<Box<int>>(1);
1745 tx.send(box 10).unwrap();
1746 assert!(rx.recv().unwrap() == box 10);
1750 fn oneshot_single_thread_try_send_open() {
1751 let (tx, rx) = sync_channel::<int>(1);
1752 assert_eq!(tx.try_send(10), Ok(()));
1753 assert!(rx.recv().unwrap() == 10);
1757 fn oneshot_single_thread_try_send_closed() {
1758 let (tx, rx) = sync_channel::<int>(0);
1760 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1764 fn oneshot_single_thread_try_send_closed2() {
1765 let (tx, _rx) = sync_channel::<int>(0);
1766 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1770 fn oneshot_single_thread_try_recv_open() {
1771 let (tx, rx) = sync_channel::<int>(1);
1772 tx.send(10).unwrap();
1773 assert!(rx.recv() == Ok(10));
1777 fn oneshot_single_thread_try_recv_closed() {
1778 let (tx, rx) = sync_channel::<int>(0);
1780 assert!(rx.recv().is_err());
1784 fn oneshot_single_thread_peek_data() {
1785 let (tx, rx) = sync_channel::<int>(1);
1786 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1787 tx.send(10).unwrap();
1788 assert_eq!(rx.try_recv(), Ok(10));
1792 fn oneshot_single_thread_peek_close() {
1793 let (tx, rx) = sync_channel::<int>(0);
1795 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1796 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1800 fn oneshot_single_thread_peek_open() {
1801 let (_tx, rx) = sync_channel::<int>(0);
1802 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1806 fn oneshot_multi_task_recv_then_send() {
1807 let (tx, rx) = sync_channel::<Box<int>>(0);
1808 let _t = Thread::spawn(move|| {
1809 assert!(rx.recv().unwrap() == box 10);
1812 tx.send(box 10).unwrap();
1816 fn oneshot_multi_task_recv_then_close() {
1817 let (tx, rx) = sync_channel::<Box<int>>(0);
1818 let _t = Thread::spawn(move|| {
1821 let res = Thread::scoped(move|| {
1822 assert!(rx.recv().unwrap() == box 10);
1824 assert!(res.is_err());
1828 fn oneshot_multi_thread_close_stress() {
1829 for _ in range(0, stress_factor()) {
1830 let (tx, rx) = sync_channel::<int>(0);
1831 let _t = Thread::spawn(move|| {
1839 fn oneshot_multi_thread_send_close_stress() {
1840 for _ in range(0, stress_factor()) {
1841 let (tx, rx) = sync_channel::<int>(0);
1842 let _t = Thread::spawn(move|| {
1845 let _ = Thread::scoped(move || {
1846 tx.send(1).unwrap();
1852 fn oneshot_multi_thread_recv_close_stress() {
1853 for _ in range(0, stress_factor()) {
1854 let (tx, rx) = sync_channel::<int>(0);
1855 let _t = Thread::spawn(move|| {
1856 let res = Thread::scoped(move|| {
1859 assert!(res.is_err());
1861 let _t = Thread::spawn(move|| {
1862 Thread::spawn(move|| {
1870 fn oneshot_multi_thread_send_recv_stress() {
1871 for _ in range(0, stress_factor()) {
1872 let (tx, rx) = sync_channel::<Box<int>>(0);
1873 let _t = Thread::spawn(move|| {
1874 tx.send(box 10i).unwrap();
1876 assert!(rx.recv().unwrap() == box 10i);
1881 fn stream_send_recv_stress() {
1882 for _ in range(0, stress_factor()) {
1883 let (tx, rx) = sync_channel::<Box<int>>(0);
1888 fn send(tx: SyncSender<Box<int>>, i: int) {
1889 if i == 10 { return }
1891 Thread::spawn(move|| {
1892 tx.send(box i).unwrap();
1897 fn recv(rx: Receiver<Box<int>>, i: int) {
1898 if i == 10 { return }
1900 Thread::spawn(move|| {
1901 assert!(rx.recv().unwrap() == box i);
1910 // Regression test that we don't run out of stack in scheduler context
1911 let (tx, rx) = sync_channel(10000);
1912 for _ in range(0u, 10000) { tx.send(()).unwrap(); }
1913 for _ in range(0u, 10000) { rx.recv().unwrap(); }
1917 fn shared_chan_stress() {
1918 let (tx, rx) = sync_channel(0);
1919 let total = stress_factor() + 100;
1920 for _ in range(0, total) {
1921 let tx = tx.clone();
1922 Thread::spawn(move|| {
1923 tx.send(()).unwrap();
1927 for _ in range(0, total) {
1933 fn test_nested_recv_iter() {
1934 let (tx, rx) = sync_channel::<int>(0);
1935 let (total_tx, total_rx) = sync_channel::<int>(0);
1937 let _t = Thread::spawn(move|| {
1939 for x in rx.iter() {
1942 total_tx.send(acc).unwrap();
1945 tx.send(3).unwrap();
1946 tx.send(1).unwrap();
1947 tx.send(2).unwrap();
1949 assert_eq!(total_rx.recv().unwrap(), 6);
1953 fn test_recv_iter_break() {
1954 let (tx, rx) = sync_channel::<int>(0);
1955 let (count_tx, count_rx) = sync_channel(0);
1957 let _t = Thread::spawn(move|| {
1959 for x in rx.iter() {
1966 count_tx.send(count).unwrap();
1969 tx.send(2).unwrap();
1970 tx.send(2).unwrap();
1971 tx.send(2).unwrap();
1972 let _ = tx.try_send(2);
1974 assert_eq!(count_rx.recv().unwrap(), 4);
1978 fn try_recv_states() {
1979 let (tx1, rx1) = sync_channel::<int>(1);
1980 let (tx2, rx2) = sync_channel::<()>(1);
1981 let (tx3, rx3) = sync_channel::<()>(1);
1982 let _t = Thread::spawn(move|| {
1983 rx2.recv().unwrap();
1984 tx1.send(1).unwrap();
1985 tx3.send(()).unwrap();
1986 rx2.recv().unwrap();
1988 tx3.send(()).unwrap();
1991 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1992 tx2.send(()).unwrap();
1993 rx3.recv().unwrap();
1994 assert_eq!(rx1.try_recv(), Ok(1));
1995 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1996 tx2.send(()).unwrap();
1997 rx3.recv().unwrap();
1998 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2001 // This bug used to end up in a livelock inside of the Receiver destructor
2002 // because the internal state of the Shared packet was corrupted
2004 fn destroy_upgraded_shared_port_when_sender_still_active() {
2005 let (tx, rx) = sync_channel::<()>(0);
2006 let (tx2, rx2) = sync_channel::<()>(0);
2007 let _t = Thread::spawn(move|| {
2008 rx.recv().unwrap(); // wait on a oneshot
2009 drop(rx); // destroy a shared
2010 tx2.send(()).unwrap();
2012 // make sure the other task has gone to sleep
2013 for _ in range(0u, 5000) { Thread::yield_now(); }
2015 // upgrade to a shared chan and send a message
2018 t.send(()).unwrap();
2020 // wait for the child task to exit before we exit
2021 rx2.recv().unwrap();
2026 let (tx, rx) = sync_channel::<int>(0);
2027 let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
2028 assert_eq!(tx.send(1), Ok(()));
2033 let (tx, rx) = sync_channel::<int>(0);
2034 let _t = Thread::spawn(move|| { drop(rx); });
2035 assert!(tx.send(1).is_err());
2040 let (tx, rx) = sync_channel::<int>(1);
2041 assert_eq!(tx.send(1), Ok(()));
2042 let _t =Thread::spawn(move|| { drop(rx); });
2043 assert!(tx.send(1).is_err());
2048 let (tx, rx) = sync_channel::<int>(0);
2049 let tx2 = tx.clone();
2050 let (done, donerx) = channel();
2051 let done2 = done.clone();
2052 let _t = Thread::spawn(move|| {
2053 assert!(tx.send(1).is_err());
2054 done.send(()).unwrap();
2056 let _t = Thread::spawn(move|| {
2057 assert!(tx2.send(2).is_err());
2058 done2.send(()).unwrap();
2061 donerx.recv().unwrap();
2062 donerx.recv().unwrap();
2067 let (tx, _rx) = sync_channel::<int>(0);
2068 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2073 let (tx, _rx) = sync_channel::<int>(1);
2074 assert_eq!(tx.try_send(1), Ok(()));
2075 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2080 let (tx, rx) = sync_channel::<int>(1);
2081 assert_eq!(tx.try_send(1), Ok(()));
2083 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2089 let (tx1, rx1) = sync_channel::<()>(3);
2090 let (tx2, rx2) = sync_channel::<()>(3);
2092 let _t = Thread::spawn(move|| {
2093 rx1.recv().unwrap();
2094 tx2.try_send(()).unwrap();
2097 tx1.try_send(()).unwrap();
2098 rx2.recv().unwrap();
2101 for _ in range(0u, 100) {