]> git.lizzy.rs Git - rust.git/commitdiff
Remove ptr-int transmute in std::sync::mpsc
authorBen Kimock <kimockb@gmail.com>
Sun, 3 Apr 2022 19:50:11 +0000 (15:50 -0400)
committerBen Kimock <kimockb@gmail.com>
Sat, 9 Apr 2022 03:28:31 +0000 (23:28 -0400)
Since https://github.com/rust-lang/rust/pull/95340 landed, Miri with
-Zmiri-check-number-validity produces an error on the test suites of
some crates which implement concurrency tools, because it seems like
such crates tend to use std::sync::mpsc in their tests. This fixes the
problem by storing pointer bytes in a pointer.

library/std/src/sync/mpsc/blocking.rs
library/std/src/sync/mpsc/oneshot.rs
library/std/src/sync/mpsc/shared.rs
library/std/src/sync/mpsc/stream.rs

index 4c852b8ee812f1f7a6b25c36e12ea3f496a72c42..021df7b096cbc0670155fa63fcc65d8798544147 100644 (file)
@@ -1,6 +1,5 @@
 //! Generic support for building blocking abstractions.
 
-use crate::mem;
 use crate::sync::atomic::{AtomicBool, Ordering};
 use crate::sync::Arc;
 use crate::thread::{self, Thread};
@@ -47,18 +46,18 @@ pub fn signal(&self) -> bool {
         wake
     }
 
-    /// Converts to an unsafe usize value. Useful for storing in a pipe's state
+    /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
     /// flag.
     #[inline]
-    pub unsafe fn cast_to_usize(self) -> usize {
-        mem::transmute(self.inner)
+    pub unsafe fn to_raw(self) -> *mut u8 {
+        Arc::into_raw(self.inner) as *mut u8
     }
 
-    /// Converts from an unsafe usize value. Useful for retrieving a pipe's state
+    /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
     /// flag.
     #[inline]
-    pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
-        SignalToken { inner: mem::transmute(signal_ptr) }
+    pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
+        SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
     }
 }
 
index 3dcf03f579a0f32f0f5dd00ceba42b1057913f2c..0e259b8aecb9a35744ae9225ab33a41d4c7ac83d 100644 (file)
 
 use crate::cell::UnsafeCell;
 use crate::ptr;
-use crate::sync::atomic::{AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicPtr, Ordering};
 use crate::sync::mpsc::blocking::{self, SignalToken};
 use crate::sync::mpsc::Receiver;
 use crate::time::Instant;
 
 // Various states you can find a port in.
-const EMPTY: usize = 0; // initial state: no data, no blocked receiver
-const DATA: usize = 1; // data ready for receiver to take
-const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
+const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
+const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
+const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
 // Any other value represents a pointer to a SignalToken value. The
 // protocol ensures that when the state moves *to* a pointer,
 // ownership of the token is given to the packet, and when the state
@@ -44,7 +44,7 @@
 
 pub struct Packet<T> {
     // Internal state of the chan/port pair (stores the blocked thread as well)
-    state: AtomicUsize,
+    state: AtomicPtr<u8>,
     // One-shot data slot location
     data: UnsafeCell<Option<T>>,
     // when used for the second time, a oneshot channel must be upgraded, and
@@ -75,7 +75,7 @@ pub fn new() -> Packet<T> {
         Packet {
             data: UnsafeCell::new(None),
             upgrade: UnsafeCell::new(NothingSent),
-            state: AtomicUsize::new(EMPTY),
+            state: AtomicPtr::new(EMPTY),
         }
     }
 
@@ -108,7 +108,7 @@ pub fn send(&self, t: T) -> Result<(), T> {
                 // There is a thread waiting on the other end. We leave the 'DATA'
                 // state inside so it'll pick it up on the other end.
                 ptr => {
-                    SignalToken::cast_from_usize(ptr).signal();
+                    SignalToken::from_raw(ptr).signal();
                     Ok(())
                 }
             }
@@ -126,7 +126,7 @@ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
         // like we're not empty, then immediately go through to `try_recv`.
         if self.state.load(Ordering::SeqCst) == EMPTY {
             let (wait_token, signal_token) = blocking::tokens();
-            let ptr = unsafe { signal_token.cast_to_usize() };
+            let ptr = unsafe { signal_token.to_raw() };
 
             // race with senders to enter the blocking state
             if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
@@ -142,7 +142,7 @@ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
                 }
             } else {
                 // drop the signal token, since we never blocked
-                drop(unsafe { SignalToken::cast_from_usize(ptr) });
+                drop(unsafe { SignalToken::from_raw(ptr) });
             }
         }
 
@@ -218,7 +218,7 @@ pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
                 }
 
                 // If someone's waiting, we gotta wake them up
-                ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
+                ptr => UpWoke(SignalToken::from_raw(ptr)),
             }
         }
     }
@@ -229,7 +229,7 @@ pub fn drop_chan(&self) {
 
             // If someone's waiting, we gotta wake them up
             ptr => unsafe {
-                SignalToken::cast_from_usize(ptr).signal();
+                SignalToken::from_raw(ptr).signal();
             },
         }
     }
@@ -301,7 +301,7 @@ pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
 
             // We woke ourselves up from select.
             ptr => unsafe {
-                drop(SignalToken::cast_from_usize(ptr));
+                drop(SignalToken::from_raw(ptr));
                 Ok(false)
             },
         }
index 561626555441045ac7129266a43ce3c6e38b4cbb..51917bd96bd60a46bd7f1b96df4a0e0bbc0d9a34 100644 (file)
@@ -15,7 +15,7 @@
 
 use crate::cell::UnsafeCell;
 use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
 use crate::sync::mpsc::blocking::{self, SignalToken};
 use crate::sync::mpsc::mpsc_queue as mpsc;
 use crate::sync::{Mutex, MutexGuard};
 const MAX_STEALS: isize = 5;
 #[cfg(not(test))]
 const MAX_STEALS: isize = 1 << 20;
