]> git.lizzy.rs Git - rust.git/commitdiff
Implement weak memory emulation
authorAndy Wang <cbeuw.andy@gmail.com>
Mon, 27 Dec 2021 19:07:23 +0000 (19:07 +0000)
committerAndy Wang <cbeuw.andy@gmail.com>
Mon, 6 Jun 2022 18:15:20 +0000 (19:15 +0100)
src/data_race.rs
src/lib.rs
src/machine.rs
src/weak_memory.rs [new file with mode: 0644]
tests/run-pass/concurrency/weak_memory.rs

index eb67a487b5a50d5e6e1e94dd456e5953c4ac7286..82ee32ddee71f275c8f23a5f633ec6f5fd8eccc4 100644 (file)
@@ -12,7 +12,7 @@
 //! The implementation also models races with memory allocation and deallocation via treating allocation and
 //! deallocation as a type of write internally for detecting data-races.
 //!
-//! This does not explore weak memory orders and so can still miss data-races
+//! Weak memory orders are explored but not all weak behaviours are exhibited, so it can still miss data-races
 //! but should not report false-positives
 //!
 //! Data-race definition from(<https://en.cppreference.com/w/cpp/language/memory_model#Threads_and_data_races>):
 //! This means that the thread-index can be safely re-used, starting on the next timestamp for the newly created
 //! thread.
 //!
-//! The sequentially consistent ordering corresponds to the ordering that the threads
-//! are currently scheduled, this means that the data-race detector has no additional
-//! logic for sequentially consistent accesses at the moment since they are indistinguishable
-//! from acquire/release operations. If weak memory orderings are explored then this
-//! may need to change or be updated accordingly.
-//!
-//! Per the C++ spec for the memory model a sequentially consistent operation:
-//!   "A load operation with this memory order performs an acquire operation,
-//!    a store performs a release operation, and read-modify-write performs
-//!    both an acquire operation and a release operation, plus a single total
-//!    order exists in which all threads observe all modifications in the same
-//!    order (see Sequentially-consistent ordering below) "
-//! So in the absence of weak memory effects a seq-cst load & a seq-cst store is identical
-//! to an acquire load and a release store given the global sequentially consistent order
-//! of the schedule.
-//!
 //! The timestamps used in the data-race detector assign each sequence of non-atomic operations
 //! followed by a single atomic or concurrent operation a single timestamp.
 //! Write, Read, Write, ThreadJoin will be represented by a single timestamp value on a thread.
@@ -67,6 +51,7 @@
     mem,
 };
 
+use rustc_const_eval::interpret::alloc_range;
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
 use rustc_index::vec::{Idx, IndexVec};
 use rustc_middle::{mir, ty::layout::TyAndLayout};
