3 /// This is the flavor of channels which are optimized for one sender and one
4 /// receiver. The sender will be upgraded to a shared channel if the channel is
7 /// High level implementation details can be found in the comment of the parent
10 pub use self::Failure::*;
11 pub use self::UpgradeResult::*;
17 use crate::cell::UnsafeCell;
20 use crate::time::Instant;
22 use crate::sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool};
23 use crate::sync::mpsc::Receiver;
24 use crate::sync::mpsc::blocking::{self, SignalToken};
25 use crate::sync::mpsc::spsc_queue as spsc;
27 const DISCONNECTED: isize = isize::MIN;
29 const MAX_STEALS: isize = 5;
31 const MAX_STEALS: isize = 1 << 20;
/// Shared state between the sending and receiving halves of a stream
/// (one-sender, one-receiver) channel. Producer- and consumer-only fields
/// live in the queue's "addition" slots so each side owns its own state.
pub struct Packet<T> {
// internal queue for all messages
queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
// State attached to the producer (sender) side of the SPSC queue.
struct ProducerAddition {
cnt: AtomicIsize, // How many items are on this channel
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
// State attached to the consumer (receiver) side of the SPSC queue; only the
// receiving thread touches it, hence the UnsafeCell rather than an atomic.
struct ConsumerAddition {
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
53 Upgraded(Receiver<T>),
56 pub enum UpgradeResult {
// Any message could contain an "upgrade request" to a new shared port, so the
// internal queue is not a queue of T, but rather one of Message<T>
/// Creates a fresh, empty packet: zero messages counted, no blocked thread
/// registered (`to_wake == 0`), port still alive, and no steals recorded.
pub fn new() -> Packet<T> {
queue: unsafe { spsc::Queue::with_additions(
cnt: AtomicIsize::new(0),
to_wake: AtomicUsize::new(0),
port_dropped: AtomicBool::new(false),
steals: UnsafeCell::new(0),
/// Attempts to send `t` to the receiving half.
///
/// Returns `Err(t)` (handing the value back to the caller) only when the
/// port is already known to be dropped; otherwise the message counts as
/// sent, and a receiver blocked in `recv` is signaled if `do_send` found one.
pub fn send(&self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {},
// A receiver was asleep at -1; wake it now that data is queued.
UpWoke(token) => { token.signal(); }
/// Sends an "upgrade" message directing the receiver to switch over to the
/// new port `up` (used when this oneshot/stream sender becomes shared).
/// Bails out with `UpDisconnected` if the port is already gone.
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
return UpDisconnected
self.do_send(GoUp(up))
// Core send path shared by `send` and `upgrade`: bumps `cnt` by one and
// interprets the *previous* count to classify the outcome — a sleeping
// receiver to wake (-1), a disconnected port (DISCONNECTED, in which case
// our in-flight message is drained right here), or plain success (>= 0).
fn do_send(&self, t: Message<T>) -> UpgradeResult {
match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
-1 => UpWoke(self.take_to_wake()),
// As described before, SPSC queues must be >= -2
// Be sure to preserve the disconnected state, and the return value
// in this case is going to be whether our data was received or not.
// This manifests itself on whether we have an empty queue or not.
// Primarily, we are required to drain the queue here because the port
// will never remove this data. We can only have at most one item to
// drain (the port drains the rest).
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
let first = self.queue.pop();
let second = self.queue.pop();
// With the port gone, at most our own message can remain queued.
assert!(second.is_none());
Some(..) => UpSuccess, // we failed to send the data
None => UpDisconnected, // we successfully sent data
// Otherwise we just sent some data on a non-waiting queue, so just
// make sure the world is sane and carry on!
n => { assert!(n >= 0); UpSuccess }
// Consumes ownership of the 'to_wake' field.
//
// Reads the stored token pointer, zeroes the slot so it is claimed exactly
// once, and reconstitutes the SignalToken. The pointer was produced by
// `SignalToken::cast_to_usize` in `decrement`, which stores it before the
// count can ever reach -1. NOTE(review): a `ptr != 0` assert presumably
// guards the cast in the full source — confirm.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
unsafe { SignalToken::cast_from_usize(ptr) }
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
//
// Ok(()) means the caller may block on its wait token; Err(token) returns
// the token because data is (or became) available and no sleep is needed.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
// Only one receiver exists, so no token may already be registered.
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
// Publish our token *before* dropping the count below zero, so a sender
// that observes -1 can find it via take_to_wake().
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
// Fold all accumulated steals back into `cnt` in a single subtraction,
// resetting the local steal counter to zero.
let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
if n - steals <= 0 { return Ok(()) }
// Data is available after all: un-register our token and hand it back
// so the caller skips the sleep.
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
Err(unsafe { SignalToken::cast_from_usize(ptr) })
/// Receives a message, blocking until one is available or the optional
/// `deadline` passes. A timed-out wait tears down the blocked registration
/// via `abort_selection` before re-checking the queue one last time.
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// Optimistic preflight check (scheduling is expensive).
match self.try_recv() {
// Welp, our channel has no data. Deschedule the current thread and
// initiate the blocking protocol.
let (wait_token, signal_token) = blocking::tokens();
if self.decrement(signal_token).is_ok() {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
match self.try_recv() {
// Messages which actually popped from the queue shouldn't count as
// a steal, so offset the decrement here (we already have our
// "steal" factored into the channel count above).
data @ Err(Upgraded(..)) => unsafe {
*self.queue.consumer_addition().steals.get() -= 1;
/// Attempts to receive a message without blocking.
///
/// Returns `Ok(data)` on success, `Err(Upgraded(..))` when the message was
/// an upgrade request, `Err(Empty)` when the queue is empty but the channel
/// is alive, and `Err(Disconnected)` when it is empty and disconnected.
pub fn try_recv(&self) -> Result<T, Failure<T>> {
match self.queue.pop() {
// If we stole some data, record to that effect (this will be
// factored into cnt later on).
// Note that we don't allow steals to grow without bound in order to
// prevent eventual overflow of either steals or cnt as an overflow
// would have catastrophic results. Sometimes, steals > cnt, but
// other times cnt > steals, so we don't know the relation between
// steals and cnt. This code path is executed only rarely, so we do
// a pretty slow operation, of swapping 0 into cnt, taking steals
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => unsafe {
if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
self.queue.producer_addition().cnt.store(
DISCONNECTED, Ordering::SeqCst);
let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
*self.queue.consumer_addition().steals.get() -= m;
assert!(*self.queue.consumer_addition().steals.get() >= 0);
// Successful pop: record one more steal against `cnt`.
*self.queue.consumer_addition().steals.get() += 1;
GoUp(up) => Err(Upgraded(up)),
match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
// This is a little bit of a tricky case. We failed to pop
// data above, and then we have viewed that the channel is
// disconnected. In this window more data could have been
// sent on the channel. It doesn't really make sense to
// return that the channel is disconnected when there's
// actually data on it, so be extra sure there's no data by
// popping one more time.
// We can ignore steals because the other end is
// disconnected and we'll never need to really factor in our
match self.queue.pop() {
Some(Data(t)) => Ok(t),
Some(GoUp(up)) => Err(Upgraded(up)),
None => Err(Disconnected),
/// Marks the sending half as disconnected, waking a receiver that was
/// registered asleep (a prior count of -1 means a token is in `to_wake`).
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-1 => { self.take_to_wake().signal(); }
n => { assert!(n >= 0); }
/// Marks the receiving half as dropped and drains every queued message so
/// nothing in flight (including upgrade ports) leaks undropped.
pub fn drop_port(&self) {
// Dropping a port seems like a fairly trivial thing. In theory all we
// need to do is flag that we're disconnected and then everything else
// can take over (we don't have anyone to wake up).
// The catch for Ports is that we want to drop the entire contents of
// the queue. There are multiple reasons for having this property, the
// largest of which is that if another chan is waiting in this channel
// (but not received yet), then waiting on that port will cause a
// So if we accept that we must now destroy the entire contents of the
// queue, this code may make a bit more sense. The tricky part is that
// we can't let any in-flight sends go un-dropped, we have to make sure
// *everything* is dropped and nothing new will come onto the channel.
// The first thing we do is set a flag saying that we're done for. All
// sends are gated on this flag, so we're immediately guaranteed that
// there are a bounded number of active sends that we'll have to deal
self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
// Now that we're guaranteed to deal with a bounded number of senders,
// we need to drain the queue. This draining process happens atomically
// with respect to the "count" of the channel. If the count is nonzero
// (with steals taken into account), then there must be data on the
// channel. In this case we drain everything and then try again. We will
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
// NOTE(review): `compare_and_swap` is deprecated since Rust 1.50 in
// favor of `compare_exchange`; left as-is pending a file-wide migration.
let cnt = self.queue.producer_addition().cnt.compare_and_swap(
steals, DISCONNECTED, Ordering::SeqCst);
cnt != DISCONNECTED && cnt != steals
// Count each drained message as a steal so the CAS above can match.
while let Some(_) = self.queue.pop() { steals += 1; }
// At this point in time, we have gated all future senders from sending,
// and we have flagged the channel as being disconnected. The senders
// still have some responsibility, however, because some sends may not
// complete until after we flag the disconnection. There are more
// details in the sending methods that see DISCONNECTED
330 ////////////////////////////////////////////////////////////////////////////
331 // select implementation
332 ////////////////////////////////////////////////////////////////////////////
// increment the count on the channel (used for selection)
//
// Returns information about the count prior to the bump; if the channel was
// already disconnected, the DISCONNECTED sentinel is re-stored so the
// temporary perturbation from fetch_add never leaks to other observers.
fn bump(&self, amt: isize) -> isize {
match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
// Removes a previous thread from being blocked in this port
//
// Returns Ok(has_data) when the port stays usable, or Err(port) when the
// pending message was an upgrade and selection must continue on `port`.
pub fn abort_selection(&self,
was_upgrade: bool) -> Result<bool, Receiver<T>> {
// If we're aborting selection after upgrading from a oneshot, then
// we're guaranteed that no one is waiting. The only way that we could
// have seen the upgrade is if data was actually sent on the channel
// half again. For us, this means that there is guaranteed to be data on
// this channel. Furthermore, we're guaranteed that there was no
// start_selection previously, so there's no need to modify `self.cnt`
// Hence, because of these invariants, we immediately return `Ok(true)`.
// Note that the data may not actually be sent on the channel just yet.
// The other end could have flagged the upgrade but not sent data to
// this end. This is fine because we know it's a small bounded window
// of time until the data is actually sent.
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
// We want to make sure that the count on the channel goes non-negative,
// and in the stream case we can have at most one steal, so just assume
// that we had one steal.
let prev = self.bump(steals + 1);
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
true // there is data, that data is that we're disconnected
let cur = prev + steals + 1;
// If the previous count was negative, then we just made things go
// positive, hence we passed the -1 boundary and we're responsible
// for removing the to_wake() field and trashing it.
// If the previous count was positive then we're in a tougher
// situation. A possible race is that a sender just incremented
// through -1 (meaning it's going to try to wake a thread up), but it
// hasn't yet read the to_wake. In order to prevent a future recv()
// from waking up too early (this sender picking up the plastered
// over to_wake), we spin loop here waiting for to_wake to be 0.
// Note that this entire select() implementation needs an overhaul,
// and this is *not* the worst part of it, so this is not done as a
// final solution but rather out of necessity for now to get
// something working.
drop(self.take_to_wake());
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
*self.queue.consumer_addition().steals.get() = steals;
// if we were previously positive, then there's surely data to
// Now that we've determined that this queue "has data", we peek at the
// queue to see if the data is an upgrade or not. If it's an upgrade,
// then we need to destroy this port and abort selection on the
match self.queue.peek() {
Some(&mut GoUp(..)) => {
match self.queue.pop() {
Some(GoUp(port)) => Err(port),
// On final destruction, both halves must already have run their drop
// protocol: the count sits at DISCONNECTED and no wakeup token remains.
impl<T> Drop for Packet<T> {
// Note that this load is not only an assert for correctness about
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed without also removing
// the `to_wake` assert.
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);