+const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
 
 pub struct Packet<T> {
     queue: mpsc::Queue<T>,
     cnt: AtomicIsize,          // How many items are on this channel
     steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-    to_wake: AtomicUsize,      // SignalToken for wake up
+    to_wake: AtomicPtr<u8>,    // SignalToken for wake up
 
     // The number of channels which are currently using this packet.
     channels: AtomicUsize,
@@ -68,7 +69,7 @@ pub fn new() -> Packet<T> {
             queue: mpsc::Queue::new(),
             cnt: AtomicIsize::new(0),
             steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
+            to_wake: AtomicPtr::new(EMPTY),
             channels: AtomicUsize::new(2),
             port_dropped: AtomicBool::new(false),
             sender_drain: AtomicIsize::new(0),
@@ -93,8 +94,8 @@ pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
     pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
         if let Some(token) = token {
             assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
-            self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
+            self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
             self.cnt.store(-1, Ordering::SeqCst);
 
             // This store is a little sketchy. What's happening here is that
@@ -250,10 +251,10 @@ fn decrement(&self, token: SignalToken) -> StartResult {
         unsafe {
             assert_eq!(
                 self.to_wake.load(Ordering::SeqCst),
-                0,
+                EMPTY,
                 "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
             );
-            let ptr = token.cast_to_usize();
+            let ptr = token.to_raw();
             self.to_wake.store(ptr, Ordering::SeqCst);
 
             let steals = ptr::replace(self.steals.get(), 0);
@@ -272,8 +273,8 @@ fn decrement(&self, token: SignalToken) -> StartResult {
                 }
             }
 
-            self.to_wake.store(0, Ordering::SeqCst);
-            drop(SignalToken::cast_from_usize(ptr));
+            self.to_wake.store(EMPTY, Ordering::SeqCst);
+            drop(SignalToken::from_raw(ptr));
             Abort
         }
     }
@@ -415,9 +416,9 @@ pub fn drop_port(&self) {
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
         let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
-        assert!(ptr != 0);
-        unsafe { SignalToken::cast_from_usize(ptr) }
+        self.to_wake.store(EMPTY, Ordering::SeqCst);
+        assert!(ptr != EMPTY);
+        unsafe { SignalToken::from_raw(ptr) }
     }
 
     ////////////////////////////////////////////////////////////////////////////
@@ -462,7 +463,7 @@ pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
         let prev = self.bump(steals + 1);
 
         if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
             true
         } else {
             let cur = prev + steals + 1;
@@ -470,7 +471,7 @@ pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(Ordering::SeqCst) != 0 {
+                while self.to_wake.load(Ordering::SeqCst) != EMPTY {
                     thread::yield_now();
                 }
             }
@@ -494,7 +495,7 @@ fn drop(&mut self) {
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
         assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
         assert_eq!(self.channels.load(Ordering::SeqCst), 0);
     }
 }
index 2a1d3f8967e995b6e0dd67624e45fc1825a94523..4c3812c79f619f4dcb3a103136042f4fb6d23852 100644 (file)
@@ -17,7 +17,7 @@
 use crate::thread;
 use crate::time::Instant;
 
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
 use crate::sync::mpsc::blocking::{self, SignalToken};
 use crate::sync::mpsc::spsc_queue as spsc;
 use crate::sync::mpsc::Receiver;
@@ -27,6 +27,7 @@
 const MAX_STEALS: isize = 5;
 #[cfg(not(test))]
 const MAX_STEALS: isize = 1 << 20;
+const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
 
 pub struct Packet<T> {
     // internal queue for all messages
@@ -34,8 +35,8 @@ pub struct Packet<T> {
 }
 
 struct ProducerAddition {
-    cnt: AtomicIsize,     // How many items are on this channel
-    to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
+    cnt: AtomicIsize,       // How many items are on this channel
+    to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
@@ -71,7 +72,7 @@ pub fn new() -> Packet<T> {
                     128,
                     ProducerAddition {
                         cnt: AtomicIsize::new(0),
-                        to_wake: AtomicUsize::new(0),
+                        to_wake: AtomicPtr::new(EMPTY),
 
                         port_dropped: AtomicBool::new(false),
                     },
@@ -147,17 +148,17 @@ fn do_send(&self, t: Message<T>) -> UpgradeResult {
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
         let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
-        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
-        assert!(ptr != 0);
-        unsafe { SignalToken::cast_from_usize(ptr) }
+        self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
+        assert!(ptr != EMPTY);
+        unsafe { SignalToken::from_raw(ptr) }
     }
 
     // Decrements the count on the channel for a sleeper, returning the sleeper
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
-        let ptr = unsafe { token.cast_to_usize() };
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
+        let ptr = unsafe { token.to_raw() };
         self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
 
         let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
@@ -176,8 +177,8 @@ fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
             }
         }
 
-        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
-        Err(unsafe { SignalToken::cast_from_usize(ptr) })
+        self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
+        Err(unsafe { SignalToken::from_raw(ptr) })
     }
 
     pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
@@ -376,7 +377,7 @@ pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
         // of time until the data is actually sent.
         if was_upgrade {
             assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
-            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
             return Ok(true);
         }
 
@@ -389,7 +390,7 @@ pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
         // If we were previously disconnected, then we know for sure that there
         // is no thread in to_wake, so just keep going
         let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -412,7 +413,7 @@ pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
+                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
                     thread::yield_now();
                 }
             }
@@ -451,6 +452,6 @@ fn drop(&mut self) {
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
         assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
     }
 }