//! Generic support for building blocking abstractions.
-use crate::mem;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread};
wake
}
- /// Converts to an unsafe usize value. Useful for storing in a pipe's state
+ /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
/// flag.
#[inline]
- pub unsafe fn cast_to_usize(self) -> usize {
- mem::transmute(self.inner)
+ pub unsafe fn to_raw(self) -> *mut u8 {
+ Arc::into_raw(self.inner) as *mut u8
}
- /// Converts from an unsafe usize value. Useful for retrieving a pipe's state
+ /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
/// flag.
#[inline]
- pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
- SignalToken { inner: mem::transmute(signal_ptr) }
+ pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
+ SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
}
}
use crate::cell::UnsafeCell;
use crate::ptr;
-use crate::sync::atomic::{AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::Receiver;
use crate::time::Instant;
// Various states you can find a port in.
-const EMPTY: usize = 0; // initial state: no data, no blocked receiver
-const DATA: usize = 1; // data ready for receiver to take
-const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
+const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
+const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
+const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked thread as well)
- state: AtomicUsize,
+ state: AtomicPtr<u8>,
// One-shot data slot location
data: UnsafeCell<Option<T>>,
// when used for the second time, a oneshot channel must be upgraded, and
Packet {
data: UnsafeCell::new(None),
upgrade: UnsafeCell::new(NothingSent),
- state: AtomicUsize::new(EMPTY),
+ state: AtomicPtr::new(EMPTY),
}
}
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => {
- SignalToken::cast_from_usize(ptr).signal();
+ SignalToken::from_raw(ptr).signal();
Ok(())
}
}
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(Ordering::SeqCst) == EMPTY {
let (wait_token, signal_token) = blocking::tokens();
- let ptr = unsafe { signal_token.cast_to_usize() };
+ let ptr = unsafe { signal_token.to_raw() };
// race with senders to enter the blocking state
if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
}
} else {
// drop the signal token, since we never blocked
- drop(unsafe { SignalToken::cast_from_usize(ptr) });
+ drop(unsafe { SignalToken::from_raw(ptr) });
}
}
}
// If someone's waiting, we gotta wake them up
- ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
+ ptr => UpWoke(SignalToken::from_raw(ptr)),
}
}
}
// If someone's waiting, we gotta wake them up
ptr => unsafe {
- SignalToken::cast_from_usize(ptr).signal();
+ SignalToken::from_raw(ptr).signal();
},
}
}
// We woke ourselves up from select.
ptr => unsafe {
- drop(SignalToken::cast_from_usize(ptr));
+ drop(SignalToken::from_raw(ptr));
Ok(false)
},
}
use crate::cell::UnsafeCell;
use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::mpsc_queue as mpsc;
use crate::sync::{Mutex, MutexGuard};
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
+const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: AtomicIsize, // How many items are on this channel
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
- to_wake: AtomicUsize, // SignalToken for wake up
+ to_wake: AtomicPtr<u8>, // SignalToken for wake up
// The number of channels which are currently using this packet.
channels: AtomicUsize,
queue: mpsc::Queue::new(),
cnt: AtomicIsize::new(0),
steals: UnsafeCell::new(0),
- to_wake: AtomicUsize::new(0),
+ to_wake: AtomicPtr::new(EMPTY),
channels: AtomicUsize::new(2),
port_dropped: AtomicBool::new(false),
sender_drain: AtomicIsize::new(0),
pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
if let Some(token) = token {
assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
- self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
+ self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
self.cnt.store(-1, Ordering::SeqCst);
// This store is a little sketchy. What's happening here is that
unsafe {
assert_eq!(
self.to_wake.load(Ordering::SeqCst),
- 0,
+ EMPTY,
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
);
- let ptr = token.cast_to_usize();
+ let ptr = token.to_raw();
self.to_wake.store(ptr, Ordering::SeqCst);
let steals = ptr::replace(self.steals.get(), 0);
}
}
- self.to_wake.store(0, Ordering::SeqCst);
- drop(SignalToken::cast_from_usize(ptr));
+ self.to_wake.store(EMPTY, Ordering::SeqCst);
+ drop(SignalToken::from_raw(ptr));
Abort
}
}
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
- self.to_wake.store(0, Ordering::SeqCst);
- assert!(ptr != 0);
- unsafe { SignalToken::cast_from_usize(ptr) }
+ self.to_wake.store(EMPTY, Ordering::SeqCst);
+ assert!(ptr != EMPTY);
+ unsafe { SignalToken::from_raw(ptr) }
}
////////////////////////////////////////////////////////////////////////////
let prev = self.bump(steals + 1);
if prev == DISCONNECTED {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
true
} else {
let cur = prev + steals + 1;
if prev < 0 {
drop(self.take_to_wake());
} else {
- while self.to_wake.load(Ordering::SeqCst) != 0 {
+ while self.to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
}
}
use crate::thread;
use crate::time::Instant;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::spsc_queue as spsc;
use crate::sync::mpsc::Receiver;
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
+const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
pub struct Packet<T> {
// internal queue for all messages
}
struct ProducerAddition {
- cnt: AtomicIsize, // How many items are on this channel
- to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
+ cnt: AtomicIsize, // How many items are on this channel
+ to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
128,
ProducerAddition {
cnt: AtomicIsize::new(0),
- to_wake: AtomicUsize::new(0),
+ to_wake: AtomicPtr::new(EMPTY),
port_dropped: AtomicBool::new(false),
},
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
- self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
- assert!(ptr != 0);
- unsafe { SignalToken::cast_from_usize(ptr) }
+ self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
+ assert!(ptr != EMPTY);
+ unsafe { SignalToken::from_raw(ptr) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
- let ptr = unsafe { token.cast_to_usize() };
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
+ let ptr = unsafe { token.to_raw() };
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
}
}
- self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
- Err(unsafe { SignalToken::cast_from_usize(ptr) })
+ self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
+ Err(unsafe { SignalToken::from_raw(ptr) })
}
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
return Ok(true);
}
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
if prev < 0 {
drop(self.take_to_wake());
} else {
- while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
+ while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
}
}