]> git.lizzy.rs Git - rust.git/commitdiff
std: optimize thread parking on NetBSD
authorjoboet <jonasboettiger@icloud.com>
Tue, 6 Sep 2022 12:32:45 +0000 (14:32 +0200)
committerjoboet <jonasboettiger@icloud.com>
Sat, 10 Sep 2022 18:03:15 +0000 (20:03 +0200)
library/std/src/sys/unix/thread_parker.rs [deleted file]
library/std/src/sys/unix/thread_parker/mod.rs [new file with mode: 0644]
library/std/src/sys/unix/thread_parker/netbsd.rs [new file with mode: 0644]
library/std/src/sys/unix/thread_parker/pthread.rs [new file with mode: 0644]

diff --git a/library/std/src/sys/unix/thread_parker.rs b/library/std/src/sys/unix/thread_parker.rs
deleted file mode 100644 (file)
index ca1a713..0000000
+++ /dev/null
@@ -1,281 +0,0 @@
-//! Thread parking without `futex` using the `pthread` synchronization primitives.
-
-#![cfg(not(any(
-    target_os = "linux",
-    target_os = "android",
-    all(target_os = "emscripten", target_feature = "atomics"),
-    target_os = "freebsd",
-    target_os = "openbsd",
-    target_os = "dragonfly",
-    target_os = "fuchsia",
-)))]
-
-use crate::cell::UnsafeCell;
-use crate::marker::PhantomPinned;
-use crate::pin::Pin;
-use crate::ptr::addr_of_mut;
-use crate::sync::atomic::AtomicUsize;
-use crate::sync::atomic::Ordering::SeqCst;
-use crate::time::Duration;
-
-const EMPTY: usize = 0;
-const PARKED: usize = 1;
-const NOTIFIED: usize = 2;
-
-unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_mutex_lock(lock);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_mutex_unlock(lock);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
-    let r = libc::pthread_cond_signal(cond);
-    debug_assert_eq!(r, 0);
-}
-
-unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
-    let r = libc::pthread_cond_wait(cond, lock);
-    debug_assert_eq!(r, 0);
-}
-
-const TIMESPEC_MAX: libc::timespec =
-    libc::timespec { tv_sec: <libc::time_t>::MAX, tv_nsec: 1_000_000_000 - 1 };
-
-unsafe fn wait_timeout(
-    cond: *mut libc::pthread_cond_t,
-    lock: *mut libc::pthread_mutex_t,
-    dur: Duration,
-) {
-    // Use the system clock on systems that do not support pthread_condattr_setclock.
-    // This unfortunately results in problems when the system time changes.
-    #[cfg(any(
-        target_os = "macos",
-        target_os = "ios",
-        target_os = "watchos",
-        target_os = "espidf"
-    ))]
-    let (now, dur) = {
-        use super::time::SystemTime;
-        use crate::cmp::min;
-
-        // OSX implementation of `pthread_cond_timedwait` is buggy
-        // with super long durations. When duration is greater than
-        // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
-        // in macOS Sierra return error 316.
-        //
-        // This program demonstrates the issue:
-        // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
-        //
-        // To work around this issue, and possible bugs of other OSes, timeout
-        // is clamped to 1000 years, which is allowable per the API of `park_timeout`
-        // because of spurious wakeups.
-        let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
-        let now = SystemTime::now().t;
-        (now, dur)
-    };
-    // Use the monotonic clock on other systems.
-    #[cfg(not(any(
-        target_os = "macos",
-        target_os = "ios",
-        target_os = "watchos",
-        target_os = "espidf"
-    )))]
-    let (now, dur) = {
-        use super::time::Timespec;
-
-        (Timespec::now(libc::CLOCK_MONOTONIC), dur)
-    };
-
-    let timeout =
-        now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX);
-    let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
-    debug_assert!(r == libc::ETIMEDOUT || r == 0);
-}
-
-pub struct Parker {
-    state: AtomicUsize,
-    lock: UnsafeCell<libc::pthread_mutex_t>,
-    cvar: UnsafeCell<libc::pthread_cond_t>,
-    // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
-    _pinned: PhantomPinned,
-}
-
-impl Parker {
-    /// Construct the UNIX parker in-place.
-    ///
-    /// # Safety
-    /// The constructed parker must never be moved.
-    pub unsafe fn new(parker: *mut Parker) {
-        // Use the default mutex implementation to allow for simpler initialization.
-        // This could lead to undefined behaviour when deadlocking. This is avoided
-        // by not deadlocking. Note in particular the unlocking operation before any
-        // panic, as code after the panic could try to park again.
-        addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
-        addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
-
-        cfg_if::cfg_if! {
-            if #[cfg(any(
-                target_os = "macos",
-                target_os = "ios",
-                target_os = "watchos",
-                target_os = "l4re",
-                target_os = "android",
-                target_os = "redox"
-            ))] {
-                addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
-            } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] {
-                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
-                assert_eq!(r, 0);
-            } else {
-                use crate::mem::MaybeUninit;
-                let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
-                let r = libc::pthread_condattr_init(attr.as_mut_ptr());
-                assert_eq!(r, 0);
-                let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
-                assert_eq!(r, 0);
-                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
-                assert_eq!(r, 0);
-                let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
-                assert_eq!(r, 0);
-            }
-        }
-    }
-
-    // This implementation doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker.
-    pub unsafe fn park(self: Pin<&Self>) {
-        // If we were previously notified then we consume this notification and
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-
-        // Otherwise we need to coordinate going to sleep
-        lock(self.lock.get());
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read here, even though we know it will be `NOTIFIED`.
-                // This is because `unpark` may have been called again since we read
-                // `NOTIFIED` in the `compare_exchange` above. We must perform an
-                // acquire operation that synchronizes with that `unpark` to observe
-                // any writes it made before the call to unpark. To do that we must
-                // read from the write it made to `state`.
-                let old = self.state.swap(EMPTY, SeqCst);
-
-                unlock(self.lock.get());
-
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => {
-                unlock(self.lock.get());
-
-                panic!("inconsistent park state")
-            }
-        }
-
-        loop {
-            wait(self.cvar.get(), self.lock.get());
-
-            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
-                Ok(_) => break, // got a notification
-                Err(_) => {}    // spurious wakeup, go back to sleep
-            }
-        }
-
-        unlock(self.lock.get());
-    }
-
-    // This implementation doesn't require `unsafe`, but other implementations
-    // may assume this is only called by the thread that owns the Parker. Use
-    // `Pin` to guarantee a stable address for the mutex and condition variable.
-    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
-        // Like `park` above we have a fast path for an already-notified thread, and
-        // afterwards we start coordinating for a sleep.
-        // return quickly.
-        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
-            return;
-        }
-
-        lock(self.lock.get());
-        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
-            Ok(_) => {}
-            Err(NOTIFIED) => {
-                // We must read again here, see `park`.
-                let old = self.state.swap(EMPTY, SeqCst);
-                unlock(self.lock.get());
-
-                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
-                return;
-            } // should consume this notification, so prohibit spurious wakeups in next park.
-            Err(_) => {
-                unlock(self.lock.get());
-                panic!("inconsistent park_timeout state")
-            }
-        }
-
-        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
-        // from a notification we just want to unconditionally set the state back to
-        // empty, either consuming a notification or un-flagging ourselves as
-        // parked.
-        wait_timeout(self.cvar.get(), self.lock.get(), dur);
-
-        match self.state.swap(EMPTY, SeqCst) {
-            NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
-            PARKED => unlock(self.lock.get()),   // no notification, alas
-            n => {
-                unlock(self.lock.get());
-                panic!("inconsistent park_timeout state: {n}")
-            }
-        }
-    }
-
-    pub fn unpark(self: Pin<&Self>) {
-        // To ensure the unparked thread will observe any writes we made
-        // before this call, we must perform a release operation that `park`
-        // can synchronize with. To do that we must write `NOTIFIED` even if
-        // `state` is already `NOTIFIED`. That is why this must be a swap
-        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
-        // on failure.
-        match self.state.swap(NOTIFIED, SeqCst) {
-            EMPTY => return,    // no one was waiting
-            NOTIFIED => return, // already unparked
-            PARKED => {}        // gotta go wake someone up
-            _ => panic!("inconsistent state in unpark"),
-        }
-
-        // There is a period between when the parked thread sets `state` to
-        // `PARKED` (or last checked `state` in the case of a spurious wake
-        // up) and when it actually waits on `cvar`. If we were to notify
-        // during this period it would be ignored and then when the parked
-        // thread went to sleep it would never wake up. Fortunately, it has
-        // `lock` locked at this stage so we can acquire `lock` to wait until
-        // it is ready to receive the notification.
-        //
-        // Releasing `lock` before the call to `notify_one` means that when the
-        // parked thread wakes it doesn't get woken only to have to wait for us
-        // to release `lock`.
-        unsafe {
-            lock(self.lock.get());
-            unlock(self.lock.get());
-            notify_one(self.cvar.get());
-        }
-    }
-}
-
-impl Drop for Parker {
-    fn drop(&mut self) {
-        unsafe {
-            libc::pthread_cond_destroy(self.cvar.get_mut());
-            libc::pthread_mutex_destroy(self.lock.get_mut());
-        }
-    }
-}
-
-unsafe impl Sync for Parker {}
-unsafe impl Send for Parker {}
diff --git a/library/std/src/sys/unix/thread_parker/mod.rs b/library/std/src/sys/unix/thread_parker/mod.rs
new file mode 100644 (file)
index 0000000..e245358
--- /dev/null
@@ -0,0 +1,21 @@
+//! Thread parking on systems without futex support.
+
+#![cfg(not(any(
+    target_os = "linux",
+    target_os = "android",
+    all(target_os = "emscripten", target_feature = "atomics"),
+    target_os = "freebsd",
+    target_os = "openbsd",
+    target_os = "dragonfly",
+    target_os = "fuchsia",
+)))]
+
+cfg_if::cfg_if! {
+    if #[cfg(target_os = "netbsd")] {
+        mod netbsd;
+        pub use netbsd::Parker;
+    } else {
+        mod pthread;
+        pub use pthread::Parker;
+    }
+}
diff --git a/library/std/src/sys/unix/thread_parker/netbsd.rs b/library/std/src/sys/unix/thread_parker/netbsd.rs
new file mode 100644 (file)
index 0000000..7657605
--- /dev/null
@@ -0,0 +1,113 @@
+use crate::ffi::{c_int, c_void};
+use crate::pin::Pin;
+use crate::ptr::{null, null_mut};
+use crate::sync::atomic::{
+    AtomicU64,
+    Ordering::{Acquire, Relaxed, Release},
+};
+use crate::time::Duration;
+use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};
+
+extern "C" {
+    fn ___lwp_park60(
+        clock_id: clockid_t,
+        flags: c_int,
+        ts: *mut timespec,
+        unpark: lwpid_t,
+        hint: *const c_void,
+        unparkhint: *const c_void,
+    ) -> c_int;
+    fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
+}
+
+/// The thread is not parked and the token is not available.
+///
+/// Zero cannot be a valid LWP id, since it is used as empty value for the unpark
+/// argument in _lwp_park.
+const EMPTY: u64 = 0;
+/// The token is available. Do not park anymore.
+const NOTIFIED: u64 = u64::MAX;
+
+pub struct Parker {
+    /// The parker state. Contains either one of the two state values above or the LWP
+    /// id of the parked thread.
+    state: AtomicU64,
+}
+
+impl Parker {
+    pub unsafe fn new(parker: *mut Parker) {
+        parker.write(Parker { state: AtomicU64::new(EMPTY) })
+    }
+
+    // Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
+    pub unsafe fn park(self: Pin<&Self>) {
+        // If the token has already been made available, we can skip
+        // a bit of work, so check for it here.
+        if self.state.load(Acquire) != NOTIFIED {
+            let parked = _lwp_self() as u64;
+            let hint = self.state.as_mut_ptr().cast();
+            if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
+                // Loop to guard against spurious wakeups.
+                loop {
+                    ___lwp_park60(0, 0, null_mut(), 0, hint, null());
+                    if self.state.load(Acquire) == NOTIFIED {
+                        break;
+                    }
+                }
+            }
+        }
+
+        // At this point, the change to NOTIFIED has always been observed with acquire
+        // ordering, so we can just use a relaxed store here (instead of a swap).
+        self.state.store(EMPTY, Relaxed);
+    }
+
+    // Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
+    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+        if self.state.load(Acquire) != NOTIFIED {
+            let parked = _lwp_self() as u64;
+            let hint = self.state.as_mut_ptr().cast();
+            let mut timeout = timespec {
+                // Saturate so that the operation will definitely time out
+                // (even if it is after the heat death of the universe).
+                tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
+                tv_nsec: dur.subsec_nanos().into(),
+            };
+
+            if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
+                // Timeout needs to be mutable since it is modified on NetBSD 9.0 and
+                // above.
+                ___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, hint, null());
+                // Use a swap to get acquire ordering even if the token was set after
+                // the timeout occurred.
+                self.state.swap(EMPTY, Acquire);
+                return;
+            }
+        }
+
+        self.state.store(EMPTY, Relaxed);
+    }
+
+    // Does not actually need `Pin`, but the pthread implementation does.
+    pub fn unpark(self: Pin<&Self>) {
+        let state = self.state.swap(NOTIFIED, Release);
+        if !matches!(state, EMPTY | NOTIFIED) {
+            let lwp = state as lwpid_t;
+            let hint = self.state.as_mut_ptr().cast();
+
+            // If the parking thread terminated and did not actually park, this will
+            // probably return an error, which is OK. In the worst case, another
+            // thread has received the same LWP id. It will then receive a spurious
+            // wakeup, but those are allowable per the API contract. The same reasoning
+            // applies if a timeout occurred before this call, but the state was not
+            // yet reset.
+
+            // SAFETY:
+            // The syscall has no invariants to hold. Only unsafe because it is an
+            // extern function.
+            unsafe {
+                _lwp_unpark(lwp, hint);
+            }
+        }
+    }
+}
diff --git a/library/std/src/sys/unix/thread_parker/pthread.rs b/library/std/src/sys/unix/thread_parker/pthread.rs
new file mode 100644 (file)
index 0000000..3dfc002
--- /dev/null
@@ -0,0 +1,271 @@
+//! Thread parking without `futex` using the `pthread` synchronization primitives.
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomPinned;
+use crate::pin::Pin;
+use crate::ptr::addr_of_mut;
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
+    let r = libc::pthread_mutex_lock(lock);
+    debug_assert_eq!(r, 0);
+}
+
+unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
+    let r = libc::pthread_mutex_unlock(lock);
+    debug_assert_eq!(r, 0);
+}
+
+unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
+    let r = libc::pthread_cond_signal(cond);
+    debug_assert_eq!(r, 0);
+}
+
+unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
+    let r = libc::pthread_cond_wait(cond, lock);
+    debug_assert_eq!(r, 0);
+}
+
+const TIMESPEC_MAX: libc::timespec =
+    libc::timespec { tv_sec: <libc::time_t>::MAX, tv_nsec: 1_000_000_000 - 1 };
+
+unsafe fn wait_timeout(
+    cond: *mut libc::pthread_cond_t,
+    lock: *mut libc::pthread_mutex_t,
+    dur: Duration,
+) {
+    // Use the system clock on systems that do not support pthread_condattr_setclock.
+    // This unfortunately results in problems when the system time changes.
+    #[cfg(any(
+        target_os = "macos",
+        target_os = "ios",
+        target_os = "watchos",
+        target_os = "espidf"
+    ))]
+    let (now, dur) = {
+        use crate::cmp::min;
+        use crate::sys::time::SystemTime;
+
+        // OSX implementation of `pthread_cond_timedwait` is buggy
+        // with super long durations. When duration is greater than
+        // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
+        // in macOS Sierra return error 316.
+        //
+        // This program demonstrates the issue:
+        // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
+        //
+        // To work around this issue, and possible bugs of other OSes, timeout
+        // is clamped to 1000 years, which is allowable per the API of `park_timeout`
+        // because of spurious wakeups.
+        let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
+        let now = SystemTime::now().t;
+        (now, dur)
+    };
+    // Use the monotonic clock on other systems.
+    #[cfg(not(any(
+        target_os = "macos",
+        target_os = "ios",
+        target_os = "watchos",
+        target_os = "espidf"
+    )))]
+    let (now, dur) = {
+        use crate::sys::time::Timespec;
+
+        (Timespec::now(libc::CLOCK_MONOTONIC), dur)
+    };
+
+    let timeout =
+        now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX);
+    let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
+    debug_assert!(r == libc::ETIMEDOUT || r == 0);
+}
+
+pub struct Parker {
+    state: AtomicUsize,
+    lock: UnsafeCell<libc::pthread_mutex_t>,
+    cvar: UnsafeCell<libc::pthread_cond_t>,
+    // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
+    _pinned: PhantomPinned,
+}
+
+impl Parker {
+    /// Construct the UNIX parker in-place.
+    ///
+    /// # Safety
+    /// The constructed parker must never be moved.
+    pub unsafe fn new(parker: *mut Parker) {
+        // Use the default mutex implementation to allow for simpler initialization.
+        // This could lead to undefined behaviour when deadlocking. This is avoided
+        // by not deadlocking. Note in particular the unlocking operation before any
+        // panic, as code after the panic could try to park again.
+        addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
+        addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
+
+        cfg_if::cfg_if! {
+            if #[cfg(any(
+                target_os = "macos",
+                target_os = "ios",
+                target_os = "watchos",
+                target_os = "l4re",
+                target_os = "android",
+                target_os = "redox"
+            ))] {
+                addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
+            } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] {
+                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
+                assert_eq!(r, 0);
+            } else {
+                use crate::mem::MaybeUninit;
+                let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
+                let r = libc::pthread_condattr_init(attr.as_mut_ptr());
+                assert_eq!(r, 0);
+                let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
+                assert_eq!(r, 0);
+                let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
+                assert_eq!(r, 0);
+                let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
+                assert_eq!(r, 0);
+            }
+        }
+    }
+
+    // This implementation doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker.
+    pub unsafe fn park(self: Pin<&Self>) {
+        // If we were previously notified then we consume this notification and
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+
+        // Otherwise we need to coordinate going to sleep
+        lock(self.lock.get());
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read here, even though we know it will be `NOTIFIED`.
+                // This is because `unpark` may have been called again since we read
+                // `NOTIFIED` in the `compare_exchange` above. We must perform an
+                // acquire operation that synchronizes with that `unpark` to observe
+                // any writes it made before the call to unpark. To do that we must
+                // read from the write it made to `state`.
+                let old = self.state.swap(EMPTY, SeqCst);
+
+                unlock(self.lock.get());
+
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => {
+                unlock(self.lock.get());
+
+                panic!("inconsistent park state")
+            }
+        }
+
+        loop {
+            wait(self.cvar.get(), self.lock.get());
+
+            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+                Ok(_) => break, // got a notification
+                Err(_) => {}    // spurious wakeup, go back to sleep
+            }
+        }
+
+        unlock(self.lock.get());
+    }
+
+    // This implementation doesn't require `unsafe`, but other implementations
+    // may assume this is only called by the thread that owns the Parker. Use
+    // `Pin` to guarantee a stable address for the mutex and condition variable.
+    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+        // Like `park` above we have a fast path for an already-notified thread, and
+        // afterwards we start coordinating for a sleep.
+        // return quickly.
+        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+            return;
+        }
+
+        lock(self.lock.get());
+        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+            Ok(_) => {}
+            Err(NOTIFIED) => {
+                // We must read again here, see `park`.
+                let old = self.state.swap(EMPTY, SeqCst);
+                unlock(self.lock.get());
+
+                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+                return;
+            } // should consume this notification, so prohibit spurious wakeups in next park.
+            Err(_) => {
+                unlock(self.lock.get());
+                panic!("inconsistent park_timeout state")
+            }
+        }
+
+        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+        // from a notification we just want to unconditionally set the state back to
+        // empty, either consuming a notification or un-flagging ourselves as
+        // parked.
+        wait_timeout(self.cvar.get(), self.lock.get(), dur);
+
+        match self.state.swap(EMPTY, SeqCst) {
+            NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
+            PARKED => unlock(self.lock.get()),   // no notification, alas
+            n => {
+                unlock(self.lock.get());
+                panic!("inconsistent park_timeout state: {n}")
+            }
+        }
+    }
+
+    pub fn unpark(self: Pin<&Self>) {
+        // To ensure the unparked thread will observe any writes we made
+        // before this call, we must perform a release operation that `park`
+        // can synchronize with. To do that we must write `NOTIFIED` even if
+        // `state` is already `NOTIFIED`. That is why this must be a swap
+        // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+        // on failure.
+        match self.state.swap(NOTIFIED, SeqCst) {
+            EMPTY => return,    // no one was waiting
+            NOTIFIED => return, // already unparked
+            PARKED => {}        // gotta go wake someone up
+            _ => panic!("inconsistent state in unpark"),
+        }
+
+        // There is a period between when the parked thread sets `state` to
+        // `PARKED` (or last checked `state` in the case of a spurious wake
+        // up) and when it actually waits on `cvar`. If we were to notify
+        // during this period it would be ignored and then when the parked
+        // thread went to sleep it would never wake up. Fortunately, it has
+        // `lock` locked at this stage so we can acquire `lock` to wait until
+        // it is ready to receive the notification.
+        //
+        // Releasing `lock` before the call to `notify_one` means that when the
+        // parked thread wakes it doesn't get woken only to have to wait for us
+        // to release `lock`.
+        unsafe {
+            lock(self.lock.get());
+            unlock(self.lock.get());
+            notify_one(self.cvar.get());
+        }
+    }
+}
+
+impl Drop for Parker {
+    fn drop(&mut self) {
+        unsafe {
+            libc::pthread_cond_destroy(self.cvar.get_mut());
+            libc::pthread_mutex_destroy(self.lock.get_mut());
+        }
+    }
+}
+
+unsafe impl Sync for Parker {}
+unsafe impl Send for Parker {}