3 /// This is the flavor of channels which are not necessarily optimized for any
4 /// particular use case, but are the most general in how they are used. Shared
5 /// channels are cloneable allowing for multiple senders.
7 /// High level implementation details can be found in the comment of the parent
8 /// module. You'll also note that the implementation of the shared and stream
9 /// channels are quite similar, and this is no coincidence!
10 pub use self::Failure::*;
11 use self::StartResult::*;
14 use core::intrinsics::abort;
16 use crate::cell::UnsafeCell;
18 use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
19 use crate::sync::mpsc::blocking::{self, SignalToken};
20 use crate::sync::mpsc::mpsc_queue as mpsc;
21 use crate::sync::{Mutex, MutexGuard};
23 use crate::time::Instant;
25 const DISCONNECTED: isize = isize::MIN;
26 const FUDGE: isize = 1024;
27 const MAX_REFCOUNT: usize = (isize::MAX) as usize;
29 const MAX_STEALS: isize = 5;
31 const MAX_STEALS: isize = 1 << 20;
33 pub struct Packet<T> {
34 queue: mpsc::Queue<T>,
35 cnt: AtomicIsize, // How many items are on this channel
36 steals: UnsafeCell<isize>, // How many times has a port received without blocking?
37 to_wake: AtomicUsize, // SignalToken for wake up
39 // The number of channels which are currently using this packet.
40 channels: AtomicUsize,
42 // See the discussion in Port::drop and the channel send methods for what
44 port_dropped: AtomicBool,
45 sender_drain: AtomicIsize,
47 // this lock protects various portions of this implementation during
49 select_lock: Mutex<()>,
57 #[derive(PartialEq, Eq)]
64 // Creation of a packet *must* be followed by a call to postinit_lock
65 // and later by inherit_blocker
66 pub fn new() -> Packet<T> {
68 queue: mpsc::Queue::new(),
69 cnt: AtomicIsize::new(0),
70 steals: UnsafeCell::new(0),
71 to_wake: AtomicUsize::new(0),
72 channels: AtomicUsize::new(2),
73 port_dropped: AtomicBool::new(false),
74 sender_drain: AtomicIsize::new(0),
75 select_lock: Mutex::new(()),
79 // This function should be used after newly created Packet
80 // was wrapped with an Arc
81 // In other case mutex data will be duplicated while cloning
82 // and that could cause problems on platforms where it is
83 // represented by opaque data structure
84 pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
85 self.select_lock.lock().unwrap()
88 // This function is used at the creation of a shared packet to inherit a
89 // previously blocked thread. This is done to prevent spurious wakeups of
90 // threads in select().
92 // This can only be called at channel-creation time
93 pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
94 if let Some(token) = token {
95 assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
96 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
97 self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
98 self.cnt.store(-1, Ordering::SeqCst);
100 // This store is a little sketchy. What's happening here is that
101 // we're transferring a blocker from a oneshot or stream channel to
102 // this shared channel. In doing so, we never spuriously wake them
103 // up and rather only wake them up at the appropriate time. This
104 // implementation of shared channels assumes that any blocking
105 // recv() will undo the increment of steals performed in try_recv()
106 // once the recv is complete. This thread that we're inheriting,
107 // however, is not in the middle of recv. Hence, the first time we
108 // wake them up, they're going to wake up from their old port, move
109 // on to the upgraded port, and then call the block recv() function.
111 // When calling this function, they'll find there's data immediately
112 // available, counting it as a steal. This in fact wasn't a steal
113 // because we appropriately blocked them waiting for data.
115 // To offset this bad increment, we initially set the steal count to
116 // -1. You'll find some special code in abort_selection() as well to
117 // ensure that this -1 steal count doesn't escape too far.
119 *self.steals.get() = -1;
123 // When the shared packet is constructed, we grabbed this lock. The
124 // purpose of this lock is to ensure that abort_selection() doesn't
125 // interfere with this method. After we unlock this lock, we're
126 // signifying that we're done modifying self.cnt and self.to_wake and
127 // the port is ready for the world to continue using it.
131 pub fn send(&self, t: T) -> Result<(), T> {
132 // See Port::drop for what's going on
133 if self.port_dropped.load(Ordering::SeqCst) {
137 // Note that the multiple sender case is a little trickier
138 // semantically than the single sender case. The logic for
139 // incrementing is "add and if disconnected store disconnected".
140 // This could end up leading some senders to believe that there
141 // wasn't a disconnect if in fact there was a disconnect. This means
142 // that while one thread is attempting to re-store the disconnected
143 // states, other threads could walk through merrily incrementing
144 // this very-negative disconnected count. To prevent senders from
145 // spuriously attempting to send when the channels is actually
146 // disconnected, the count has a ranged check here.
148 // This is also done for another reason. Remember that the return
149 // value of this function is:
151 // `true` == the data *may* be received, this essentially has no
153 // `false` == the data will *never* be received, this has a lot of
156 // In the SPSC case, we have a check of 'queue.is_empty()' to see
157 // whether the data was actually received, but this same condition
158 // means nothing in a multi-producer context. As a result, this
159 // preflight check serves as the definitive "this will never be
160 // received". Once we get beyond this check, we have permanently
161 // entered the realm of "this may be received"
162 if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
167 match self.cnt.fetch_add(1, Ordering::SeqCst) {
169 self.take_to_wake().signal();
172 // In this case, we have possibly failed to send our data, and
173 // we need to consider re-popping the data in order to fully
174 // destroy it. We must arbitrate among the multiple senders,
175 // however, because the queues that we're using are
176 // single-consumer queues. In order to do this, all exiting
177 // pushers will use an atomic count in order to count those
178 // flowing through. Pushers who see 0 are required to drain as
179 // much as possible, and then can only exit when they are the
180 // only pusher (otherwise they must try again).
181 n if n < DISCONNECTED + FUDGE => {
182 // see the comment in 'try' for a shared channel for why this
183 // window of "not disconnected" is ok.
184 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
186 if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
188 // drain the queue, for info on the thread yield see the
189 // discussion in try_recv
191 match self.queue.pop() {
193 mpsc::Empty => break,
194 mpsc::Inconsistent => thread::yield_now(),
197 // maybe we're done, if we're not the last ones
198 // here, then we need to go try again.
199 if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
204 // At this point, there may still be data on the queue,
205 // but only if the count hasn't been incremented and
206 // some other sender hasn't finished pushing data just
207 // yet. That sender in question will drain its own data.
211 // Can't make any assumptions about this case like in the SPSC case.
218 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
219 // This code is essentially the exact same as that found in the stream
220 // case (see stream.rs)
221 match self.try_recv() {
226 let (wait_token, signal_token) = blocking::tokens();
227 if self.decrement(signal_token) == Installed {
228 if let Some(deadline) = deadline {
229 let timed_out = !wait_token.wait_max_until(deadline);
231 self.abort_selection(false);
238 match self.try_recv() {
239 data @ Ok(..) => unsafe {
240 *self.steals.get() -= 1;
247 // Essentially the exact same thing as the stream decrement function.
248 // Returns true if blocking should proceed.
249 fn decrement(&self, token: SignalToken) -> StartResult {
251 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0, "This is a known bug in rust. See https://github.com/rust-lang/rust/issues/39364");
252 let ptr = token.cast_to_usize();
253 self.to_wake.store(ptr, Ordering::SeqCst);
255 let steals = ptr::replace(self.steals.get(), 0);
257 match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
259 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
261 // If we factor in our steals and notice that the channel has no
262 // data, we successfully sleep
271 self.to_wake.store(0, Ordering::SeqCst);
272 drop(SignalToken::cast_from_usize(ptr));
277 pub fn try_recv(&self) -> Result<T, Failure> {
278 let ret = match self.queue.pop() {
279 mpsc::Data(t) => Some(t),
282 // This is a bit of an interesting case. The channel is reported as
283 // having data available, but our pop() has failed due to the queue
284 // being in an inconsistent state. This means that there is some
285 // pusher somewhere which has yet to complete, but we are guaranteed
286 // that a pop will eventually succeed. In this case, we spin in a
287 // yield loop because the remote sender should finish their enqueue
288 // operation "very quickly".
290 // Avoiding this yield loop would require a different queue
291 // abstraction which provides the guarantee that after M pushes have
292 // succeeded, at least M pops will succeed. The current queues
293 // guarantee that if there are N active pushes, you can pop N times
294 // once all N have finished.
295 mpsc::Inconsistent => {
299 match self.queue.pop() {
304 mpsc::Empty => panic!("inconsistent => empty"),
305 mpsc::Inconsistent => {}
312 // See the discussion in the stream implementation for why we
313 // might decrement steals.
314 Some(data) => unsafe {
315 if *self.steals.get() > MAX_STEALS {
316 match self.cnt.swap(0, Ordering::SeqCst) {
318 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
321 let m = cmp::min(n, *self.steals.get());
322 *self.steals.get() -= m;
326 assert!(*self.steals.get() >= 0);
328 *self.steals.get() += 1;
332 // See the discussion in the stream implementation for why we try
335 match self.cnt.load(Ordering::SeqCst) {
336 n if n != DISCONNECTED => Err(Empty),
338 match self.queue.pop() {
339 mpsc::Data(t) => Ok(t),
340 mpsc::Empty => Err(Disconnected),
341 // with no senders, an inconsistency is impossible.
342 mpsc::Inconsistent => unreachable!(),
350 // Prepares this shared packet for a channel clone, essentially just bumping
352 pub fn clone_chan(&self) {
353 let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
355 // See comments on Arc::clone() on why we do this (for `mem::forget`).
356 if old_count > MAX_REFCOUNT {
361 // Decrement the reference count on a channel. This is called whenever a
362 // Chan is dropped and may end up waking up a receiver. It's the receiver's
363 // responsibility on the other end to figure out that we've disconnected.
364 pub fn drop_chan(&self) {
365 match self.channels.fetch_sub(1, Ordering::SeqCst) {
367 n if n > 1 => return,
368 n => panic!("bad number of channels left {}", n),
371 match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
373 self.take_to_wake().signal();
382 // See the long discussion inside of stream.rs for why the queue is drained,
383 // and why it is done in this fashion.
384 pub fn drop_port(&self) {
385 self.port_dropped.store(true, Ordering::SeqCst);
386 let mut steals = unsafe { *self.steals.get() };
388 match self.cnt.compare_exchange(
395 Err(old) => old != DISCONNECTED,
398 // See the discussion in 'try_recv' for why we yield
399 // control of this thread.
401 match self.queue.pop() {
405 mpsc::Empty | mpsc::Inconsistent => break,
411 // Consumes ownership of the 'to_wake' field.
412 fn take_to_wake(&self) -> SignalToken {
413 let ptr = self.to_wake.load(Ordering::SeqCst);
414 self.to_wake.store(0, Ordering::SeqCst);
416 unsafe { SignalToken::cast_from_usize(ptr) }
419 ////////////////////////////////////////////////////////////////////////////
420 // select implementation
421 ////////////////////////////////////////////////////////////////////////////
423 // increment the count on the channel (used for selection)
424 fn bump(&self, amt: isize) -> isize {
425 match self.cnt.fetch_add(amt, Ordering::SeqCst) {
427 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
434 // Cancels a previous thread waiting on this port, returning whether there's
437 // This is similar to the stream implementation (hence fewer comments), but
438 // uses a different value for the "steals" variable.
439 pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
440 // Before we do anything else, we bounce on this lock. The reason for
441 // doing this is to ensure that any upgrade-in-progress is gone and
442 // done with. Without this bounce, we can race with inherit_blocker
443 // about looking at and dealing with to_wake. Once we have acquired the
444 // lock, we are guaranteed that inherit_blocker is done.
446 let _guard = self.select_lock.lock().unwrap();
449 // Like the stream implementation, we want to make sure that the count
450 // on the channel goes non-negative. We don't know how negative the
451 // stream currently is, so instead of using a steal value of 1, we load
452 // the channel count and figure out what we should do to make it
455 let cnt = self.cnt.load(Ordering::SeqCst);
456 if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
458 let prev = self.bump(steals + 1);
460 if prev == DISCONNECTED {
461 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
464 let cur = prev + steals + 1;
467 drop(self.take_to_wake());
469 while self.to_wake.load(Ordering::SeqCst) != 0 {
474 // if the number of steals is -1, it was the pre-emptive -1 steal
475 // count from when we inherited a blocker. This is fine because
476 // we're just going to overwrite it with a real value.
477 let old = self.steals.get();
478 assert!(*old == 0 || *old == -1);
486 impl<T> Drop for Packet<T> {
488 // Note that this load is not only an assert for correctness about
489 // disconnection, but also a proper fence before the read of
490 // `to_wake`, so this assert cannot be removed with also removing
491 // the `to_wake` assert.
492 assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
493 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
494 assert_eq!(self.channels.load(Ordering::SeqCst), 0);