]> git.lizzy.rs Git - rust.git/commitdiff
std: use semaphore for thread parking on Apple platforms
authorjoboet <jonasboettiger@icloud.com>
Thu, 6 Oct 2022 20:46:15 +0000 (22:46 +0200)
committerjoboet <jonasboettiger@icloud.com>
Thu, 6 Oct 2022 20:46:15 +0000 (22:46 +0200)
library/std/src/sys/unix/thread_parker/darwin.rs [new file with mode: 0644]
library/std/src/sys/unix/thread_parker/mod.rs

diff --git a/library/std/src/sys/unix/thread_parker/darwin.rs b/library/std/src/sys/unix/thread_parker/darwin.rs
new file mode 100644 (file)
index 0000000..510839d
--- /dev/null
@@ -0,0 +1,131 @@
+//! Thread parking for Darwin-based systems.
+//!
+//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
+//! cannot be used in `std` because they are non-public (their use will lead to
+//! rejection from the App Store) and because they are only available starting
+//! with macOS version 10.12, even though the minimum target version is 10.7.
+//!
+//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
+//! supports semaphores, which allow us to implement the behaviour we need with
+//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
+//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
+//! public.
+
+use crate::pin::Pin;
+use crate::sync::atomic::{
+    AtomicI8,
+    Ordering::{Acquire, Release},
+};
+use crate::time::Duration;
+
+type dispatch_semaphore_t = *mut crate::ffi::c_void;
+type dispatch_time_t = u64;
+
+const DISPATCH_TIME_NOW: dispatch_time_t = 0;
+const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
+
+#[link(name = "System", kind = "dylib")]
+extern "C" {
+    fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
+    fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
+    fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
+    fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
+    fn dispatch_release(object: *mut crate::ffi::c_void);
+}
+
+const EMPTY: i8 = 0;
+const NOTIFIED: i8 = 1;
+const PARKED: i8 = -1;
+
+pub struct Parker {
+    semaphore: dispatch_semaphore_t,
+    state: AtomicI8,
+}
+
+unsafe impl Sync for Parker {}
+unsafe impl Send for Parker {}
+
+impl Parker {
+    pub unsafe fn new(parker: *mut Parker) {
+        let semaphore = dispatch_semaphore_create(0);
+        assert!(
+            !semaphore.is_null(),
+            "failed to create dispatch semaphore for thread synchronization"
+        );
+        parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
+    }
+
+    // Does not need `Pin`, but other implementation do.
+    pub unsafe fn park(self: Pin<&Self>) {
+        // The semaphore counter must be zero at this point, because unparking
+        // threads will not actually increase it until we signalled that we
+        // are waiting.
+
+        // Change NOTIFIED to EMPTY and EMPTY to PARKED.
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+
+        // Another thread may increase the semaphore counter from this point on.
+        // If it is faster than us, we will decrement it again immediately below.
+        // If we are faster, we wait.
+
+        // Ensure that the semaphore counter has actually been decremented, even
+        // if the call timed out for some reason.
+        while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
+
+        // At this point, the semaphore counter is zero again.
+
+        // We were definitely woken up, so we don't need to check the state.
+        // Still, we need to reset the state using a swap to observe the state
+        // change with acquire ordering.
+        self.state.swap(EMPTY, Acquire);
+    }
+
+    // Does not need `Pin`, but other implementation do.
+    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+            return;
+        }
+
+        let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
+        let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);
+
+        let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;
+
+        let state = self.state.swap(EMPTY, Acquire);
+        if state == NOTIFIED && timeout {
+            // If the state was NOTIFIED but semaphore_wait returned without
+            // decrementing the count because of a timeout, it means another
+            // thread is about to call semaphore_signal. We must wait for that
+            // to happen to ensure the semaphore count is reset.
+            while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
+        } else {
+            // Either a timeout occurred and we reset the state before any thread
+            // tried to wake us up, or we were woken up and reset the state,
+            // making sure to observe the state change with acquire ordering.
+            // Either way, the semaphore counter is now zero again.
+        }
+    }
+
+    // Does not need `Pin`, but other implementation do.
+    pub fn unpark(self: Pin<&Self>) {
+        let state = self.state.swap(NOTIFIED, Release);
+        if state == PARKED {
+            unsafe {
+                dispatch_semaphore_signal(self.semaphore);
+            }
+        }
+    }
+}
+
+impl Drop for Parker {
+    fn drop(&mut self) {
+        // SAFETY:
+        // We always ensure that the semaphore count is reset, so this will
+        // never cause an exception.
+        unsafe {
+            dispatch_release(self.semaphore);
+        }
+    }
+}
index e2453580dc72a62ffa3d2c60870426969de5e487..724ec2d482edb6330828fc7e6b7e93f05693a7a7 100644 (file)
 )))]
 
 cfg_if::cfg_if! {
-    if #[cfg(target_os = "netbsd")] {
+    if #[cfg(any(
+        target_os = "macos",
+        target_os = "ios",
+        target_os = "watchos",
+        target_os = "tvos",
+    ))] {
+        mod darwin;
+        pub use darwin::Parker;
+    } else if #[cfg(target_os = "netbsd")] {
         mod netbsd;
         pub use netbsd::Parker;
     } else {