1 //! Thread parking without `futex` using the `pthread` synchronization primitives.
3 use crate::cell::UnsafeCell;
4 use crate::marker::PhantomPinned;
6 use crate::ptr::addr_of_mut;
7 use crate::sync::atomic::AtomicUsize;
8 use crate::sync::atomic::Ordering::SeqCst;
9 use crate::sys::time::TIMESPEC_MAX;
10 use crate::time::Duration;
// Values stored in the parker's atomic `state`.
// `EMPTY`: no thread is parked and no notification is pending; `new` writes this value.
12 const EMPTY: usize = 0;
// `PARKED`: written by `park`/`park_timeout` (after taking `lock`) before they wait.
13 const PARKED: usize = 1;
// `NOTIFIED`: written by `unpark`; `park`/`park_timeout` reset it back to `EMPTY`.
14 const NOTIFIED: usize = 2;
/// Locks the mutex at `lock` by calling `pthread_mutex_lock`.
///
/// The return code is only checked in debug builds, where it must be 0.
///
/// # Safety
///
/// The pointer is handed straight to libc; presumably it must point to an
/// initialized mutex — confirm against the callers.
16 unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
17 let r = libc::pthread_mutex_lock(lock);
18 debug_assert_eq!(r, 0);
/// Unlocks the mutex at `lock` by calling `pthread_mutex_unlock`.
///
/// The return code is only checked in debug builds, where it must be 0.
///
/// # Safety
///
/// The pointer is handed straight to libc; presumably the mutex must be
/// initialized and currently held by the caller — confirm against the callers.
21 unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
22 let r = libc::pthread_mutex_unlock(lock);
23 debug_assert_eq!(r, 0);
/// Signals the condition variable at `cond` by calling `pthread_cond_signal`.
///
/// The return code is only checked in debug builds, where it must be 0.
///
/// # Safety
///
/// The pointer is handed straight to libc; presumably it must point to an
/// initialized condition variable — confirm against the callers.
26 unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
27 let r = libc::pthread_cond_signal(cond);
28 debug_assert_eq!(r, 0);
/// Waits on the condition variable at `cond` with the mutex at `lock` by
/// calling `pthread_cond_wait`.
///
/// The return code is only checked in debug builds, where it must be 0.
/// Callers in this file invoke it after `lock(..)` and re-check `state`
/// afterwards, since a wakeup may be spurious.
///
/// # Safety
///
/// Both pointers are handed straight to libc; presumably they must point to
/// initialized objects and `lock` must be held by the caller — confirm.
31 unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
32 let r = libc::pthread_cond_wait(cond, lock);
33 debug_assert_eq!(r, 0);
/// Waits on `cond` with `lock` by calling `pthread_cond_timedwait`, using an
/// absolute deadline computed as "now + `dur`".
///
/// Which clock supplies "now" depends on the target: the system clock on
/// targets without `pthread_condattr_setclock`, `CLOCK_MONOTONIC` otherwise.
/// If the deadline cannot be computed or converted, `TIMESPEC_MAX` is used.
/// In debug builds the return code is asserted to be 0 or `ETIMEDOUT`.
///
/// # Safety
///
/// Both pointers are handed straight to libc; presumably they must point to
/// initialized objects and `lock` must be held by the caller — confirm.
36 unsafe fn wait_timeout(
37 cond: *mut libc::pthread_cond_t,
38 lock: *mut libc::pthread_mutex_t,
41 // Use the system clock on systems that do not support pthread_condattr_setclock.
42 // This unfortunately results in problems when the system time changes.
46 target_os = "watchos",
48 target_os = "horizon",
52 use crate::sys::time::SystemTime;
54 // OSX implementation of `pthread_cond_timedwait` is buggy
55 // with super long durations. When duration is greater than
56 // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
57 // on macOS Sierra returns error 316.
59 // This program demonstrates the issue:
60 // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
62 // To work around this issue, and possible bugs of other OSes, timeout
63 // is clamped to 1000 years, which is allowable per the API of `park_timeout`
64 // because of spurious wakeups.
65 let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
66 let now = SystemTime::now().t;
69 // Use the monotonic clock on other systems.
73 target_os = "watchos",
75 target_os = "horizon",
78 use crate::sys::time::Timespec;
// The monotonic branch passes `dur` through without clamping it.
80 (Timespec::now(libc::CLOCK_MONOTONIC), dur)
// Fall back to `TIMESPEC_MAX` when adding `dur` or converting to a
// `timespec` fails, instead of panicking on an unrepresentable deadline.
84 now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX);
85 let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
// Only a timeout or a successful return is expected here.
86 debug_assert!(r == libc::ETIMEDOUT || r == 0);
// Mutex taken by `park`, `park_timeout` and `unpark`. It sits in an
// `UnsafeCell` because the pthread calls need a `*mut` pointer while the
// parker is only reached through shared (`Pin<&Self>`) references.
91 lock: UnsafeCell<libc::pthread_mutex_t>,
// Condition variable the parked thread waits on and `unpark` signals.
92 cvar: UnsafeCell<libc::pthread_cond_t>,
93 // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
94 _pinned: PhantomPinned,
98 /// Construct the UNIX parker in-place.
///
/// Writes `state` (as `EMPTY`), `lock` and `cvar` through the raw `parker`
/// pointer. `cvar` is initialized differently depending on the target:
/// with the static initializer, with `pthread_cond_init` and default
/// attributes, or with `pthread_cond_init` and a `CLOCK_MONOTONIC` attribute.
///
/// NOTE(review): presumably `parker` must be valid for writes and suitably
/// aligned for `Parker` — confirm against the caller.
101 /// The constructed parker must never be moved.
102 pub unsafe fn new(parker: *mut Parker) {
103 // Use the default mutex implementation to allow for simpler initialization.
104 // This could lead to undefined behaviour when deadlocking. This is avoided
105 // by not deadlocking. Note in particular the unlocking operation before any
106 // panic, as code after the panic could try to park again.
// Fields are written through `addr_of_mut!` so no reference to the
// not-yet-initialized `Parker` is created.
107 addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
108 addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
114 target_os = "watchos",
116 target_os = "android",
// These targets use the static condition-variable initializer.
119 addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
120 } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] {
// These targets call `pthread_cond_init` with a null attribute pointer.
121 let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
124 use crate::mem::MaybeUninit;
// Remaining targets: build a condattr that selects `CLOCK_MONOTONIC`,
// initialize `cvar` with it, then destroy the attribute object.
125 let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
126 let r = libc::pthread_condattr_init(attr.as_mut_ptr());
128 let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
130 let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
132 let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
/// Blocks the calling thread until a notification from `unpark` is consumed.
///
/// Returns immediately if `state` is already `NOTIFIED`. Otherwise it takes
/// `lock`, marks itself `PARKED`, and waits on `cvar` until `state` can be
/// changed from `NOTIFIED` back to `EMPTY`. `lock` is released before every
/// panic and before returning.
///
/// # Panics
///
/// Panics if `state` holds an unexpected value (see the messages below).
138 // This implementation doesn't require `unsafe`, but other implementations
139 // may assume this is only called by the thread that owns the Parker.
140 pub unsafe fn park(self: Pin<&Self>) {
141 // If we were previously notified then we consume this notification and
143 if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
147 // Otherwise we need to coordinate going to sleep
148 lock(self.lock.get());
149 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
152 // We must read here, even though we know it will be `NOTIFIED`.
153 // This is because `unpark` may have been called again since we read
154 // `NOTIFIED` in the `compare_exchange` above. We must perform an
155 // acquire operation that synchronizes with that `unpark` to observe
156 // any writes it made before the call to unpark. To do that we must
157 // read from the write it made to `state`.
158 let old = self.state.swap(EMPTY, SeqCst);
// Unlock before asserting so a panic never leaves `lock` held.
160 unlock(self.lock.get());
162 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
164 } // should consume this notification, so prohibit spurious wakeups in next park.
// Any other value is unexpected here: release `lock`, then panic.
166 unlock(self.lock.get());
168 panic!("inconsistent park state")
// Wait, then re-check `state`: only a successful `NOTIFIED` -> `EMPTY`
// exchange counts as a real wakeup.
173 wait(self.cvar.get(), self.lock.get());
175 match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
176 Ok(_) => break, // got a notification
177 Err(_) => {} // spurious wakeup, go back to sleep
181 unlock(self.lock.get());
/// Like `park`, but waits at most once, with `dur` passed to `wait_timeout`.
///
/// Returns immediately if `state` is already `NOTIFIED`. Otherwise it takes
/// `lock`, marks itself `PARKED`, performs a single timed wait, and then
/// resets `state` to `EMPTY` whether or not a notification arrived. `lock`
/// is released before every panic and before returning.
///
/// # Panics
///
/// Panics if `state` holds an unexpected value (see the messages below).
184 // This implementation doesn't require `unsafe`, but other implementations
185 // may assume this is only called by the thread that owns the Parker. Use
186 // `Pin` to guarantee a stable address for the mutex and condition variable.
187 pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
188 // Like `park` above we have a fast path for an already-notified thread, and
189 // afterwards we start coordinating for a sleep.
191 if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
195 lock(self.lock.get());
196 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
199 // We must read again here, see `park`.
200 let old = self.state.swap(EMPTY, SeqCst);
// Unlock before asserting so a panic never leaves `lock` held.
201 unlock(self.lock.get());
203 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
205 } // should consume this notification, so prohibit spurious wakeups in next park.
// Any other value is unexpected here: release `lock`, then panic.
207 unlock(self.lock.get());
208 panic!("inconsistent park_timeout state")
212 // Wait with a timeout, and if we spuriously wake up or otherwise wake up
213 // from a notification we just want to unconditionally set the state back to
214 // empty, either consuming a notification or un-flagging ourselves as
216 wait_timeout(self.cvar.get(), self.lock.get(), dur);
// Unlike `park`, there is no retry loop: one wait, then reset `state`.
218 match self.state.swap(EMPTY, SeqCst) {
219 NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
220 PARKED => unlock(self.lock.get()), // no notification, alas
222 unlock(self.lock.get());
223 panic!("inconsistent park_timeout state: {n}")
/// Posts a notification and, if a thread was parked, wakes it.
///
/// Always swaps `NOTIFIED` into `state`. If the previous value was `EMPTY`
/// or `NOTIFIED` there is nobody to wake and it returns. If it was `PARKED`
/// it takes and releases `lock`, then signals `cvar`.
///
/// # Panics
///
/// Panics with "inconsistent state in unpark" on any other previous value.
228 pub fn unpark(self: Pin<&Self>) {
229 // To ensure the unparked thread will observe any writes we made
230 // before this call, we must perform a release operation that `park`
231 // can synchronize with. To do that we must write `NOTIFIED` even if
232 // `state` is already `NOTIFIED`. That is why this must be a swap
233 // rather than a compare-and-swap that returns if it reads `NOTIFIED`
235 match self.state.swap(NOTIFIED, SeqCst) {
236 EMPTY => return, // no one was waiting
237 NOTIFIED => return, // already unparked
238 PARKED => {} // gotta go wake someone up
239 _ => panic!("inconsistent state in unpark"),
242 // There is a period between when the parked thread sets `state` to
243 // `PARKED` (or last checked `state` in the case of a spurious wake
244 // up) and when it actually waits on `cvar`. If we were to notify
245 // during this period it would be ignored and then when the parked
246 // thread went to sleep it would never wake up. Fortunately, it has
247 // `lock` locked at this stage so we can acquire `lock` to wait until
248 // it is ready to receive the notification.
250 // Releasing `lock` before the call to `notify_one` means that when the
251 // parked thread wakes it doesn't get woken only to have to wait for us
252 // to release `lock`.
254 lock(self.lock.get());
255 unlock(self.lock.get());
256 notify_one(self.cvar.get());
/// Destroys the condition variable and then the mutex.
/// The return codes of both destroy calls are ignored.
261 impl Drop for Parker {
// `get_mut` is usable here because the code has exclusive access to the cells.
264 libc::pthread_cond_destroy(self.cvar.get_mut());
265 libc::pthread_mutex_destroy(self.lock.get_mut());
// NOTE(review): no safety justification is written next to these impls.
// Presumably they are sound because `state` is atomic and `lock`/`cvar` are
// only accessed through the pthread API — confirm.
270 unsafe impl Sync for Parker {}
271 unsafe impl Send for Parker {}