]> git.lizzy.rs Git - rust.git/commitdiff
Move thread parker to sys_common.
authorMara Bos <m-ou.se@m-ou.se>
Sun, 27 Sep 2020 10:27:27 +0000 (12:27 +0200)
committerMara Bos <m-ou.se@m-ou.se>
Sun, 27 Sep 2020 10:28:58 +0000 (12:28 +0200)
library/std/src/sys_common/mod.rs
library/std/src/sys_common/thread_parker/futex.rs [new file with mode: 0644]
library/std/src/sys_common/thread_parker/generic.rs [new file with mode: 0644]
library/std/src/sys_common/thread_parker/mod.rs [new file with mode: 0644]
library/std/src/thread/mod.rs
library/std/src/thread/parker/futex.rs [deleted file]
library/std/src/thread/parker/generic.rs [deleted file]
library/std/src/thread/parker/mod.rs [deleted file]

index 28cdfefb12a08b76742746c3582e89efe56edd2d..234b257aa926e834666398a9a27796e05b12a871 100644 (file)
@@ -66,6 +66,7 @@ macro_rules! rtunwrap {
 pub mod thread_info;
 pub mod thread_local_dtor;
 pub mod thread_local_key;
+pub mod thread_parker;
 pub mod util;
 pub mod wtf8;
 
diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parker/futex.rs
new file mode 100644 (file)
index 0000000..a5d4927
--- /dev/null
@@ -0,0 +1,93 @@
+use crate::sync::atomic::AtomicI32;
+use crate::sync::atomic::Ordering::{Acquire, Release};
+use crate::sys::futex::{futex_wait, futex_wake};
+use crate::time::Duration;
+
+const PARKED: i32 = -1;
+const EMPTY: i32 = 0;
+const NOTIFIED: i32 = 1;
+
+pub struct Parker {
+    state: AtomicI32,
+}
+
+// Notes about memory ordering:
+//
+// Memory ordering is only relevant for the relative ordering of operations
+// between different variables. Even Ordering::Relaxed guarantees a
+// monotonic/consistent order when looking at just a single atomic variable.
+//
+// So, since this parker is just a single atomic variable, we only need to look
+// at the ordering guarantees we need to provide to the 'outside world'.
+//
+// The only memory ordering guarantee that parking and unparking provide, is
+// that things which happened before unpark() are visible on the thread
+// returning from park() afterwards. Otherwise, it was effectively unparked
+// before unpark() was called while still consuming the 'token'.
+//
+// In other words, unpark() needs to synchronize with the part of park() that
+// consumes the token and returns.
+//
+// This is done with a release-acquire synchronization, by using
+// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
+// Ordering::Acquire when checking for this state in park().
+impl Parker {
+    #[inline]
+    pub const fn new() -> Self {
+        Parker { state: AtomicI32::new(EMPTY) }
+    }
+
+    // Assumes this is only called by the thread that owns the Parker,
+    // which means that `self.state != PARKED`.
+    pub unsafe fn park(&self) {
+        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+        // first case.
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+        loop {
+            // Wait for something to happen, assuming it's still set to PARKED.
+            futex_wait(&self.state, PARKED, None);
+            // Change NOTIFIED=>EMPTY and return in that case.
+            if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
+                return;
+            } else {
+                // Spurious wake up. We loop to try again.
+            }
+        }
+    }
+
+    // Assumes this is only called by the thread that owns the Parker,
+    // which means that `self.state != PARKED`.
+    pub unsafe fn park_timeout(&self, timeout: Duration) {
+        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+        // first case.
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+        // Wait for something to happen, assuming it's still set to PARKED.
+        futex_wait(&self.state, PARKED, Some(timeout));
+        // This is not just a store, because we need to establish a
+        // release-acquire ordering with unpark().
+        if self.state.swap(EMPTY, Acquire) == NOTIFIED {
+            // Woke up because of unpark().
+        } else {
+            // Timeout or spurious wake up.
+            // We return either way, because we can't easily tell if it was the
+            // timeout or not.
+        }
+    }
+
+    #[inline]
+    pub fn unpark(&self) {
+        // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
+        // wake the thread in the first case.
+        //
+        // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
+        // purpose, to make sure every unpark() has a release-acquire ordering
+        // with park().
+        if self.state.swap(NOTIFIED, Release) == PARKED {
+            futex_wake(&self.state);
+        }
+    }
+}
diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parker/generic.rs
new file mode 100644 (file)
index 0000000..14cfa95
--- /dev/null
@@ -0,0 +1,119 @@
+//! Parker implementaiton based on a Mutex and Condvar.
+
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::sync::{Condvar, Mutex};
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+pub struct Parker {
+    state: AtomicUsize,
+    lock: Mutex<()>,
+    cvar: Condvar,
+}
+
+impl Parker {
+    pub fn new() -> Self {
+        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park(&self) {
+        // If we were previously notified then we consume this notification and
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+
+        // Otherwise we need to coordinate going to sleep
+        let mut m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read here, even though we know it will be `NOTIFIED`.
+                // This is because `unpark` may have been called again since we read
+                // `NOTIFIED` in the `compare_exchange` above. We must perform an
+                // acquire operation that synchronizes with that `unpark` to observe
+                // any writes it made before the call to unpark. To do that we must
+                // read from the write it made to `state`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park state"),
+        }
+        loop {
+            m = self.cvar.wait(m).unwrap();
+            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+                Ok(_) => return, // got a notification
+                Err(_) => {}     // spurious wakeup, go back to sleep
+            }
+        }
+    }
+
+    // This implementaiton doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park_timeout(&self, dur: Duration) {
+        // Like `park` above we have a fast path for an already-notified thread, and
+        // afterwards we start coordinating for a sleep.
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+        let m = self.lock.lock().unwrap();
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read again here, see `park`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => panic!("inconsistent park_timeout state"),
+        }
+
+        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+        // from a notification we just want to unconditionally set the state back to
+        // empty, either consuming a notification or un-flagging ourselves as
+        // parked.
+        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
+        match self.state.swap(EMPTY, SeqCst) {
+            NOTIFIED => {} // got a notification, hurray!
+            PARKED => {}   // no notification, alas
+            n => panic!("inconsistent park_timeout state: {}", n),
+        }
+    }
+
+    pub fn unpark(&self) {
+        // To ensure the unparked thread will observe any writes we made
+        // before this call, we must perform a release operation that `park`
+        // can synchronize with. To do that we must write `NOTIFIED` even if
+        // `state` is already `NOTIFIED`. That is why this must be a swap
+        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+        // on failure.
+        match self.state.swap(NOTIFIED, SeqCst) {
+            EMPTY => return,    // no one was waiting
+            NOTIFIED => return, // already unparked
+            PARKED => {}        // gotta go wake someone up
+            _ => panic!("inconsistent state in unpark"),
+        }
+
+        // There is a period between when the parked thread sets `state` to
+        // `PARKED` (or last checked `state` in the case of a spurious wake
+        // up) and when it actually waits on `cvar`. If we were to notify
+        // during this period it would be ignored and then when the parked
+        // thread went to sleep it would never wake up. Fortunately, it has
+        // `lock` locked at this stage so we can acquire `lock` to wait until
+        // it is ready to receive the notification.
+        //
+        // Releasing `lock` before the call to `notify_one` means that when the
+        // parked thread wakes it doesn't get woken only to have to wait for us
+        // to release `lock`.
+        drop(self.lock.lock().unwrap());
+        self.cvar.notify_one()
+    }
+}
diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs
new file mode 100644 (file)
index 0000000..23c17c8
--- /dev/null
@@ -0,0 +1,9 @@
+cfg_if::cfg_if! {
+    if #[cfg(any(target_os = "linux", target_os = "android"))] {
+        mod futex;
+        pub use futex::Parker;
+    } else {
+        mod generic;
+        pub use generic::Parker;
+    }
+}
index 45430e58cbbc7da38e71945617b28308d2c8da40..fb2fbb5bf2dbd16aff878af05f7c2ee4704bc119 100644 (file)
 #[cfg(all(test, not(target_os = "emscripten")))]
 mod tests;
 