@@ -115,10 +100,10 @@ pub enum AtomicFenceOp {
 /// of a thread, contains the happens-before clock and
 /// additional metadata to model atomic fence operations.
 #[derive(Clone, Default, Debug)]
-struct ThreadClockSet {
+pub struct ThreadClockSet {
     /// The increasing clock representing timestamps
     /// that happen-before this thread.
-    clock: VClock,
+    pub clock: VClock,
 
     /// The set of timestamps that will happen-before this
     /// thread once it performs an acquire fence.
@@ -127,6 +112,12 @@ struct ThreadClockSet {
     /// The last timestamp of happens-before relations that
     /// have been released by this thread by a fence.
     fence_release: VClock,
+
+    pub fence_seqcst: VClock,
+
+    pub write_seqcst: VClock,
+
+    pub read_seqcst: VClock,
 }
 
 impl ThreadClockSet {
@@ -169,7 +160,7 @@ fn join_with(&mut self, other: &ThreadClockSet) {
 /// common case where no atomic operations
 /// exists on the memory cell.
 #[derive(Clone, PartialEq, Eq, Default, Debug)]
-struct AtomicMemoryCellClocks {
+pub struct AtomicMemoryCellClocks {
     /// The clock-vector of the timestamp of the last atomic
     /// read operation performed by each thread.
     /// This detects potential data-races between atomic read
@@ -514,7 +505,32 @@ fn read_scalar_atomic(
         atomic: AtomicReadOp,
     ) -> InterpResult<'tcx, ScalarMaybeUninit<Tag>> {
         let this = self.eval_context_ref();
+        // This will read from the last store in the modification order of this location. In case
+        // weak memory emulation is enabled, this may not be the store we will pick to actually read from and return.
+        // This is fine with StackedBorrow and race checks because they don't concern metadata on
+        // the *value* (including the associated provenance if this is an AtomicPtr) at this location.
+        // Only metadata on the location itself is used.
         let scalar = this.allow_data_races_ref(move |this| this.read_scalar(&place.into()))?;
+
+        if let Some(global) = &this.machine.data_race {
+            let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
+            if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() {
+                if atomic == AtomicReadOp::SeqCst {
+                    global.sc_read();
+                }
+                let mut rng = this.machine.rng.borrow_mut();
+                let loaded = alloc_buffers.buffered_read(
+                    alloc_range(base_offset, place.layout.size),
+                    global,
+                    atomic == AtomicReadOp::SeqCst,
+                    &mut *rng,
+                    || this.validate_atomic_load(place, atomic),
+                )?;
+
+                return Ok(loaded.unwrap_or(scalar));
+            }
+        }
+
         this.validate_atomic_load(place, atomic)?;
         Ok(scalar)
     }
@@ -528,7 +544,27 @@ fn write_scalar_atomic(
     ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
         this.allow_data_races_mut(move |this| this.write_scalar(val, &(*dest).into()))?;
-        this.validate_atomic_store(dest, atomic)
+
+        this.validate_atomic_store(dest, atomic)?;
+        let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(dest.ptr)?;
+        if let (
+            crate::AllocExtra { weak_memory: Some(alloc_buffers), .. },
+            crate::Evaluator { data_race: Some(global), .. },
+        ) = this.get_alloc_extra_mut(alloc_id)?
+        {
+            if atomic == AtomicWriteOp::SeqCst {
+                global.sc_write();
+            }
+            let size = dest.layout.size;
+            alloc_buffers.buffered_write(
+                val,
+                alloc_range(base_offset, size),
+                global,
+                atomic == AtomicWriteOp::SeqCst,
+            )?;
+        }
+
+        Ok(())
     }
 
     /// Perform an atomic operation on a memory location.
@@ -550,6 +586,8 @@ fn atomic_op_immediate(
         this.allow_data_races_mut(|this| this.write_immediate(*val, &(*place).into()))?;
 
         this.validate_atomic_rmw(place, atomic)?;
+
+        this.buffered_atomic_rmw(val.to_scalar_or_uninit(), place, atomic)?;
         Ok(old)
     }
 
@@ -565,7 +603,10 @@ fn atomic_exchange_scalar(
 
         let old = this.allow_data_races_mut(|this| this.read_scalar(&place.into()))?;
         this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?;
+
         this.validate_atomic_rmw(place, atomic)?;
+
+        this.buffered_atomic_rmw(new, place, atomic)?;
         Ok(old)
     }
 
@@ -584,15 +625,25 @@ fn atomic_min_max_scalar(
         let lt = this.binary_op(mir::BinOp::Lt, &old, &rhs)?.to_scalar()?.to_bool()?;
 
         let new_val = if min {
-            if lt { &old } else { &rhs }
+            if lt {
+                &old
+            } else {
+                &rhs
+            }
         } else {
-            if lt { &rhs } else { &old }
+            if lt {
+                &rhs
+            } else {
+                &old
+            }
         };
 
         this.allow_data_races_mut(|this| this.write_immediate(**new_val, &(*place).into()))?;
 
         this.validate_atomic_rmw(place, atomic)?;
 
+        this.buffered_atomic_rmw(new_val.to_scalar_or_uninit(), place, atomic)?;
+
         // Return the old value.
         Ok(old)
     }
@@ -642,14 +693,56 @@ fn atomic_compare_exchange_scalar(
         if cmpxchg_success {
             this.allow_data_races_mut(|this| this.write_scalar(new, &(*place).into()))?;
             this.validate_atomic_rmw(place, success)?;
+            this.buffered_atomic_rmw(new, place, success)?;
         } else {
             this.validate_atomic_load(place, fail)?;
+            // A failed compare exchange is equivalent to a load, reading from the latest store
+            // in the modification order.
+            // Since `old` is only a value and not the store element, we need to separately
+            // find it in our store buffer and perform load_impl on it.
+            if let Some(global) = &this.machine.data_race {
+                if fail == AtomicReadOp::SeqCst {
+                    global.sc_read();
+                }
+                let size = place.layout.size;
+                let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
+                if let Some(alloc_buffers) = this.get_alloc_extra(alloc_id)?.weak_memory.as_ref() {
+                    if global.multi_threaded.get() {
+                        alloc_buffers.read_from_last_store(alloc_range(base_offset, size), global);
+                    }
+                }
+            }
         }
 
         // Return the old value.
         Ok(res)
     }
 
+    fn buffered_atomic_rmw(
+        &mut self,
+        new_val: ScalarMaybeUninit<Tag>,
+        place: &MPlaceTy<'tcx, Tag>,
+        atomic: AtomicRwOp,
+    ) -> InterpResult<'tcx> {
+        let this = self.eval_context_mut();
+        let (alloc_id, base_offset, ..) = this.ptr_get_alloc_id(place.ptr)?;
+        if let (
+            crate::AllocExtra { weak_memory: Some(alloc_buffers), .. },
+            crate::Evaluator { data_race: Some(global), .. },
+        ) = this.get_alloc_extra_mut(alloc_id)?
+        {
+            if atomic == AtomicRwOp::SeqCst {
+                global.sc_read();
+                global.sc_write();
+            }
+            let size = place.layout.size;
+            let range = alloc_range(base_offset, size);
+            alloc_buffers.read_from_last_store(range, global);
+            alloc_buffers.buffered_write(new_val, range, global, atomic == AtomicRwOp::SeqCst)?;
+        }
+        Ok(())
+    }
+
     /// Update the data-race detector for an atomic read occurring at the
     /// associated memory-place and on the current thread.
     fn validate_atomic_load(
@@ -723,7 +816,7 @@ fn validate_atomic_rmw(
     fn validate_atomic_fence(&mut self, atomic: AtomicFenceOp) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
         if let Some(data_race) = &mut this.machine.data_race {
-            data_race.maybe_perform_sync_operation(move |index, mut clocks| {
+            data_race.maybe_perform_sync_operation(|index, mut clocks| {
                 log::trace!("Atomic fence on {:?} with ordering {:?}", index, atomic);
 
                 // Apply data-race detection for the current fences
@@ -737,6 +830,11 @@ fn validate_atomic_fence(&mut self, atomic: AtomicFenceOp) -> InterpResult<'tcx>
                     // Either Release | AcqRel | SeqCst
                     clocks.apply_release_fence();
                 }
+                if atomic == AtomicFenceOp::SeqCst {
+                    data_race.last_sc_fence.borrow_mut().set_at_index(&clocks.clock, index);
+                    clocks.fence_seqcst.join(&data_race.last_sc_fence.borrow());
+                    clocks.write_seqcst.join(&data_race.last_sc_write.borrow());
+                }
 
                 // Increment timestamp in case of release semantics.
                 Ok(atomic != AtomicFenceOp::Acquire)
@@ -1116,6 +1214,12 @@ pub struct GlobalState {
     /// The associated vector index will be moved into re-use candidates
     /// after the join operation occurs.
     terminated_threads: RefCell<FxHashMap<ThreadId, VectorIdx>>,
+
+    /// The timestamp of last SC fence performed by each thread
+    last_sc_fence: RefCell<VClock>,
+
+    /// The timestamp of last SC write performed by each thread
+    last_sc_write: RefCell<VClock>,
 }
 
 impl GlobalState {
@@ -1131,6 +1235,8 @@ pub fn new() -> Self {
             active_thread_count: Cell::new(1),
             reuse_candidates: RefCell::new(FxHashSet::default()),
             terminated_threads: RefCell::new(FxHashMap::default()),
+            last_sc_fence: RefCell::new(VClock::default()),
+            last_sc_write: RefCell::new(VClock::default()),
         };
 
         // Setup the main-thread since it is not explicitly created:
@@ -1445,7 +1551,7 @@ fn load_thread_state_mut(&self, thread: ThreadId) -> (VectorIdx, RefMut<'_, Thre
     /// Load the current vector clock in use and the current set of thread clocks
     /// in use for the vector.
     #[inline]
-    fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
+    pub fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
         let index = self.current_index();
         let ref_vector = self.vector_clocks.borrow();
         let clocks = Ref::map(ref_vector, |vec| &vec[index]);
@@ -1455,7 +1561,7 @@ fn current_thread_state(&self) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
     /// Load the current vector clock in use and the current set of thread clocks
     /// in use for the vector mutably for modification.
     #[inline]
-    fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
+    pub fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
         let index = self.current_index();
         let ref_vector = self.vector_clocks.borrow_mut();
         let clocks = RefMut::map(ref_vector, |vec| &mut vec[index]);
@@ -1468,4 +1574,16 @@ fn current_thread_state_mut(&self) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
     fn current_index(&self) -> VectorIdx {
         self.current_index.get()
     }
+
+    // SC ATOMIC STORE rule in the paper.
+    fn sc_write(&self) {
+        let (index, clocks) = self.current_thread_state();
+        self.last_sc_write.borrow_mut().set_at_index(&clocks.clock, index);
+    }
+
+    // SC ATOMIC READ rule in the paper.
+    fn sc_read(&self) {
+        let (.., mut clocks) = self.current_thread_state_mut();
+        clocks.read_seqcst.join(&self.last_sc_fence.borrow());
+    }
 }
index f7c256656a7688ef09e2065cbf2cb6f06995140e..06ab2fabab04f51ecdc774b638829b6b8e764eb6 100644 (file)
@@ -45,6 +45,7 @@
 mod sync;
 mod thread;
 mod vector_clock;
+mod weak_memory;
 
 // Establish a "crate-wide prelude": we often import `crate::*`.
 
index 2060bba0b8530bf56601ad102f95f95c54848144..aa2a930ccd251c2fd5428eaa83a4f050bf47a89f 100644 (file)
@@ -190,6 +190,9 @@ pub struct AllocExtra {
     /// Data race detection via the use of a vector-clock,
     ///  this is only added if it is enabled.
     pub data_race: Option<data_race::AllocExtra>,
+    /// Weak memory emulation via the use of store buffers,
+    ///  this is only added if it is enabled.
+    pub weak_memory: Option<weak_memory::AllocExtra>,
 }
 
 /// Precomputed layouts of primitive types
@@ -630,9 +633,16 @@ fn init_allocation_extra<'b>(
         } else {
             None
         };
+        let buffer_alloc = if ecx.machine.weak_memory {
+            // FIXME: if this is an atomic obejct, we want to supply its initial value
+            // while allocating the store buffer here.
+            Some(weak_memory::AllocExtra::new_allocation(alloc.size()))
+        } else {
+            None
+        };
         let alloc: Allocation<Tag, Self::AllocExtra> = alloc.convert_tag_add_extra(
             &ecx.tcx,
-            AllocExtra { stacked_borrows: stacks, data_race: race_alloc },
+            AllocExtra { stacked_borrows: stacks, data_race: race_alloc, weak_memory: buffer_alloc },
             |ptr| Evaluator::tag_alloc_base_pointer(ecx, ptr),
         );
         Cow::Owned(alloc)
diff --git a/src/weak_memory.rs b/src/weak_memory.rs
new file mode 100644 (file)
index 0000000..c82a31d
--- /dev/null
@@ -0,0 +1,297 @@
+//! Implementation of C++11-consistent weak memory emulation using store buffers
+//! based on Dynamic Race Detection for C++ ("the paper"):
+//! https://www.doc.ic.ac.uk/~afd/homepages/papers/pdfs/2017/POPL.pdf
+
+// Our and the author's own implementation (tsan11) of the paper have some deviations from the provided operational semantics in §5.3:
+// 1. In the operational semantics, store elements keep a copy of the atomic object's vector clock (AtomicCellClocks::sync_vector in miri),
+// but this is not used anywhere so it's omitted here.
+//
+// 2. In the operational semantics, each store element keeps the timestamp of a thread when it loads from the store.
+// If the same thread loads from the same store element multiple times, then the timestamps at all loads are saved in a list of load elements.
+// This is not necessary as later loads by the same thread will always have greater timetstamp values, so we only need to record the timestamp of the first
+// load by each thread. This optimisation is done in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.h#L35-L37)
+// and here.
+// 
+// 3. §4.5 of the paper wants an SC store to mark all existing stores in the buffer that happens before it
+// as SC. This is not done in the operational semantics but implemented correctly in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L160-L167)
+// and here.
+//
+// 4. W_SC ; R_SC case requires the SC load to ignore all but last store maked SC (stores not marked SC are not
+// affected). But this rule is applied to all loads in ReadsFromSet from the paper (last two lines of code), not just SC load.
+// This is implemented correctly in tsan11
+// (https://github.com/ChrisLidbury/tsan11/blob/ecbd6b81e9b9454e01cba78eb9d88684168132c7/lib/tsan/rtl/tsan_relaxed.cc#L295)
+// and here.
+
+use std::{
+    cell::{Ref, RefCell, RefMut},
+    collections::VecDeque,
+};
+
+use rustc_const_eval::interpret::{AllocRange, InterpResult, ScalarMaybeUninit};
+use rustc_data_structures::fx::FxHashMap;
+use rustc_target::abi::Size;
+
+use crate::{
+    data_race::{GlobalState, ThreadClockSet},
+    RangeMap, Tag, VClock, VTimestamp, VectorIdx,
+};
+
+pub type AllocExtra = StoreBufferAlloc;
+#[derive(Debug, Clone)]
+pub struct StoreBufferAlloc {
+    /// Store buffer of each atomic object in this allocation
+    // Load may modify a StoreBuffer to record the loading thread's
+    // timestamp so we need interior mutability here.
+    store_buffer: RefCell<RangeMap<StoreBuffer>>,
+}
+
+impl StoreBufferAlloc {
+    pub fn new_allocation(len: Size) -> Self {
+        Self { store_buffer: RefCell::new(RangeMap::new(len, StoreBuffer::default())) }
+    }
+
+    /// Gets a store buffer associated with an atomic object in this allocation
+    fn get_store_buffer(&self, range: AllocRange) -> Ref<'_, StoreBuffer> {
+        Ref::map(self.store_buffer.borrow(), |range_map| {
+            let (.., store_buffer) = range_map.iter(range.start, range.size).next().unwrap();
+            store_buffer
+        })
+    }
+
+    fn get_store_buffer_mut(&self, range: AllocRange) -> RefMut<'_, StoreBuffer> {
+        RefMut::map(self.store_buffer.borrow_mut(), |range_map| {
+            let (.., store_buffer) = range_map.iter_mut(range.start, range.size).next().unwrap();
+            store_buffer
+        })
+    }
+
+    /// Reads from the last store in modification order
+    pub fn read_from_last_store<'tcx>(&self, range: AllocRange, global: &GlobalState) {
+        let store_buffer = self.get_store_buffer(range);
+        let store_elem = store_buffer.buffer.back();
+        if let Some(store_elem) = store_elem {
+            let (index, clocks) = global.current_thread_state();
+            store_elem.load_impl(index, &clocks);
+        }
+    }
+
+    pub fn buffered_read<'tcx>(
+        &self,
+        range: AllocRange,
+        global: &GlobalState,
+        is_seqcst: bool,
+        rng: &mut (impl rand::Rng + ?Sized),
+        validate: impl FnOnce() -> InterpResult<'tcx>,
+    ) -> InterpResult<'tcx, Option<ScalarMaybeUninit<Tag>>> {
+        // Having a live borrow to store_buffer while calling validate_atomic_load is fine
+        // because the race detector doesn't touch store_buffer
+        let store_buffer = self.get_store_buffer(range);
+
+        let store_elem = {
+            // The `clocks` we got here must be dropped before calling validate_atomic_load
+            // as the race detector will update it
+            let (.., clocks) = global.current_thread_state();
+            // Load from a valid entry in the store buffer
+            store_buffer.fetch_store(is_seqcst, &clocks, &mut *rng)
+        };
+
+        // Unlike in write_scalar_atomic, thread clock updates have to be done
+        // after we've picked a store element from the store buffer, as presented
+        // in ATOMIC LOAD rule of the paper. This is because fetch_store
+        // requires access to ThreadClockSet.clock, which is updated by the race detector
+        validate()?;
+
+        let loaded = store_elem.map(|store_elem| {
+            let (index, clocks) = global.current_thread_state();
+            store_elem.load_impl(index, &clocks)
+        });
+        Ok(loaded)
+    }
+
+    pub fn buffered_write<'tcx>(
+        &mut self,
+        val: ScalarMaybeUninit<Tag>,
+        range: AllocRange,
+        global: &GlobalState,
+        is_seqcst: bool,
+    ) -> InterpResult<'tcx> {
+        let (index, clocks) = global.current_thread_state();
+
+        let mut store_buffer = self.get_store_buffer_mut(range);
+        store_buffer.store_impl(val, index, &clocks.clock, is_seqcst);
+        Ok(())
+    }
+}
+
+const STORE_BUFFER_LIMIT: usize = 128;
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreBuffer {
+    // Stores to this location in modification order
+    buffer: VecDeque<StoreElement>,
+}
+
+impl Default for StoreBuffer {
+    fn default() -> Self {
+        let mut buffer = VecDeque::new();
+        buffer.reserve(STORE_BUFFER_LIMIT);
+        Self { buffer }
+    }
+}
+
+impl<'mir, 'tcx: 'mir> StoreBuffer {
+    /// Selects a valid store element in the buffer.
+    /// The buffer does not contain the value used to initialise the atomic object
+    /// so a fresh atomic object has an empty store buffer until an explicit store.
+    fn fetch_store<R: rand::Rng + ?Sized>(
+        &self,
+        is_seqcst: bool,
+        clocks: &ThreadClockSet,
+        rng: &mut R,
+    ) -> Option<&StoreElement> {
+        use rand::seq::IteratorRandom;
+        let mut found_sc = false;
+        // FIXME: this should be an inclusive take_while (stops after a false predicate, but
+        // includes the element that gave the false), but such function doesn't yet
+        // exist in the standard libary https://github.com/rust-lang/rust/issues/62208
+        let mut keep_searching = true;
+        let candidates = self
+            .buffer
+            .iter()
+            .rev()
+            .take_while(move |&store_elem| {
+                if !keep_searching {
+                    return false;
+                }
+                // CoWR: if a store happens-before the current load,
+                // then we can't read-from anything earlier in modification order.
+                if store_elem.timestamp <= clocks.clock[store_elem.store_index] {
+                    log::info!("Stopped due to coherent write-read");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // CoRR: if there was a load from this store which happened-before the current load,
+                // then we cannot read-from anything earlier in modification order.
+                if store_elem.loads.borrow().iter().any(|(&load_index, &load_timestamp)| {
+                    load_timestamp <= clocks.clock[load_index]
+                }) {
+                    log::info!("Stopped due to coherent read-read");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current load, which may be sequenced-after an SC fence, can only read-from
+                // the last store sequenced-before an SC fence in another thread (or any stores
+                // later than that SC fence)
+                if store_elem.timestamp <= clocks.fence_seqcst[store_elem.store_index] {
+                    log::info!("Stopped due to coherent load sequenced after sc fence");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current non-SC load can only read-from the latest SC store (or any stores later than that
+                // SC store)
+                if store_elem.timestamp <= clocks.write_seqcst[store_elem.store_index]
+                    && store_elem.is_seqcst
+                {
+                    log::info!("Stopped due to needing to load from the last SC store");
+                    keep_searching = false;
+                    return true;
+                }
+
+                // The current SC load can only read-from the last store sequenced-before
+                // the last SC fence (or any stores later than the SC fence)
+                if is_seqcst && store_elem.timestamp <= clocks.read_seqcst[store_elem.store_index] {
+                    log::info!("Stopped due to sc load needing to load from the last SC store before an SC fence");
+                    keep_searching = false;
+                    return true;
+                }
+
+                true
+            })
+            .filter(|&store_elem| {
+                if is_seqcst {
+                    // An SC load needs to ignore all but last store maked SC (stores not marked SC are not
+                    // affected)
+                    let include = !(store_elem.is_seqcst && found_sc);
+                    found_sc |= store_elem.is_seqcst;
+                    include
+                } else {
+                    true
+                }
+            });
+
+        candidates.choose(rng)
+    }
+
+    /// ATOMIC STORE IMPL in the paper (except we don't need the location's vector clock)
+    fn store_impl(
+        &mut self,
+        val: ScalarMaybeUninit<Tag>,
+        index: VectorIdx,
+        thread_clock: &VClock,
+        is_seqcst: bool,
+    ) {
+        let store_elem = StoreElement {
+            store_index: index,
+            timestamp: thread_clock[index],
+            // In the language provided in the paper, an atomic store takes the value from a
+            // non-atomic memory location.
+            // But we already have the immediate value here so we don't need to do the memory
+            // access
+            val,
+            is_seqcst,
+            loads: RefCell::new(FxHashMap::default()),
+        };
+        self.buffer.push_back(store_elem);
+        if self.buffer.len() > STORE_BUFFER_LIMIT {
+            self.buffer.pop_front();
+        }
+        if is_seqcst {
+            // Every store that happens before this needs to be marked as SC
+            // so that in a later SC load, only the last SC store (i.e. this one) or stores that
+            // aren't ordered by hb with the last SC is picked.
+            self.buffer.iter_mut().rev().for_each(|elem| {
+                if elem.timestamp <= thread_clock[elem.store_index] {
+                    elem.is_seqcst = true;
+                }
+            })
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreElement {
+    /// The identifier of the vector index, corresponding to a thread
+    /// that performed the store.
+    store_index: VectorIdx,
+
+    /// Whether this store is SC.
+    is_seqcst: bool,
+
+    /// The timestamp of the storing thread when it performed the store
+    timestamp: VTimestamp,
+    /// The value of this store
+    val: ScalarMaybeUninit<Tag>,
+
+    /// Timestamp of first loads from this store element by each thread
+    /// Behind a RefCell to keep load op take &self
+    loads: RefCell<FxHashMap<VectorIdx, VTimestamp>>,
+}
+
+impl StoreElement {
+    /// ATOMIC LOAD IMPL in the paper
+    /// Unlike the operational semantics in the paper, we don't need to keep track
+    /// of the thread timestamp for every single load. Keeping track of the first (smallest)
+    /// timestamp of each thread that has loaded from a store is sufficient: if the earliest
+    /// load of another thread happens before the current one, then we must stop searching the store
+    /// buffer regardless of subsequent loads by the same thread; if the earliest load of another
+    /// thread doesn't happen before the current one, then no subsequent load by the other thread
+    /// can happen before the current one.
+    fn load_impl(&self, index: VectorIdx, clocks: &ThreadClockSet) -> ScalarMaybeUninit<Tag> {
+        let _ = self.loads.borrow_mut().try_insert(index, clocks.clock[index]);
+        self.val
+    }
+}
index bd3d1de7c23e220a2dc17bb72e0d761b4611d80a..b8e780ade1a03688568289315a53c4bca8dc53fc 100644 (file)
@@ -63,6 +63,28 @@ fn reads_value(loc: &AtomicUsize, val: usize) -> usize {
     val
 }
 
+// https://plv.mpi-sws.org/scfix/paper.pdf
+// Test case SB
+fn test_sc_store_buffering() {
+    let x = static_atomic(0);
+    let y = static_atomic(0);
+
+    let j1 = spawn(move || {
+        x.store(1, SeqCst);
+        y.load(SeqCst)
+    });
+
+    let j2 = spawn(move || {
+        y.store(1, SeqCst);
+        x.load(SeqCst)
+    });
+
+    let a = j1.join().unwrap();
+    let b = j2.join().unwrap();
+
+    assert_ne!((a, b), (0, 0));
+}
+
 // https://plv.mpi-sws.org/scfix/paper.pdf
 // 2.2 Second Problem: SC Fences are Too Weak
 fn test_rwc_syncs() {
@@ -247,6 +269,7 @@ pub fn main() {
     // prehaps each function should be its own test case so they
     // can be run in parallel
     for _ in 0..500 {
+        test_sc_store_buffering();
         test_mixed_access();
         test_load_buffering_acq_rel();
         test_message_passing();