-mod parker;
-
 use crate::any::Any;
 use crate::cell::UnsafeCell;
 use crate::ffi::{CStr, CString};
 use crate::sys_common::mutex;
 use crate::sys_common::thread;
 use crate::sys_common::thread_info;
+use crate::sys_common::thread_parker::Parker;
 use crate::sys_common::{AsInner, IntoInner};
 use crate::time::Duration;
-use parker::Parker;
 
 ////////////////////////////////////////////////////////////////////////////////
 // Thread-local storage
diff --git a/library/std/src/thread/parker/futex.rs b/library/std/src/thread/parker/futex.rs
deleted file mode 100644 (file)
index a5d4927..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::sync::atomic::AtomicI32;
-use crate::sync::atomic::Ordering::{Acquire, Release};
-use crate::sys::futex::{futex_wait, futex_wake};
-use crate::time::Duration;
-
-const PARKED: i32 = -1;
-const EMPTY: i32 = 0;
-const NOTIFIED: i32 = 1;
-
-pub struct Parker {
-    state: AtomicI32,
-}
-
-// Notes about memory ordering:
-//
-// Memory ordering is only relevant for the relative ordering of operations
-// between different variables. Even Ordering::Relaxed guarantees a
-// monotonic/consistent order when looking at just a single atomic variable.
-//
-// So, since this parker is just a single atomic variable, we only need to look
-// at the ordering guarantees we need to provide to the 'outside world'.
-//
-// The only memory ordering guarantee that parking and unparking provide, is
-// that things which happened before unpark() are visible on the thread
-// returning from park() afterwards. Otherwise, it was effectively unparked
-// before unpark() was called while still consuming the 'token'.
-//
-// In other words, unpark() needs to synchronize with the part of park() that
-// consumes the token and returns.
-//
-// This is done with a release-acquire synchronization, by using
-// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
-// Ordering::Acquire when checking for this state in park().
-impl Parker {
-    #[inline]
-    pub const fn new() -> Self {
-        Parker { state: AtomicI32::new(EMPTY) }
-    }
-
-    // Assumes this is only called by the thread that owns the Parker,
-    // which means that `self.state != PARKED`.
-    pub unsafe fn park(&self) {
-        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
-        // first case.
-        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
-            return;
-        }
-        loop {
-            // Wait for something to happen, assuming it's still set to PARKED.
-            futex_wait(&self.state, PARKED, None);
-            // Change NOTIFIED=>EMPTY and return in that case.
-            if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
-                return;
-            } else {
-                // Spurious wake up. We loop to try again.
-            }
-        }
-    }
-
-    // Assumes this is only called by the thread that owns the Parker,
-    // which means that `self.state != PARKED`.
-    pub unsafe fn park_timeout(&self, timeout: Duration) {
-        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
-        // first case.
-        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
-            return;
-        }
-        // Wait for something to happen, assuming it's still set to PARKED.
-        futex_wait(&self.state, PARKED, Some(timeout));
-        // This is not just a store, because we need to establish a
-        // release-acquire ordering with unpark().
-        if self.state.swap(EMPTY, Acquire) == NOTIFIED {
-            // Woke up because of unpark().
-        } else {
-            // Timeout or spurious wake up.
-            // We return either way, because we can't easily tell if it was the
-            // timeout or not.
-        }
-    }
-
-    #[inline]
-    pub fn unpark(&self) {
-        // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
-        // wake the thread in the first case.
-        //
-        // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
-        // purpose, to make sure every unpark() has a release-acquire ordering
-        // with park().
-        if self.state.swap(NOTIFIED, Release) == PARKED {
-            futex_wake(&self.state);
-        }
-    }
-}
diff --git a/library/std/src/thread/parker/generic.rs b/library/std/src/thread/parker/generic.rs
deleted file mode 100644 (file)
index 14cfa95..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-//! Parker implementaiton based on a Mutex and Condvar.
-
-use crate::sync::atomic::AtomicUsize;
-use crate::sync::atomic::Ordering::SeqCst;
-use crate::sync::{Condvar, Mutex};
-use crate::time::Duration;
-
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
-pub struct Parker {
-    state: AtomicUsize,
-    lock: Mutex<()>,
-    cvar: Condvar,
-}
-
-impl Parker {
-    pub fn new() -> Self {
-        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
-    }
-
-    // This implementaiton doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    pub unsafe fn park(&self) {
-        // If we were previously notified then we consume this notification and
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-
-        // Otherwise we need to coordinate going to sleep
-        let mut m = self.lock.lock().unwrap();
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read here, even though we know it will be `NOTIFIED`.
-                // This is because `unpark` may have been called again since we read
-                // `NOTIFIED` in the `compare_exchange` above. We must perform an
-                // acquire operation that synchronizes with that `unpark` to observe
-                // any writes it made before the call to unpark. To do that we must
-                // read from the write it made to `state`.
-                let old = self.state.swap(EMPTY, SeqCst);
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => panic!("inconsistent park state"),
-        }
-        loop {
-            m = self.cvar.wait(m).unwrap();
-            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
-                Ok(_) => return, // got a notification
-                Err(_) => {}     // spurious wakeup, go back to sleep
-            }
-        }
-    }
-
-    // This implementaiton doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    pub unsafe fn park_timeout(&self, dur: Duration) {
-        // Like `park` above we have a fast path for an already-notified thread, and
-        // afterwards we start coordinating for a sleep.
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-        let m = self.lock.lock().unwrap();
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read again here, see `park`.
-                let old = self.state.swap(EMPTY, SeqCst);
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => panic!("inconsistent park_timeout state"),
-        }
-
-        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
-        // from a notification we just want to unconditionally set the state back to
-        // empty, either consuming a notification or un-flagging ourselves as
-        // parked.
-        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
-        match self.state.swap(EMPTY, SeqCst) {
-            NOTIFIED => {} // got a notification, hurray!
-            PARKED => {}   // no notification, alas
-            n => panic!("inconsistent park_timeout state: {}", n),
-        }
-    }
-
-    pub fn unpark(&self) {
-        // To ensure the unparked thread will observe any writes we made
-        // before this call, we must perform a release operation that `park`
-        // can synchronize with. To do that we must write `NOTIFIED` even if
-        // `state` is already `NOTIFIED`. That is why this must be a swap
-        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
-        // on failure.
-        match self.state.swap(NOTIFIED, SeqCst) {
-            EMPTY => return,    // no one was waiting
-            NOTIFIED => return, // already unparked
-            PARKED => {}        // gotta go wake someone up
-            _ => panic!("inconsistent state in unpark"),
-        }
-
-        // There is a period between when the parked thread sets `state` to
-        // `PARKED` (or last checked `state` in the case of a spurious wake
-        // up) and when it actually waits on `cvar`. If we were to notify
-        // during this period it would be ignored and then when the parked
-        // thread went to sleep it would never wake up. Fortunately, it has
-        // `lock` locked at this stage so we can acquire `lock` to wait until
-        // it is ready to receive the notification.
-        //
-        // Releasing `lock` before the call to `notify_one` means that when the
-        // parked thread wakes it doesn't get woken only to have to wait for us
-        // to release `lock`.
-        drop(self.lock.lock().unwrap());
-        self.cvar.notify_one()
-    }
-}
diff --git a/library/std/src/thread/parker/mod.rs b/library/std/src/thread/parker/mod.rs
deleted file mode 100644 (file)
index 23c17c8..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-cfg_if::cfg_if! {
-    if #[cfg(any(target_os = "linux", target_os = "android"))] {
-        mod futex;
-        pub use futex::Parker;
-    } else {
-        mod generic;
-        pub use generic::Parker;
-    }
-}